3. Server Programming
This part describes the extensibility of the SensorBee server. Topics covered in this part are advanced and should be read after understanding the basics of SensorBee and BQL.
Because SensorBee is mainly written in The Go Programming Language, understanding the language before reading this part is also recommended. A Tour of Go is an official tutorial and is one of the best tutorials of the language. It runs on web browsers and does not require any additional software installation. After learning the language, How to Write Go Code helps to understand how to use the go tool and the standard way to develop Go packages and applications.
This part assumes that the go tool is installed and the development environment including Go’s environment variables like GOROOT or GOPATH is appropriately set up. SensorBee requires Go 1.4 or later.
3.1. Extending the SensorBee Server and BQL
Many features of the server and BQL can be extended by plugins. This chapter describes what types of features can be extended by users. The following chapters in this part describe how to develop those features in specific programming languages.
3.1.1. User-Defined Functions
A user-defined function (UDF) is a function that is implemented by a user and registered in the SensorBee server. Once it is registered, it can be called from BQL statements:
SELECT RSTREAM my_udf(field) FROM src [RANGE 1 TUPLES];
A UDF behaves just like a built-in function. A UDF can take an arbitrary number of arguments. Each argument can be any of the built-in types and can receive multiple types of values. A UDF can also support variable-length arguments. A UDF has a single return value of any built-in type. When multiple return values are required, a UDF can return them as an array.
Note
Unlike well-known RDBMSs, BQL currently does not support CREATE FUNCTION statements. UDFs can only be added through Go programs.
3.1.2. User-Defined Aggregate Functions
A user-defined aggregate function (UDAF) is a function similar to a UDF but can take aggregation parameters (see Aggregate Expressions) as arguments in addition to regular arguments.
3.1.3. User-Defined Stream-Generating Functions
Stream-generating functions can also be user-defined; such a function is called a user-defined stream-generating function (UDSF). There are two types of UDSFs. The first type behaves like a source, which is not connected to any input stream and generates tuples proactively:
... FROM my_counter() [RANGE ...
my_counter above may emit tuples like {"count": 1}. UDSFs of this type are called source-like UDSFs.
The other type is called a stream-like UDSF and behaves just like a stream, which receives tuples from one or more incoming streams or sources. It receives names of streams or sources as its arguments:
... FROM my_udsf("another_stream", "yet_another_stream", other_params) [RANGE ...
Note that there is no rule on how to define a UDSF’s arguments. Thus, the order and the use of arguments depend on each UDSF. For example, a UDSF might take an array of strings containing names of input streams as its first argument: my_union(["stream1", "stream2", "stream3"]). Names of input streams do not even need to be located at the beginning of the argument list: my_udsf2(1, "another_stream") is also possible.
Using UDSFs is a very powerful way to extend BQL since they can potentially do anything that the SELECT statement cannot do.
3.1.4. User-Defined States
A user-defined state (UDS) can be provided to support stateful data processing (see Stateful Data Processing). A UDS is usually provided with a set of UDFs that manipulate the state. Those UDFs take the name of the UDS as a string argument:
CREATE STATE event_id_seq TYPE snowflake_id WITH machine_id = 1;
CREATE STREAM events_with_id AS
SELECT snowflake_id("event_id_seq"), * FROM events [RANGE 1 TUPLES];
In the example above, a UDS event_id_seq is created with the type snowflake_id. Then, the name of the UDS is passed to the UDF snowflake_id, which happens to have the same name as the type name of the UDS. The UDF looks up the UDS event_id_seq and returns a value computed based on the state.
3.1.5. Source Plugins
A source type developed by a user can be added to the SensorBee server as a plugin so that it can be used in CREATE SOURCE statements. A source type can have any number of required and optional parameters. Each parameter can have any of the built-in types.
3.1.6. Sink Plugins
A sink type developed by a user can be added to the SensorBee server as a plugin so that it can be used in CREATE SINK statements. A sink type can have any number of required and optional parameters. Each parameter can have any of the built-in types.
3.2. Extensions in Go
This chapter describes how to extend the SensorBee server in the Go programming language.
3.2.1. Development Flow of Components in Go
Before looking into the details of each component, this section describes the typical development flow of components such as a UDF, a UDS type, a source type, or a sink type.
The basic flow is as follows:
- Create a git repository for components
- Implement components
- Create a plugin subpackage in the repository
3.2.1.1. Create a Git Repository for Components
Components are written in Go, so they need to be in a valid git repository (or a repository of a different version control system). One repository may provide multiple types of components. For example, a repository could have 10 UDFs, 5 UDS types, 2 source types, and 1 sink type. However, since Go is well designed to provide packages in a fine-grained manner, each repository should only provide a minimum set of components that are logically related and make sense to be in the same repository.
3.2.1.2. Implement Components
The next step is to implement components. There is no restriction on which standard or third-party packages to depend on.
Functions or structs that are to be registered to the SensorBee server need to be referred to by the plugin subpackage, which is described in the next subsection. Thus, names of those symbols need to start with a capital letter.
In this step, components should not be registered to the SensorBee server yet.
3.2.1.3. Create a Plugin Subpackage in the Repository
It is highly recommended that the repository has a separate package (i.e. a subdirectory) which only registers components to the SensorBee server. There is usually one file named plugin.go in the plugin package and it only contains a series of registration function calls in the init function. For instance, if the repository only provides one UDF, the contents of plugin.go would look like:
// in github.com/user/myudf/plugin/plugin.go
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/user/myudf"
)
func init() {
udf.MustRegisterGlobalUDF("my_udf", &myudf.MyUDF{})
}
There are two reasons to have a plugin subpackage separated from the implementation of components. First, by separating them, other Go packages can import the components to use the package as a library without registering them to SensorBee. Second, having a separate plugin package allows a user to register a component with a different name. This is especially useful when names of components conflict with each other.
To use the example plugin above, the github.com/user/myudf/plugin package needs to be added to the plugin path list of SensorBee.
3.2.1.4. Repository Organization
The typical organization of the repository is
- github.com/user/repo
  - README: description and the usage of components in the repository
  - .go files: implementation of components
  - plugin/: a subpackage for the plugin registration
    - plugin.go
  - othersubpackages/: there can be optional subpackages
3.2.2. User-Defined Functions
This section describes how to write a UDF in Go. It first shows the basic interface of defining UDFs, and then describes utilities around it, how to develop a UDF in a Go-ish manner, and a complete example.
3.2.2.1. Implementing a UDF
Note
This is a very low-level way to implement a UDF in Go. To learn about an easier way, see Generic UDFs.
Any struct implementing the following interface can be used as a UDF:
type UDF interface {
// Call calls the UDF.
Call(*core.Context, ...data.Value) (data.Value, error)
// Accept checks if the function accepts the given number of arguments
// excluding core.Context.
Accept(arity int) bool
// IsAggregationParameter returns true if the k-th parameter expects
// aggregated values. A UDF with Accept(n) == true is an aggregate
// function if and only if this function returns true for one or more
// values of k in the range 0, ..., n-1.
IsAggregationParameter(k int) bool
}
This interface is defined in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package.
A UDF can be registered via the RegisterGlobalUDF or MustRegisterGlobalUDF functions from the same package. MustRegisterGlobalUDF is the same as RegisterGlobalUDF but panics on failure instead of returning an error. These functions are usually called from the init function in the UDF package’s plugin subpackage. A typical implementation of a UDF looks as follows:
type MyUDF struct {
...
}
func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
...
}
func (m *MyUDF) Accept(arity int) bool {
...
}
func (m *MyUDF) IsAggregationParameter(k int) bool {
...
}
func init() {
// MyUDF can be used as my_udf in BQL statements.
udf.MustRegisterGlobalUDF("my_udf", &MyUDF{})
}
As can be inferred from this example, a UDF itself should be stateless because only one instance of the struct is registered as a UDF and that instance is shared globally. Stateful data processing can be achieved by the combination of UDFs and UDSs, which is described in User-Defined States.
A UDF needs to implement three methods to satisfy the udf.UDF interface: Call, Accept, and IsAggregationParameter.
The Call method receives a *core.Context and multiple data.Value as its arguments. *core.Context contains the information of the current processing context. Call’s ...data.Value argument holds the values passed to the UDF. data.Value represents a value used in BQL and can be any of the built-in types.
SELECT RSTREAM my_udf(arg1, arg2) FROM stream [RANGE 1 TUPLES];
In this example, arg1 and arg2 are passed to the Call method:
func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
// When my_udf(arg1, arg2) is called, len(args) is 2.
// args[0] is arg1 and args[1] is arg2.
// It is guaranteed that m.Accept(len(args)) is always true.
}
Because data.Value is a semi-variant type, the Call method needs to check the type of each data.Value and convert it to a desired type.
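The check-and-convert step can be illustrated without SensorBee itself. The following is only a sketch under stated assumptions: Value is a hypothetical stand-in for data.Value, repeatCall and my_repeat are made-up names, and the *core.Context argument of a real Call method is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Value is a hypothetical stand-in for data.Value; it is not the SensorBee type.
type Value interface{}

// repeatCall mimics the body of a Call method for a made-up my_repeat(text, count).
// It checks the dynamic type of every argument before using it.
func repeatCall(args ...Value) (Value, error) {
	if len(args) != 2 {
		return nil, errors.New("my_repeat expects exactly two arguments")
	}
	text, ok := args[0].(string)
	if !ok {
		return nil, fmt.Errorf("argument 0 must be a string, got %T", args[0])
	}
	count, ok := args[1].(int64)
	if !ok {
		return nil, fmt.Errorf("argument 1 must be an integer, got %T", args[1])
	}
	if count < 0 {
		return nil, errors.New("argument 1 must not be negative")
	}
	return strings.Repeat(text, int(count)), nil
}

func main() {
	fmt.Println(repeatCall("ab", int64(3))) // succeeds
	fmt.Println(repeatCall("ab", "3"))      // fails: the count is a string
}
```

In a real UDF the same checks would presumably be written with the helpers of the data package (such as data.AsString, used later in this section) instead of plain type assertions.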
The Accept method verifies if the UDF accepts the specific number of arguments. It can return true for multiple arities as long as it can receive the given number of arguments. If a UDF only accepts two arguments, the method is implemented as follows:
func (m *MyUDF) Accept(arity int) bool {
return arity == 2
}
When a UDF aims to support variadic parameters (a.k.a. variable-length arguments) with two required arguments (e.g. my_udf(arg1, arg2, optional1, optional2, ...)), the implementation would be:
func (m *MyUDF) Accept(arity int) bool {
return arity >= 2
}
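Because Accept may return true for several arities, a bounded number of optional arguments can be expressed in the same way. The sketch below is a hypothetical example (optionalArgUDF is not part of SensorBee) of a function callable with two required arguments and at most one optional argument.

```go
package main

import "fmt"

// optionalArgUDF is a hypothetical UDF skeleton; only Accept is shown.
type optionalArgUDF struct{}

// Accept allows my_udf(a, b) and my_udf(a, b, opt), but nothing else.
func (optionalArgUDF) Accept(arity int) bool {
	return arity == 2 || arity == 3
}

func main() {
	u := optionalArgUDF{}
	for n := 1; n <= 4; n++ {
		fmt.Println(n, u.Accept(n))
	}
}
```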
Finally, IsAggregationParameter returns whether the k-th argument (starting from 0) is an aggregation parameter. Aggregation parameters are passed as a data.Array containing all values of a field in each group.
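As a rough sketch of how the three methods fit together for an aggregate function, consider a made-up my_sum(field). Value and Array below are hypothetical stand-ins for data.Value and data.Array, and the *core.Context argument of the real Call method is omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// Value and Array are hypothetical stand-ins for data.Value and data.Array.
type Value interface{}
type Array []Value

// sumUDF sketches an aggregate function such as my_sum(field).
type sumUDF struct{}

// Accept: my_sum takes exactly one argument.
func (sumUDF) Accept(arity int) bool { return arity == 1 }

// IsAggregationParameter: only the 0-th parameter expects aggregated values.
func (sumUDF) IsAggregationParameter(k int) bool { return k == 0 }

// Call receives all values of the field in a group as a single Array.
func (sumUDF) Call(args ...Value) (Value, error) {
	if len(args) != 1 {
		return nil, errors.New("my_sum expects exactly one argument")
	}
	values, ok := args[0].(Array)
	if !ok {
		return nil, fmt.Errorf("argument 0 must be an array, got %T", args[0])
	}
	total := 0.0
	for i, v := range values {
		f, ok := v.(float64)
		if !ok {
			return nil, fmt.Errorf("element %d must be a float, got %T", i, v)
		}
		total += f
	}
	return total, nil
}

func main() {
	u := sumUDF{}
	fmt.Println(u.Call(Array{1.5, 2.5, 4.0}))
}
```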
All of these methods can be called concurrently from multiple goroutines and they must be thread-safe.
The registered UDF is looked up based on its name and the number of arguments passed to it.
SELECT RSTREAM my_udf(arg1, arg2) FROM stream [RANGE 1 TUPLES];
In this SELECT, a UDF having the name my_udf is looked up first. After that, its Accept method is called with 2 and my_udf is actually selected if Accept(2) returns true. The IsAggregationParameter method is additionally called on each argument to see if the argument needs to be an aggregation parameter. Then, if there is no mismatch, my_udf is finally called.
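The lookup order described above can be sketched as follows. This is an illustration of the description only, not the server's actual implementation: the registry map, the lookup function, and the cut-down UDF interface are all hypothetical.

```go
package main

import "fmt"

// UDF is a cut-down, hypothetical version of the udf.UDF interface.
type UDF interface {
	Accept(arity int) bool
	IsAggregationParameter(k int) bool
}

type incUDF struct{}

func (incUDF) Accept(arity int) bool             { return arity == 1 }
func (incUDF) IsAggregationParameter(k int) bool { return false }

// registry is a hypothetical name-to-UDF table.
var registry = map[string]UDF{"my_inc": incUDF{}}

// lookup resolves a call by name, then arity, then aggregation parameters.
// aggregated[k] tells whether the k-th argument in the statement is aggregated.
func lookup(name string, aggregated []bool) (UDF, error) {
	f, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("function %q is not registered", name)
	}
	if !f.Accept(len(aggregated)) {
		return nil, fmt.Errorf("%s does not accept %d arguments", name, len(aggregated))
	}
	for k, agg := range aggregated {
		if f.IsAggregationParameter(k) != agg {
			return nil, fmt.Errorf("%s: aggregation mismatch at argument %d", name, k)
		}
	}
	return f, nil
}

func main() {
	_, err := lookup("my_inc", []bool{false})
	fmt.Println(err)
	_, err = lookup("my_inc", []bool{false, false})
	fmt.Println(err)
}
```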
Note
A UDF does not have a schema at the moment, so any error regarding types of arguments will not be reported until the statement calling the UDF actually processes a tuple.
3.2.2.2. Generic UDFs
SensorBee provides a helper function to register a regular Go function as a UDF without implementing the UDF interface explicitly.
func Inc(v int) int {
return v + 1
}
This function Inc can be transformed into a UDF by the ConvertGeneric or MustConvertGeneric function defined in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package. By combining it with RegisterGlobalUDF, the Inc function can easily be registered as a UDF:
func init() {
udf.MustRegisterGlobalUDF("inc", udf.MustConvertGeneric(Inc))
}
So, a complete example of the UDF implementation and registration is as follows:
package inc
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
)
func Inc(v int) int {
return v + 1
}
func init() {
udf.MustRegisterGlobalUDF("inc", udf.MustConvertGeneric(Inc))
}
Note
A UDF implementation and registration should actually be separated to different packages. See Development Flow of Components in Go for details.
Although this approach is handy, there is some small overhead compared to a UDF implemented in the regular way. Most of such overhead comes from type checking and conversions.
Functions passed to ConvertGeneric need to satisfy some restrictions on the form of their argument and return value types. Each restriction is described in the following subsections.
Form of Arguments
In terms of valid argument forms, there are some rules to follow:
- A Go function can receive *core.Context as the first argument, or can omit it.
- A function can have any number of arguments including 0 arguments as long as Go accepts them.
- A function can be variadic with or without non-variadic parameters.
There are basically eight (four times two, whether a function has *core.Context or not) forms of arguments (return values are intentionally omitted for clarity):
- Functions receiving no argument in BQL (e.g. my_udf())
  - func(*core.Context): A function only receiving *core.Context
  - func(): A function having no argument and not receiving *core.Context, either
- Functions having non-variadic arguments but no variadic arguments
  - func(*core.Context, T1, T2, ..., Tn)
  - func(T1, T2, ..., Tn)
- Functions having variadic arguments but no non-variadic arguments
  - func(*core.Context, ...T)
  - func(...T)
- Functions having both variadic and non-variadic arguments
  - func(*core.Context, T1, T2, ..., Tn, ...Tn+1)
  - func(T1, T2, ..., Tn, ...Tn+1)
Here are some examples of invalid function signatures:
- func(T, *core.Context): *core.Context must be the first argument.
- func(NonSupportedType): Only supported types, which will be explained later, can be used.
Although return values are omitted from all the examples above, they are actually required. The next subsection explains how to define valid return values.
Form of Return Values
All functions need to have return values. There are two forms of return values:
func(...) R
func(...) (R, error)
All other forms are invalid:
func(...)
func(...) error
func(...) NonSupportedType
Valid types of return values are the same as the valid types of arguments, and they are listed in the following subsection.
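To make the two accepted forms concrete, the sketch below checks a function's return values with the standard reflect package. It is an assumption about how such a check could be written, not the code ConvertGeneric uses; hasValidReturnForm is a hypothetical helper, and it only looks at the number of return values and the position of error, not at whether R is a supported type.

```go
package main

import (
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// hasValidReturnForm reports whether f is a function returning either R or
// (R, error), where R itself is not the error type.
func hasValidReturnForm(f interface{}) bool {
	t := reflect.TypeOf(f)
	if t == nil || t.Kind() != reflect.Func {
		return false
	}
	switch t.NumOut() {
	case 1:
		return t.Out(0) != errorType
	case 2:
		return t.Out(0) != errorType && t.Out(1) == errorType
	default:
		return false
	}
}

func main() {
	fmt.Println(hasValidReturnForm(func(v int) int { return v + 1 }))
	fmt.Println(hasValidReturnForm(func() error { return nil }))
}
```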
Valid Value Types
The list of Go types that can be used for arguments and the return value is as follows:
- bool
- signed integers: int, int8, int16, int32, int64
- unsigned integers: uint, uint8, uint16, uint32, uint64
- float32, float64
- string
- time.Time
- data: data.Bool, data.Int, data.Float, data.String, data.Blob, data.Timestamp, data.Array, data.Map, data.Value
- A slice of any type above, including data.Value
data.Value can be used as a semi-variant type, which will receive all types above.
When the argument type and the actual value type are different, weak type conversions are applied to values. Conversions are basically done by data.ToXXX functions (see godoc comments of each function in data/type_conversions.go). For example, func inc(i int) int can be called by inc("3") in a BQL statement and it will return 4. If strict type checking or custom type conversion is required, receive values as data.Value and manually check or convert types, or define the UDF in the regular way.
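The difference between weak conversion and strict checking can be sketched with stand-in code. toIntWeak and asIntStrict below are hypothetical simplifications written for this illustration; the exact rules of the real data.ToXXX functions are documented in their godoc comments and may differ (for example in how floats are handled, which this stand-in simply truncates).

```go
package main

import (
	"fmt"
	"strconv"
)

// Value is a hypothetical stand-in for data.Value.
type Value interface{}

// toIntWeak is a simplified analogue of a weak conversion: it accepts several
// value types and converts them to an integer where that is possible.
func toIntWeak(v Value) (int64, error) {
	switch x := v.(type) {
	case int64:
		return x, nil
	case float64:
		return int64(x), nil // truncates in this stand-in
	case string:
		return strconv.ParseInt(x, 10, 64)
	default:
		return 0, fmt.Errorf("cannot convert %T to an integer", v)
	}
}

// asIntStrict accepts integers only and rejects everything else.
func asIntStrict(v Value) (int64, error) {
	x, ok := v.(int64)
	if !ok {
		return 0, fmt.Errorf("%T is not an integer", v)
	}
	return x, nil
}

func main() {
	n, err := toIntWeak("3")
	fmt.Println(n+1, err) // weak: the string "3" is usable as a number
	_, err = asIntStrict("3")
	fmt.Println(err) // strict: the same value is rejected
}
```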
Examples of Valid Go Functions
The following functions can be converted to UDFs by the ConvertGeneric or MustConvertGeneric function:
func rand() int
func pow(*core.Context, float32, float32) (float32, error)
func join(*core.Context, ...string) string
func format(string, ...data.Value) (string, error)
func keys(data.Map) []string
3.2.2.3. Complete Examples
This subsection shows three example UDFs:
- my_inc
- my_join
- my_join2
Assume that these are in the repository github.com/sensorbee/examples/udfs (which actually does not exist). The repository has three files:
- inc.go
- join.go
- plugin/plugin.go
inc.go
In inc.go, the Inc function is defined as a pure Go function with a standard value type:
package udfs
func Inc(v int) int {
return v + 1
}
join.go
In join.go, the Join UDF is defined in the regular way by implementing the UDF interface directly, and it performs strict type checking. It is designed to be called in one of two forms: my_join("a", "b", "c", "separator") or my_join(["a", "b", "c"], "separator"). Each argument and value in the array must be a string. The UDF receives an arbitrary number of arguments.
package udfs
import (
"errors"
"strings"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Join struct {
}
func (j *Join) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
empty := data.String("")
if len(args) == 1 {
return empty, nil
}
switch args[0].Type() {
case data.TypeString: // my_join("a", "b", "c", "sep") form
var ss []string
for _, v := range args {
s, err := data.AsString(v)
if err != nil {
return empty, err
}
ss = append(ss, s)
}
return data.String(strings.Join(ss[:len(ss)-1], ss[len(ss)-1])), nil
case data.TypeArray: // my_join(["a", "b", "c"], "sep") form
if len(args) != 2 {
return empty, errors.New("wrong number of arguments for my_join(array, sep)")
}
sep, err := data.AsString(args[1])
if err != nil {
return empty, err
}
a, _ := data.AsArray(args[0])
var ss []string
for _, v := range a {
s, err := data.AsString(v)
if err != nil {
return empty, err
}
ss = append(ss, s)
}
return data.String(strings.Join(ss, sep)), nil
default:
return empty, errors.New("the first argument must be a string or an array")
}
}
func (j *Join) Accept(arity int) bool {
return arity >= 1
}
func (j *Join) IsAggregationParameter(k int) bool {
return false
}
plugin/plugin.go
In addition to Inc and Join, this file registers the standard Go function strings.Join as my_join2. Because it’s converted to a UDF by udf.MustConvertGeneric, arguments are weakly converted to given types. For example, my_join2([1, 2.3, "4"], "-") is valid although strings.Join itself is func([]string, string) string.
package plugin
import (
"strings"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/sensorbee/examples/udfs"
)
func init() {
udf.MustRegisterGlobalUDF("my_inc", udf.MustConvertGeneric(udfs.Inc))
udf.MustRegisterGlobalUDF("my_join", &udfs.Join{})
udf.MustRegisterGlobalUDF("my_join2", udf.MustConvertGeneric(strings.Join))
}
Evaluating Examples
Once the sensorbee command is built with those UDFs and a topology is created on the server, the EVAL statement can be used to test them:
EVAL my_inc(1); -- => 2
EVAL my_inc(1.5); -- => 2
EVAL my_inc("10"); -- => 11
EVAL my_join("a", "b", "c", "-"); -- => "a-b-c"
EVAL my_join(["a", "b", "c"], ","); -- => "a,b,c"
EVAL my_join(1, "b", "c", "-"); -- => error
EVAL my_join([1, "b", "c"], ","); -- => error
EVAL my_join2(["a", "b", "c"], ","); -- => "a,b,c"
EVAL my_join2([1, "b", "c"], ","); -- => "1,b,c"
3.2.2.4. Dynamic Loading
Dynamic loading of UDFs written in Go is not supported at the moment because Go does not support loading packages dynamically.
3.2.3. User-Defined Stream-Generating Functions
This section describes how to write a UDSF in Go.
3.2.3.1. Implementing a UDSF
To provide a UDSF, two interfaces need to be implemented: UDSF and UDSFCreator.
The interface UDSF is defined as follows in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package.
type UDSF interface {
Process(ctx *core.Context, t *core.Tuple, w core.Writer) error
Terminate(ctx *core.Context) error
}
The Process method processes an input tuple and emits computed tuples for subsequent streams. ctx contains the processing context information. t is the tuple to be processed in the UDSF. w is the destination to which computed tuples are emitted. The Terminate method is called when the UDSF becomes unnecessary. The method has to release all the resources the UDSF has.
How the Process method is called depends on the type of a UDSF. When a UDSF is a stream-like UDSF (i.e. it has input from other streams), the Process method is called every time a new tuple arrives. The argument t contains the tuple emitted from another stream. Stream-like UDSFs have to return from Process immediately after processing the input tuple. They must not block in the method.
A stream-like UDSF is used mostly when multiple tuples need to be computed and emitted based on one input tuple:
type WordSplitter struct {
field string
}
func (w *WordSplitter) Process(ctx *core.Context, t *core.Tuple, writer core.Writer) error {
var kwd []string
if v, ok := t.Data[w.field]; !ok {
return fmt.Errorf("the tuple doesn't have the required field: %v", w.field)
} else if s, err := data.AsString(v); err != nil {
return fmt.Errorf("'%v' field must be string: %v", w.field, err)
} else {
kwd = strings.Split(s, " ")
}
for _, k := range kwd {
out := t.Copy()
out.Data[w.field] = data.String(k)
if err := writer.Write(ctx, out); err != nil {
return err
}
}
return nil
}
func (w *WordSplitter) Terminate(ctx *core.Context) error {
return nil
}
WordSplitter splits text in a specific field by space. For example, when an input tuple is {"word": "a b c"} and WordSplitter.field is word, the following three tuples will be emitted: {"word": "a"}, {"word": "b"}, and {"word": "c"}.
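The one-input, many-outputs behavior can be tried without a running server. In the sketch below a tuple is modeled as a plain map[string]string and splitField is a hypothetical function written for this illustration; neither is a SensorBee type or API.

```go
package main

import (
	"fmt"
	"strings"
)

// splitField is a SensorBee-free sketch of what WordSplitter.Process does:
// it returns one copy of the tuple per word found in the given field.
func splitField(tuple map[string]string, field string) ([]map[string]string, error) {
	text, ok := tuple[field]
	if !ok {
		return nil, fmt.Errorf("the tuple doesn't have the required field: %v", field)
	}
	var out []map[string]string
	for _, word := range strings.Split(text, " ") {
		// Copy the tuple so that emitted tuples do not share state.
		c := make(map[string]string, len(tuple))
		for k, v := range tuple {
			c[k] = v
		}
		c[field] = word
		out = append(out, c)
	}
	return out, nil
}

func main() {
	tuples, err := splitField(map[string]string{"word": "a b c", "id": "7"}, "word")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, t := range tuples {
		fmt.Println(t["id"], t["word"])
	}
}
```

Note that strings.Split yields empty strings for consecutive spaces; strings.Fields is an alternative when runs of whitespace should be treated as a single separator.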
When a UDSF is a source-like UDSF, the Process method is only called once with a tuple that does not mean anything. Unlike a stream-like UDSF, the Process method of a source-like UDSF does not have to return until it has emitted all tuples, the Terminate method is called, or a fatal error occurs.
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
var i int64
for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
newTuple := core.NewTuple(data.Map{"tick": data.Int(i)})
if err := w.Write(ctx, newTuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Terminate(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
In this example, Ticker emits tuples having a tick field containing a counter until the Terminate method is called.
Whether a UDSF is stream-like or source-like can be configured when it is created by UDSFCreator. The interface UDSFCreator is defined as follows in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package:
type UDSFCreator interface {
CreateUDSF(ctx *core.Context, decl UDSFDeclarer, args ...data.Value) (UDSF, error)
Accept(arity int) bool
}
The CreateUDSF method creates a new instance of a UDSF. The method is called when evaluating a UDSF in the FROM clause of a SELECT statement. ctx contains the processing context information. decl is used to customize the behavior of the UDSF, which is explained later. args has arguments passed in the SELECT statement. The Accept method verifies if the UDSF accepts the specific number of arguments. This is the same as the UDF.Accept method (see User-Defined Functions).
UDSFDeclarer is used in the CreateUDSF method to customize the behavior of a UDSF:
type UDSFDeclarer interface {
Input(name string, config *UDSFInputConfig) error
ListInputs() map[string]*UDSFInputConfig
}
By calling its Input method, a UDSF will be able to receive tuples from another stream with the name name. Because the name is given outside the UDSF, it’s uncontrollable from the UDSF. However, there are cases where a UDSF wants to know from which stream a tuple has come. For example, when providing a UDSF performing a JOIN of two streams, a UDSF needs to distinguish which stream emitted the tuple. If the UDSF was defined as my_join(left_stream, right_stream), decl can be used as follows in UDSFCreator.CreateUDSF:
decl.Input(args[0], &UDSFInputConfig{InputName: "left"})
decl.Input(args[1], &UDSFInputConfig{InputName: "right"})
By configuring the input stream in this way, a tuple passed to UDSF.Process has the given name in its Tuple.InputName field:
func (m *MyJoin) Process(ctx *core.Context, t *core.Tuple, w core.Writer) error {
switch t.InputName {
case "left":
... process tuples from left_stream ...
case "right":
... process tuples from right_stream ...
}
...
}
If a UDSF is configured to have one or more input streams by decl.Input in the UDSFCreator.CreateUDSF method, the UDSF is processed as a stream-like UDSF. Otherwise, if a UDSF doesn’t have any input (i.e. decl.Input is not called), the UDSF becomes a source-like UDSF.
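The rule can be summarized in a few lines of stand-in code. The declarer type below is a hypothetical simplification of udf.UDSFDeclarer that only records input names, and declareSplitter and declareTicker merely play the role of creator functions.

```go
package main

import (
	"errors"
	"fmt"
)

// declarer is a hypothetical stand-in for udf.UDSFDeclarer that only records
// the names of declared input streams.
type declarer struct {
	inputs []string
}

// Input records one input stream, loosely mirroring UDSFDeclarer.Input.
func (d *declarer) Input(name string) error {
	if name == "" {
		return errors.New("input stream name must not be empty")
	}
	d.inputs = append(d.inputs, name)
	return nil
}

// kind applies the rule from the text: at least one input means stream-like.
func (d *declarer) kind() string {
	if len(d.inputs) > 0 {
		return "stream-like"
	}
	return "source-like"
}

// declareSplitter plays the role of a creator that declares one input.
func declareSplitter(d *declarer, input string) error {
	return d.Input(input)
}

// declareTicker plays the role of a creator that declares no input.
func declareTicker(d *declarer) error {
	return nil
}

func main() {
	splitter, ticker := &declarer{}, &declarer{}
	if err := declareSplitter(splitter, "lorem"); err != nil {
		fmt.Println("error:", err)
		return
	}
	if err := declareTicker(ticker); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(splitter.kind(), ticker.kind())
}
```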
As an example, the UDSFCreator of WordSplitter is shown below:
type WordSplitterCreator struct {
}
func (w *WordSplitterCreator) CreateUDSF(ctx *core.Context,
decl udf.UDSFDeclarer, args ...data.Value) (udf.UDSF, error) {
input, err := data.AsString(args[0])
if err != nil {
return nil, fmt.Errorf("input stream name must be a string: %v", args[0])
}
field, err := data.AsString(args[1])
if err != nil {
return nil, fmt.Errorf("target field name must be a string: %v", args[1])
}
// This Input call makes the UDSF a stream-like UDSF.
if err := decl.Input(input, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
func (w *WordSplitterCreator) Accept(arity int) bool {
return arity == 2
}
Although the UDSF has not been registered to the SensorBee server yet, it could be called as word_splitter(input_stream_name, target_field_name) if it was registered with the name word_splitter.
As another example, the UDSFCreator of Ticker is shown below:
type TickerCreator struct {
}
func (t *TickerCreator) CreateUDSF(ctx *core.Context,
decl udf.UDSFDeclarer, args ...data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(args[0])
if err != nil {
return nil, err
}
// Since this is a source-like UDSF, there's no input.
return &Ticker{
interval: interval,
}, nil
}
func (t *TickerCreator) Accept(arity int) bool {
return arity == 1
}
Like word_splitter, its signature could be ticker(interval) if the UDSF is registered as ticker.
The implementation of this UDSF is completed and the next step is to register it to the SensorBee server.
3.2.3.2. Registering a UDSF
A UDSF can be used in BQL by registering its UDSFCreator interface to the SensorBee server using the RegisterGlobalUDSFCreator or MustRegisterGlobalUDSFCreator functions, which are defined in gopkg.in/sensorbee/sensorbee.v0/bql/udf.
The following example registers WordSplitter and Ticker:
func init() {
// The Must variants panic on failure instead of silently dropping the error.
udf.MustRegisterGlobalUDSFCreator("word_splitter", &WordSplitterCreator{})
udf.MustRegisterGlobalUDSFCreator("ticker", &TickerCreator{})
}
3.2.3.3. Generic UDSFs
Just like UDFs have a ConvertGeneric function, UDSFs also have the ConvertToUDSFCreator and MustConvertToUDSFCreator functions. They convert a regular function satisfying some restrictions to the UDSFCreator interface.
The restrictions are the same as for generic UDFs except that a function converted to the UDSFCreator interface has an additional argument UDSFDeclarer. UDSFDeclarer is located after *core.Context and before other arguments. Examples of valid function signatures are shown below:
func(*core.Context, UDSFDeclarer, int)
func(UDSFDeclarer, string)
func(UDSFDeclarer)
func(*core.Context, UDSFDeclarer, ...data.Value)
func(UDSFDeclarer, ...float64)
func(*core.Context, UDSFDeclarer, int, ...string)
func(UDSFDeclarer, int, float64, ...time.Time)
Unlike *core.Context, UDSFDeclarer cannot be omitted. The same set of types can be used for arguments as types that the ConvertGeneric function accepts.
WordSplitterCreator can be rewritten with the ConvertToUDSFCreator function as follows:
func CreateWordSplitter(decl udf.UDSFDeclarer,
inputStream, field string) (udf.UDSF, error) {
if err := decl.Input(inputStream, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
func init() {
// Pass the function defined above, not the WordSplitterCreator struct type.
udf.MustRegisterGlobalUDSFCreator("word_splitter",
udf.MustConvertToUDSFCreator(CreateWordSplitter))
}
TickerCreator can be replaced with ConvertToUDSFCreator, too:
func CreateTicker(decl udf.UDSFDeclarer, i data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(i)
if err != nil {
return nil, err
}
return &Ticker{
interval: interval,
}, nil
}
func init() {
// CreateTicker is defined in the same package here, so no package qualifier is needed.
udf.MustRegisterGlobalUDSFCreator("ticker",
udf.MustConvertToUDSFCreator(CreateTicker))
}
3.2.3.4. A Complete Example
This subsection provides a complete example of UDSFs described in this section. In addition to word_splitter and ticker, the example also includes the lorem source, which periodically emits random texts as {"text": "lorem ipsum dolor sit amet"}.
Assume that the import path of the example repository is github.com/sensorbee/examples/udsfs, which doesn’t actually exist. The repository has four files:
- lorem.go
- splitter.go
- ticker.go
- plugin/plugin.go
lorem.go
To learn how to implement a source plugin, see Source Plugins.
package udsfs
import (
"math/rand"
"strings"
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
var (
Lorem = strings.Split(strings.Replace(`lorem ipsum dolor sit amet
consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore
magna aliqua Ut enim ad minim veniam quis nostrud exercitation ullamco laboris
nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in reprehenderit
in voluptate velit esse cillum dolore eu fugiat nulla pariatur Excepteur sint
occaecat cupidatat non proident sunt in culpa qui officia deserunt mollit anim
id est laborum`, "\n", " ", -1), " ")
)
type LoremSource struct {
interval time.Duration
}
func (l *LoremSource) GenerateStream(ctx *core.Context, w core.Writer) error {
for {
var text []string
for n := rand.Intn(5) + 5; n > 0; n-- { // n avoids shadowing the receiver l
text = append(text, Lorem[rand.Intn(len(Lorem))])
}
t := core.NewTuple(data.Map{
"text": data.String(strings.Join(text, " ")),
})
if err := w.Write(ctx, t); err != nil {
return err
}
time.Sleep(l.interval)
}
}
func (l *LoremSource) Stop(ctx *core.Context) error {
return nil
}
func CreateLoremSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&LoremSource{
interval: interval,
}), nil
}
splitter.go
package udsfs
import (
"fmt"
"strings"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type WordSplitter struct {
field string
}
func (w *WordSplitter) Process(ctx *core.Context,
t *core.Tuple, writer core.Writer) error {
var kwd []string
if v, ok := t.Data[w.field]; !ok {
return fmt.Errorf("the tuple doesn't have the required field: %v", w.field)
} else if s, err := data.AsString(v); err != nil {
return fmt.Errorf("'%v' field must be string: %v", w.field, err)
} else {
kwd = strings.Split(s, " ")
}
for _, k := range kwd {
out := t.Copy()
out.Data[w.field] = data.String(k)
if err := writer.Write(ctx, out); err != nil {
return err
}
}
return nil
}
func (w *WordSplitter) Terminate(ctx *core.Context) error {
return nil
}
func CreateWordSplitter(decl udf.UDSFDeclarer,
inputStream, field string) (udf.UDSF, error) {
if err := decl.Input(inputStream, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
ticker.go
package udsfs
import (
"sync/atomic"
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
var i int64
for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
newTuple := core.NewTuple(data.Map{"tick": data.Int(i)})
if err := w.Write(ctx, newTuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Terminate(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
func CreateTicker(decl udf.UDSFDeclarer, i data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(i)
if err != nil {
return nil, err
}
return &Ticker{
interval: interval,
}, nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/sensorbee/examples/udsfs"
)
func init() {
bql.MustRegisterGlobalSourceCreator("lorem",
bql.SourceCreatorFunc(udsfs.CreateLoremSource))
udf.MustRegisterGlobalUDSFCreator("word_splitter",
udf.MustConvertToUDSFCreator(udsfs.CreateWordSplitter))
udf.MustRegisterGlobalUDSFCreator("ticker",
udf.MustConvertToUDSFCreator(udsfs.CreateTicker))
}
Example BQL Statements¶
CREATE SOURCE lorem TYPE lorem;
CREATE STREAM lorem_words AS
SELECT RSTREAM * FROM word_splitter("lorem", "text") [RANGE 1 TUPLES];
Results of word_splitter
can be received by the following SELECT
:
SELECT RSTREAM * FROM lorem_words [RANGE 1 TUPLES];
3.2.4. User-Defined States¶
This section describes how to write a UDS in Go.
3.2.4.1. Implementing a UDS¶
A struct implementing the following interface can be used as a UDS:
type SharedState interface {
Terminate(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core
package.
The Terminate method is called when the UDS is no longer in use. It should
release any resources that the UDS has allocated so far.
As an example, a UDS having a monotonically increasing counter can be implemented as follows:
type Counter struct {
c int64
}
func (c *Counter) Terminate(ctx *core.Context) error {
return nil
}
func (c *Counter) Next() int64 {
return atomic.AddInt64(&c.c, 1)
}
At the moment, there’s no way to manipulate the UDS from BQL statements. UDSs are usually provided with a set of UDFs that read or update the UDS. It’ll be described later in Manipulating a UDS via a UDF. Before looking into the UDS manipulation, registering and creating a UDS needs to be explained.
3.2.4.2. Registering a UDS¶
To register a UDS to the SensorBee server, the UDS needs to provide its
UDSCreator
. UDSCreator
is an interface defined in
gopkg.in/sensorbee/sensorbee.v0/bql/udf
package as follows:
type UDSCreator interface {
CreateState(ctx *core.Context, params data.Map) (core.SharedState, error)
}
UDSCreator.CreateState
method is called when executing a CREATE STATE
statement. The method creates a new instance of the UDS and initializes it with
the given parameters. The argument ctx
has the processing context
information and params
has parameters specified in the WITH
clause of
the CREATE STATE
.
The creator can be registered by RegisterGlobalUDSCreator
or
MustRegisterGlobalUDSCreator
function defined in
gopkg.in/sensorbee/sensorbee.v0/bql/udf
package.
The following is the implementation and the registration of the creator for
Counter
UDS above:
type CounterCreator struct {
}
func (c *CounterCreator) CreateState(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter", &CounterCreator{})
}
The creator in this example is registered with the UDS type name my_counter
.
The creator supports start
parameter which is used as the first value that
Counter.Next
returns. The parameter can be specified in the CREATE STATE
as follows:
CREATE STATE my_counter_instance TYPE my_counter WITH start = 100;
Because the creator creates a new instance every time the CREATE STATE
is
executed, there can be multiple instances of a specific UDS type:
CREATE STATE my_counter_instance1 TYPE my_counter;
CREATE STATE my_counter_instance2 TYPE my_counter;
CREATE STATE my_counter_instance3 TYPE my_counter;
...
Once an instance of the UDS is created by the CREATE STATE statement, UDFs can
refer to it and manipulate its state.
udf.UDSCreatorFunc¶
A function having the same signature as UDSCreator.CreateState
can be
converted into UDSCreator
by the udf.UDSCreatorFunc
utility function:
func UDSCreatorFunc(f func(*core.Context, data.Map) (core.SharedState, error)) UDSCreator
For example, CounterCreator
can be defined as a function and registered as
follows with this utility:
func CreateCounter(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter",
udf.UDSCreatorFunc(CreateCounter))
}
To support SAVE STATE
and LOAD STATE
statements, however, this utility
function cannot be used because the creator needs to have the LoadState
method. How to support saving and loading is described later.
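The utility follows a common Go idiom in which a named function type implements an interface by calling itself. The following is a minimal, standard-library-only sketch of that idiom. State, Creator, and CreatorFunc are hypothetical stand-ins introduced for this illustration; they are not the definitions in the udf package, and the context argument is omitted:

```go
package main

import "fmt"

// State and Creator are hypothetical stand-ins for core.SharedState and
// udf.UDSCreator. They only keep this sketch self-contained.
type State interface {
	Terminate() error
}

type Creator interface {
	CreateState(params map[string]int64) (State, error)
}

// CreatorFunc adapts a plain function to the Creator interface.
type CreatorFunc func(params map[string]int64) (State, error)

// CreateState simply calls the underlying function.
func (f CreatorFunc) CreateState(params map[string]int64) (State, error) {
	return f(params)
}

type counter struct{ c int64 }

func (c *counter) Terminate() error { return nil }

func createCounter(params map[string]int64) (State, error) {
	cnt := &counter{}
	if v, ok := params["start"]; ok {
		cnt.c = v - 1
	}
	return cnt, nil
}

// firstValue creates a counter through the Creator interface and returns the
// first value that the counter would hand out.
func firstValue(c Creator, start int64) (int64, error) {
	s, err := c.CreateState(map[string]int64{"start": start})
	if err != nil {
		return 0, err
	}
	return s.(*counter).c + 1, nil
}

func main() {
	// CreatorFunc(createCounter) is a type conversion, not a function call.
	var c Creator = CreatorFunc(createCounter)
	v, err := firstValue(c, 100)
	fmt.Println(v, err)
}
```

Because the adapter has exactly one method, it cannot carry an additional LoadState method, which is the limitation mentioned above.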
3.2.4.3. Manipulating a UDS via a UDF¶
To manipulate a UDS from BQL statements, a set of UDFs that read or update the UDS has to be provided with it:
func Next(ctx *core.Context, uds string) (int64, error) {
s, err := ctx.SharedStates.Get(uds)
if err != nil {
return 0, err
}
c, ok := s.(*Counter)
if !ok {
return 0, fmt.Errorf("the state isn't a counter: %v", uds)
}
return c.Next(), nil
}
func init() {
udf.MustRegisterGlobalUDF("my_next_count", udf.MustConvertGeneric(Next))
}
In this example, a UDF my_next_count
is registered to the SensorBee server.
The UDF calls Counter.Next
method to obtain the next count and returns it.
The UDF receives one argument uds
that is the name of the UDS to be updated.
CREATE STATE my_counter_instance TYPE my_counter;
CREATE STREAM events_with_id AS
SELECT RSTREAM my_next_count("my_counter_instance") AS id, *
FROM events [RANGE 1 TUPLES];
The BQL statements above add IDs to tuples emitted from a stream events
. The
state my_counter_instance
is created with the type my_counter
. Then,
my_next_count
UDF is called with the name. Every time the UDF is called,
the state of my_counter_instance
is updated by its Next
method.
my_next_count
(i.e. Next
function in Go) can look up the instance of
the UDS by its name through core.Context.SharedStates
. SharedStates
manages all the UDSs created in a topology. SharedStates.Get returns the
returns the
instance of the UDS having the given name. It returns an error if it couldn’t
find the instance. In the example above, my_next_count("my_counter_instance")
will look up an instance of the UDS having the name my_counter_instance
,
which was previously created by the CREATE STATE
. The UDS returned from
Get
method has the type core.SharedState
and cannot directly be used as
Counter
. Therefore, it has to be converted to *Counter with a type assertion
.
Since the state can be any type satisfying core.SharedState
, a UDS can
potentially have any information such as machine learning models,
dictionaries for natural language processing, or even an in-memory database.
Note
As UDFs are concurrently called from multiple goroutines, UDSs also need to be thread-safe.
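Atomic operations are enough for a single integer like the counter above, but a state holding a map or several related fields usually needs a lock. The following sketch shows one possible approach with sync.RWMutex. Dictionary and its methods are hypothetical names for this illustration, and the Terminate method that a real UDS requires is left out to keep the example free of SensorBee imports:

```go
package main

import (
	"fmt"
	"sync"
)

// Dictionary is a hypothetical state mapping words to counts. A real UDS
// would also implement Terminate(ctx *core.Context) error.
type Dictionary struct {
	mu     sync.RWMutex
	counts map[string]int64
}

func NewDictionary() *Dictionary {
	return &Dictionary{counts: map[string]int64{}}
}

// Add increments the count of a word. It takes the write lock because it
// modifies the map.
func (d *Dictionary) Add(word string) int64 {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.counts[word]++
	return d.counts[word]
}

// Count only reads the map, so several callers may hold the read lock at once.
func (d *Dictionary) Count(word string) int64 {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.counts[word]
}

// addConcurrently calls Add from several goroutines, as UDFs running in
// different streams might do, and returns the final count.
func addConcurrently(d *Dictionary, word string, goroutines, perGoroutine int) int64 {
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perGoroutine; i++ {
				d.Add(word)
			}
		}()
	}
	wg.Wait()
	return d.Count(word)
}

func main() {
	fmt.Println(addConcurrently(NewDictionary(), "lorem", 8, 1000))
}
```

Without the lock, concurrent map writes would be a data race and the Go runtime may abort the program.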
3.2.4.4. Saving and Loading a UDS¶
Counter
implemented so far doesn’t support saving and loading its state.
Thus, its count will be reset every time the server restarts. To save the
state and load it later on, the UDS and its creator need to provide some
methods. After providing those methods, the state can be saved by the
SAVE STATE
statement and loaded by LOAD STATE
statement.
Supporting SAVE STATE¶
By adding Save
method having the following signature to a UDS, the UDS can
be saved by the SAVE STATE
statement:
Save(ctx *core.Context, w io.Writer, params data.Map) error
Save
method writes all the data that the state has to w io.Writer
.
The data can be written in any format as long as corresponding loading methods
can reconstruct the state from it. It can be in JSON, msgpack, Protocol Buffers,
and so on.
Warning
Providing forward/backward compatibility or version controlling of the saved data is the responsibility of the author of the UDS.
*core.Context
has the processing context information. params
argument
is not used at the moment and is reserved for future use.
Once Save method is provided, the UDS can be saved by SAVE STATE
statement:
SAVE STATE my_counter_instance;
The SAVE STATE
doesn’t take any parameters now. The location and the
physical format of the saved UDS data depend on the configuration of the
SensorBee server or program running BQL statements. However, it is guaranteed
that the saved data can be loaded by the same program via the LOAD STATE
statement, which is described later.
Save
method of previously implemented Counter
can be as follows:
func (c *Counter) Save(ctx *core.Context, w io.Writer, params data.Map) error {
return binary.Write(w, binary.LittleEndian, atomic.LoadInt64(&c.c))
}
Note
Because this counter is very simple, there’s no version controlling logic in the method. As the minimum solution, having a version number at the beginning of the data is sufficient for most cases.
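As a rough illustration of the version-number approach, the sketch below writes a one-byte version header before the counter value and rejects unknown versions when loading. The functions save and load, the constant formatVersion, and the one-byte layout are assumptions made for this example; they are free functions rather than the Save and LoadState methods so that the example only depends on the standard library:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// formatVersion is a hypothetical version number of the saved layout.
const formatVersion uint8 = 1

// save writes a one-byte version header followed by the counter value.
func save(w io.Writer, count int64) error {
	if err := binary.Write(w, binary.LittleEndian, formatVersion); err != nil {
		return err
	}
	return binary.Write(w, binary.LittleEndian, count)
}

// load checks the version header before decoding the counter value.
func load(r io.Reader) (int64, error) {
	var ver uint8
	if err := binary.Read(r, binary.LittleEndian, &ver); err != nil {
		return 0, err
	}
	if ver != formatVersion {
		return 0, fmt.Errorf("unsupported format version: %d", ver)
	}
	var count int64
	if err := binary.Read(r, binary.LittleEndian, &count); err != nil {
		return 0, err
	}
	return count, nil
}

// roundTrip saves a value into memory and loads it back.
func roundTrip(count int64) (int64, error) {
	var buf bytes.Buffer
	if err := save(&buf, count); err != nil {
		return 0, err
	}
	return load(&buf)
}

// loadWithVersion builds data with an arbitrary header to exercise the check.
func loadWithVersion(ver uint8, count int64) (int64, error) {
	var buf bytes.Buffer
	buf.WriteByte(ver)
	if err := binary.Write(&buf, binary.LittleEndian, count); err != nil {
		return 0, err
	}
	return load(&buf)
}

func main() {
	fmt.Println(roundTrip(42))
	fmt.Println(loadWithVersion(2, 42))
}
```

When the layout changes later, load can switch on the version value and keep a decoder for each older version.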
Supporting LOAD STATE¶
To support the LOAD STATE
statement, a UDSCreator
needs to have
LoadState
method having the following signature:
LoadState(ctx *core.Context, r io.Reader, params data.Map) (core.SharedState, error)
Note
LoadState
method needs to be defined in a UDSCreator
, not in the
UDS itself.
LoadState
method reads data from r io.Reader
. The data has exactly the
same format as the one previously written by Save
method of a UDS.
params
has parameters specified in the SET
clause in the LOAD STATE
statement.
Note
Parameters specified in the SET
clause don’t have to be the same as the ones
given in the WITH
clause of the CREATE STATE
statement. See
LOAD STATE for details.
When the LoadState method returns an error, the LOAD STATE statement with
CREATE IF NOT SAVED doesn’t fall back to CREATE STATE; it simply fails.
Once LoadState
method is added to the UDSCreator
, the saved state can be
loaded by LOAD STATE
statement.
LoadState
method of previously implemented CounterCreator
can be as
follows:
func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if err := binary.Read(r, binary.LittleEndian, &cnt.c); err != nil {
return nil, err
}
return cnt, nil
}
Providing Load method in a UDS¶
In addition to implementing the LoadState method in a UDS’s creator, a UDS
itself can provide a Load method. While the LoadState method creates a new
state instance and replaces the previous instance with it, the Load method
modifies the existing instance in place. Therefore, the Load method can
potentially be more efficient than the LoadState method, although it has to
provide appropriate failure handling and concurrency control so that (1) the
UDS doesn’t become invalid on failure (i.e. the Load method is “exception
safe”) or by concurrent calls, and (2) other operations on the UDS don’t block
for a long time.
The signature of Load
method is almost the same as LoadState
method
except that Load
method doesn’t return a new core.SharedState
but
updates the UDS itself instead:
Load(ctx *core.Context, r io.Reader, params data.Map) error
Load
method of previously implemented Counter
can be as follows:
func (c *Counter) Load(ctx *core.Context, r io.Reader, params data.Map) error {
var cnt int64
if err := binary.Read(r, binary.LittleEndian, &cnt); err != nil {
return err
}
atomic.StoreInt64(&c.c, cnt)
return nil
}
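For a state with more than one field, the same idea can be kept exception safe by decoding into local variables first and swapping the fields in under a lock only after decoding succeeded. The sketch below assumes a hypothetical Stats state and a simplified Load signature without the context and parameter arguments:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"sync"
)

// Stats is a hypothetical state with two fields that must stay consistent.
type Stats struct {
	mu    sync.Mutex
	count int64
	sum   int64
}

// Load decodes everything into a local value first. The lock is taken only
// for the final assignment, so a decoding error leaves the state untouched
// and other operations are blocked only briefly.
func (s *Stats) Load(r io.Reader) error {
	var loaded struct{ Count, Sum int64 }
	if err := binary.Read(r, binary.LittleEndian, &loaded); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.count, s.sum = loaded.Count, loaded.Sum
	return nil
}

// Snapshot returns both fields under the lock.
func (s *Stats) Snapshot() (count, sum int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.count, s.sum
}

// loadBytes is a helper for the demonstration below.
func loadBytes(s *Stats, b []byte) error {
	return s.Load(bytes.NewReader(b))
}

func main() {
	s := &Stats{count: 1, sum: 10}
	// Truncated input: only 8 of the expected 16 bytes are available.
	err := loadBytes(s, make([]byte, 8))
	c, sum := s.Snapshot()
	fmt.Println(err != nil, c, sum)
}
```

Reading from r can be slow, which is why the sketch performs it before taking the lock.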
How Loading is Processed¶
SensorBee uses these two loading methods, LoadState and Load, according to the following rules:
- When a UDS’s creator doesn’t provide the LoadState method, the LOAD STATE statement fails.
  - The LOAD STATE statement fails even if the UDS implements its Load method. To support the statement, the LoadState method is always required in its creator. This is because the Load method only works when an instance of the UDS is already created or loaded, and it cannot be used for a nonexistent instance.
  - The LOAD STATE CREATE IF NOT SAVED statement also fails if the LoadState method isn’t provided. The statement calls the CreateState method when the state hasn’t previously been saved. Otherwise, it’ll try to load the saved data. Therefore, if the data is previously saved and an instance of the UDS hasn’t been created yet, the statement cannot create a new instance without the LoadState method in the creator. To be consistent on various conditions, the LOAD STATE CREATE IF NOT SAVED statement fails if the LoadState method isn’t provided regardless of whether the state has been saved before or not.
- When a UDS’s creator provides the LoadState method and the UDS doesn’t provide the Load method, the LOAD STATE statement tries to load the saved data through the LoadState method.
  - It creates a new instance, so it consumes twice as much memory.
- When a UDS’s creator provides the LoadState method and the UDS also provides the Load method,
  - The Load method will be used when the instance has already been created or loaded. The LoadState method wouldn’t be used even if the Load method failed.
  - The LoadState method will be used otherwise.
Note
This is already mentioned in the list above, but LoadState
method always
needs to be provided even if a UDS implements Load
method.
3.2.4.5. A Complete Example¶
A complete example of the state is shown in this subsection. Assume that the
import path of the example repository is
github.com/sensorbee/examples/counter
, which doesn’t actually exist. The
repository has two files:
- counter.go
- plugin/plugin.go
counter.go¶
package counter
import (
"encoding/binary"
"fmt"
"io"
"sync/atomic"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Counter struct {
c int64
}
func (c *Counter) Terminate(ctx *core.Context) error {
return nil
}
func (c *Counter) Next() int64 {
return atomic.AddInt64(&c.c, 1)
}
func (c *Counter) Save(ctx *core.Context, w io.Writer, params data.Map) error {
return binary.Write(w, binary.LittleEndian, atomic.LoadInt64(&c.c))
}
func (c *Counter) Load(ctx *core.Context, r io.Reader, params data.Map) error {
var cnt int64
if err := binary.Read(r, binary.LittleEndian, &cnt); err != nil {
return err
}
atomic.StoreInt64(&c.c, cnt)
return nil
}
type CounterCreator struct {
}
func (c *CounterCreator) CreateState(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if err := binary.Read(r, binary.LittleEndian, &cnt.c); err != nil {
return nil, err
}
return cnt, nil
}
func Next(ctx *core.Context, uds string) (int64, error) {
s, err := ctx.SharedStates.Get(uds)
if err != nil {
return 0, err
}
c, ok := s.(*Counter)
if !ok {
return 0, fmt.Errorf("the state isn't a counter: %v", uds)
}
return c.Next(), nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/sensorbee/examples/counter"
)
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter",
&counter.CounterCreator{})
udf.MustRegisterGlobalUDF("my_next_count",
udf.MustConvertGeneric(counter.Next))
}
3.2.4.6. Writing Tuples to a UDS¶
When a UDS implements core.Writer
, the INSERT INTO
statement can
insert tuples into the UDS via the uds
sink:
type Writer interface {
Write(*Context, *Tuple) error
}
The following is an example of using the uds
sink:
CREATE STATE my_state TYPE my_state_type;
CREATE SINK my_state_sink TYPE uds WITH name = "my_state";
INSERT INTO my_state_sink FROM some_stream;
If my_state_type
doesn’t implement core.Writer
, the CREATE SINK
statement fails. Every time some_stream
emits a tuple, the Write
method of my_state
is called.
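A state that accepts tuples could look like the following sketch. Tuple is a hypothetical stand-in for core.Tuple, the context argument of the real Write method is omitted, and WordCount with its "word" field is an invented example rather than a state shipped with SensorBee:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Tuple is a hypothetical stand-in for core.Tuple.
type Tuple struct {
	Data map[string]interface{}
}

// WordCount is a state that is updated by the tuples inserted into it.
type WordCount struct {
	mu     sync.Mutex
	counts map[string]int
}

// Write is called once per inserted tuple, so it has to be thread-safe.
func (wc *WordCount) Write(t *Tuple) error {
	word, ok := t.Data["word"].(string)
	if !ok {
		return errors.New("the tuple doesn't have a string field: word")
	}
	wc.mu.Lock()
	defer wc.mu.Unlock()
	if wc.counts == nil {
		wc.counts = map[string]int{}
	}
	wc.counts[word]++
	return nil
}

// Count returns how often a word has been written. A UDF could expose it to
// BQL in the same way as the counter's Next function.
func (wc *WordCount) Count(word string) int {
	wc.mu.Lock()
	defer wc.mu.Unlock()
	return wc.counts[word]
}

// writeWords wraps each word in a tuple and writes it to the state.
func writeWords(wc *WordCount, words ...string) error {
	for _, w := range words {
		t := &Tuple{Data: map[string]interface{}{"word": w}}
		if err := wc.Write(t); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	wc := &WordCount{}
	err := writeWords(wc, "lorem", "ipsum", "lorem")
	fmt.Println(err, wc.Count("lorem"), wc.Count("ipsum"))
}
```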
Example¶
Models provided by the Jubatus machine learning plugin for SensorBee implement
the Write method. When tuples are inserted into such a UDS, it trains the model
it holds.
3.2.5. Source Plugins¶
This section describes how to implement a source as a plugin of SensorBee.
3.2.5.1. Implementing a Source¶
A struct implementing the following interface can be a source:
type Source interface {
GenerateStream(ctx *Context, w Writer) error
Stop(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core
package.
The GenerateStream method actually generates tuples for subsequent streams.
The argument ctx contains the information of the current processing context.
w is the destination to which generated tuples are emitted. The Stop
method stops GenerateStream. It should wait until the GenerateStream
method call returns, but it isn’t mandatory.
Once the GenerateStream method is called, a source can emit as many tuples
as it requires. A source basically needs to return from its
GenerateStream method when:
- it emitted all the tuples it has
- the Stop method was called
- a fatal error occurred
The Stop
method can be called concurrently while the GenerateStream
method is working and it must be thread-safe. As long as a source is used by
components defined in SensorBee, it’s guaranteed that its Stop
method is
called only once and it doesn’t have to be idempotent. However, it is
recommended that a source provide a termination check in its Stop
method
to avoid a double free problem.
A typical implementation of a source is shown below:
func (s *MySource) GenerateStream(ctx *core.Context, w core.Writer) error {
<initialization>
defer func() {
<clean up>
}()
for <check stop> {
t := <create a new tuple>
if err := w.Write(ctx, t); err != nil {
return err
}
}
return nil
}
func (s *MySource) Stop(ctx *core.Context) error {
<turn on a stop flag>
<wait until GenerateStream stops>
return nil
}
The following example source emits tuples periodically:
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
if atomic.LoadInt32(&t.stopped) != 0 {
break
}
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
The interval
field is initialized in SourceCreator
, which is described
later. This is the source version of the example in the UDSF section. This
implementation is not fully correct because the Stop
method doesn’t wait until
the GenerateStream
method actually returns. Because implementing a
thread-safe source which stops correctly is a difficult task, core
package
provides a utility function that implements a source’s Stop
method on behalf
of the source itself. See Utilities for
details.
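For comparison, a hand-written Stop method that does wait could be sketched with two channels and sync.Once as follows. Writer is a hypothetical stand-in for core.Writer without the context argument, tuples are reduced to their tick value, and the sketch assumes that GenerateStream is called exactly once per Ticker:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Writer is a hypothetical stand-in for core.Writer.
type Writer interface {
	Write(tick int64) error
}

type Ticker struct {
	interval time.Duration
	stop     chan struct{} // closed by Stop
	done     chan struct{} // closed when GenerateStream returns
	stopOnce sync.Once
}

func NewTicker(interval time.Duration) *Ticker {
	return &Ticker{
		interval: interval,
		stop:     make(chan struct{}),
		done:     make(chan struct{}),
	}
}

// GenerateStream is assumed to be called exactly once per Ticker.
func (t *Ticker) GenerateStream(w Writer) error {
	defer close(t.done)
	for cnt := int64(0); ; cnt++ {
		if err := w.Write(cnt); err != nil {
			return err
		}
		select {
		case <-t.stop:
			return nil
		case <-time.After(t.interval):
		}
	}
}

// Stop can safely be called more than once. It returns only after
// GenerateStream has returned.
func (t *Ticker) Stop() error {
	t.stopOnce.Do(func() { close(t.stop) })
	<-t.done
	return nil
}

// countingWriter counts the ticks written to it.
type countingWriter struct {
	mu sync.Mutex
	n  int
}

func (c *countingWriter) Write(tick int64) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
	return nil
}

func (c *countingWriter) Count() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// demo runs a ticker briefly, stops it twice, and returns the tick count.
func demo() int {
	t := NewTicker(time.Millisecond)
	w := &countingWriter{}
	go t.GenerateStream(w)
	time.Sleep(20 * time.Millisecond)
	t.Stop()
	t.Stop() // the second call is harmless
	return w.Count()
}

func main() {
	fmt.Println(demo() >= 1)
}
```

Selecting on the stop channel also lets the source react to Stop without waiting for a full interval, unlike a plain time.Sleep.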
3.2.5.2. Registering a Source¶
To register a source to the SensorBee server, the source needs to provide its
SourceCreator
. The SourceCreator
interface is defined in
gopkg.in/sensorbee/sensorbee.v0/bql
package as follows:
type SourceCreator interface {
CreateSource(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Source, error)
}
It only has one method: CreateSource
. The CreateSource
method is called
when the CREATE SOURCE statement is executed.
The ctx
argument contains the information of the current processing context.
ioParams
has the name and the type name of the source, which are given in
the CREATE SOURCE
statement. params
has parameters specified in the
WITH
clause of the CREATE SOURCE
statement.
The creator can be registered by RegisterGlobalSourceCreator
or
MustRegisterGlobalSourceCreator
function. As an example, the creator of
Ticker
above can be implemented and registered as follows:
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return &Ticker{
interval: interval,
}, nil
}
func init() {
bql.MustRegisterGlobalSourceCreator("ticker", &TickerCreator{})
}
In this example, the source has a parameter interval
which can be specified
in the WITH
clause of the CREATE SOURCE
statement:
CREATE SOURCE my_ticker TYPE ticker WITH interval = 0.1;
my_ticker
emits tuples that look like {"tick": 123}
every 100 ms.
Without the interval
parameter, my_ticker
will emit tuples every second by default.
3.2.5.3. Types of a Source¶
In addition to a regular source, there’re two more types of sources: a resumable source and a rewindable source. This subsection describes those sources in detail.
Resumable Sources¶
A source that supports the PAUSE SOURCE and RESUME SOURCE statements is called a resumable source.
Although all sources support them by default, which is done by the core
package, a source can explicitly implement core.Resumable
interface so that
it can provide more efficient pause and resume capability:
type Resumable interface {
Pause(ctx *Context) error
Resume(ctx *Context) error
}
The Pause
method is called when PAUSE SOURCE
statement is executed and
the Resume
method is called by RESUME SOURCE
. The Pause method may be called even when the source is already paused, and
the Resume method may be called even when the source is already running.
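Because of that, both methods should be idempotent. One possible way to achieve this is a small gate built on sync.Cond, as sketched below. Gate is a hypothetical helper for this illustration, not a type provided by the core package, and the context arguments are omitted:

```go
package main

import (
	"fmt"
	"sync"
)

// Gate is a hypothetical helper that a source could use to implement
// Pause and Resume.
type Gate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
}

func NewGate() *Gate {
	g := &Gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Pause is idempotent: pausing a paused gate changes nothing.
func (g *Gate) Pause() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.paused = true
	return nil
}

// Resume is idempotent as well and wakes up all blocked callers of Wait.
func (g *Gate) Resume() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.paused = false
	g.cond.Broadcast()
	return nil
}

// Wait blocks while the gate is paused. GenerateStream would call it before
// emitting each tuple.
func (g *Gate) Wait() {
	g.mu.Lock()
	defer g.mu.Unlock()
	for g.paused {
		g.cond.Wait()
	}
}

// Paused reports the current state of the gate.
func (g *Gate) Paused() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.paused
}

// emitAfterResume pauses the gate twice, starts a goroutine waiting on it,
// resumes it twice, and reports whether the goroutine got through.
func emitAfterResume() bool {
	g := NewGate()
	g.Pause()
	g.Pause()
	passed := make(chan struct{})
	go func() {
		g.Wait()
		close(passed)
	}()
	g.Resume()
	g.Resume()
	<-passed
	return !g.Paused()
}

func main() {
	fmt.Println(emitAfterResume())
}
```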
A source can be non-resumable by implementing these methods to return an error:
type MyNonResumableSource struct {
...
}
...
func (m *MyNonResumableSource) Pause(ctx *core.Context) error {
return errors.New("my_non_resumable_source doesn't support pause")
}
func (m *MyNonResumableSource) Resume(ctx *core.Context) error {
return errors.New("my_non_resumable_source doesn't support resume")
}
Rewindable Sources¶
A rewindable source can re-generate the same tuples again from the beginning after it emits all tuples or while it’s emitting tuples. A rewindable source supports the REWIND SOURCE statement.
A source can become rewindable by implementing the core.RewindableSource
interface:
type RewindableSource interface {
Source
Resumable
Rewind(ctx *Context) error
}
A rewindable source also needs to implement core.Resumable
to be rewindable.
Note
A rewindable source also needs to be resumable because of
the internal implementation of the default pause/resume support. While a
source is paused, it blocks core.Writer.Write
called in the
GenerateStream
method. The Rewind
method could also be blocked while
the Write
call is being blocked until the Resume
method is called.
It, of course, depends on the implementation of a source, but it’s very
error-prone. Therefore, implementing the Resumable
interface is required
to be rewindable at the moment.
Unlike a regular source, the GenerateStream
method of a rewindable source
must not return after it emits all tuples. Instead, it needs to wait until the
Rewind
method or the Stop
method is called. Once it returns, the source
is considered stopped and no further operation, including the REWIND SOURCE
statement, will work on the source.
Due to its nature, a stream isn’t often rewindable. A rewindable source is
mostly used for relatively static data sources such as relations or files.
Also, because implementing the RewindableSource
interface is even harder
than implementing the Resumable
interface, utilities are usually used.
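To give an impression of what a hand-written rewindable source involves, the following sketch emits lines from a slice and then waits for Rewind or Stop instead of returning. It is deliberately simplified: Writer is a hypothetical stand-in for core.Writer, Pause and Resume are left out, a rewind only takes effect after the current pass has finished, and Stop is assumed to be called once:

```go
package main

import (
	"errors"
	"fmt"
)

// Writer is a hypothetical stand-in for core.Writer.
type Writer interface {
	Write(line string) error
}

type LineSource struct {
	lines  []string
	rewind chan struct{}
	stop   chan struct{}
}

func NewLineSource(lines []string) *LineSource {
	return &LineSource{
		lines:  lines,
		rewind: make(chan struct{}),
		stop:   make(chan struct{}),
	}
}

// GenerateStream emits every line and then, instead of returning, waits
// until Rewind or Stop is called.
func (s *LineSource) GenerateStream(w Writer) error {
	for {
		for _, l := range s.lines {
			if err := w.Write(l); err != nil {
				return err
			}
		}
		select {
		case <-s.rewind: // start over from the first line
		case <-s.stop:
			return nil
		}
	}
}

// Rewind blocks until GenerateStream has finished its current pass.
func (s *LineSource) Rewind() error {
	select {
	case s.rewind <- struct{}{}:
		return nil
	case <-s.stop:
		return errors.New("the source is already stopped")
	}
}

// Stop is assumed to be called only once in this sketch.
func (s *LineSource) Stop() error {
	close(s.stop)
	return nil
}

// collector records every line written to it.
type collector struct{ got []string }

func (c *collector) Write(line string) error {
	c.got = append(c.got, line)
	return nil
}

// emitWithOneRewind runs the source, rewinds it once, and stops it.
func emitWithOneRewind(lines []string) []string {
	s := NewLineSource(lines)
	c := &collector{}
	done := make(chan error, 1)
	go func() { done <- s.GenerateStream(c) }()
	s.Rewind() // returns once the first pass has finished
	s.Stop()
	<-done
	return c.got
}

func main() {
	fmt.Println(emitWithOneRewind([]string{"a", "b"}))
}
```

A complete implementation would also have to handle a rewind in the middle of a pass and the interaction with pausing, which is what makes the utilities attractive.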
3.2.5.4. Utilities¶
There are some utilities that support implementing sources and their creators. This subsection describes each utility.
core.ImplementSourceStop¶
core.ImplementSourceStop
is a function that implements the Stop
method
of a source in a thread-safe manner:
func ImplementSourceStop(s Source) Source
A source returned from this function is resumable, but not rewindable even if
the original source implements the core.RewindableSource
interface. In
addition, although a source passed to core.ImplementSourceStop
can
explicitly implement the core.Resumable
interface, its Pause
and
Resume
methods will never be called because the source returned from
core.ImplementSourceStop
also implements those methods and controls
pause and resume.
To apply this function, a source must satisfy the following restrictions:
- The GenerateStream method must be implemented in a way that it can safely be called again after it returns.
- The GenerateStream method must return when core.Writer.Write returned core.ErrSourceStopped. The method must return exactly the same error variable that the writer returned.
- The Stop method just returns nil.
  - This means all resource allocation and deallocation must be done within the GenerateStream method.
A typical implementation of a source passed to core.ImplementSourceStop
is
shown below:
func (s *MySource) GenerateStream(ctx *core.Context, w core.Writer) error {
<initialization>
defer func() {
<clean up>
}()
for {
t := <create a new tuple>
if err := w.Write(ctx, t); err != nil {
return err
}
}
return nil
}
func (s *MySource) Stop(ctx *core.Context) error {
return nil
}
If a source wants to ignore errors returned from core.Writer.Write
other
than core.ErrSourceStopped
, the GenerateStream
method can be modified
as:
if err := w.Write(ctx, t); err != nil {
if err == core.ErrSourceStopped {
return err
}
}
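The requirement to return exactly the same error variable matters because the stop signal is recognized by identity. The sketch below illustrates this with a hypothetical sentinel errSourceStopped standing in for core.ErrSourceStopped. The functions handle and handleWrapped are invented for this example, and comparing with == is an assumption about how a caller would detect the signal:

```go
package main

import (
	"errors"
	"fmt"
)

// errSourceStopped is a hypothetical stand-in for core.ErrSourceStopped.
var errSourceStopped = errors.New("the source is stopped")

// isStopSignal mimics a caller that compares the returned error by identity.
func isStopSignal(err error) bool {
	return err == errSourceStopped
}

// handle follows the pattern above: the stop signal is returned unchanged
// and any other error is ignored.
func handle(err error) error {
	if err == errSourceStopped {
		return err
	}
	return nil
}

// handleWrapped is a counterexample. Adding context creates a new error
// value that no longer equals errSourceStopped.
func handleWrapped(err error) error {
	if err == errSourceStopped {
		return fmt.Errorf("write failed: %v", err)
	}
	return nil
}

func main() {
	fmt.Println(isStopSignal(handle(errSourceStopped)))
	fmt.Println(isStopSignal(handleWrapped(errSourceStopped)))
	fmt.Println(handle(errors.New("temporary failure")) == nil)
}
```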
By applying core.ImplementSourceStop
, the Ticker
above can be
implemented as follows:
type Ticker struct {
interval time.Duration
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
return nil
}
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&Ticker{
interval: interval,
}), nil
}
There’s no stopped
flag now. In this version, the Stop
method of the
source returned by core.ImplementSourceStop
waits until the
GenerateStream
method returns.
core.NewRewindableSource¶
core.NewRewindableSource
is a function that converts a regular source into
a rewindable source:
func NewRewindableSource(s Source) RewindableSource
A source returned from this function is resumable and rewindable. A source
passed to the function needs to satisfy the same restrictions as
core.ImplementSourceStop
. In addition to that, there’s one more restriction
for core.NewRewindableSource
:
- The GenerateStream method must return when core.Writer.Write returned core.ErrSourceRewound. The method must return exactly the same error variable that the writer returned.
Although the GenerateStream
method of a rewindable source must not return
after it emits all tuples, a source passed to the core.NewRewindableSource
function needs to return in that situation. For example, let’s assume there’s a
source that generates tuples from each line in a file. To implement the source
without the help of the utility function, its GenerateStream
must wait for
the Rewind
method to be called after it processes all lines in the file.
However, with the utility, its GenerateStream
can just return once it emits
all tuples. Therefore, a typical implementation of a source passed to the
utility can be the same as a source for core.ImplementSourceStop
.
As it will be shown later, a source that infinitely emits tuples can also be rewindable in some sense.
The following is an example of TickerCreator
modified from the example for
core.ImplementSourceStop
:
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
rewindable := false
if v, ok := params["rewindable"]; ok {
b, err := data.AsBool(v)
if err != nil {
return nil, err
}
rewindable = b
}
src := &Ticker{
interval: interval,
}
if rewindable {
return core.NewRewindableSource(src), nil
}
return core.ImplementSourceStop(src), nil
}
In this example, Ticker
has the rewindable
parameter. If it is true,
the source becomes rewindable:
CREATE SOURCE my_rewindable_ticker TYPE ticker WITH rewindable = true;
By issuing the REWIND SOURCE
statement, my_rewindable_ticker
resets
the value of tick
field:
REWIND SOURCE my_rewindable_ticker;
-- output examples of SELECT RSTREAM * FROM my_rewindable_ticker [RANGE 1 TUPLES];
{"tick":0}
{"tick":1}
{"tick":2}
...
{"tick":123}
-- REWIND SOURCE is executed here
{"tick":0}
{"tick":1}
...
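The reset of the tick value can be pictured as a wrapper that calls the generating function again whenever it returns the rewind error. The sketch below only illustrates that idea; it is not the implementation of core.NewRewindableSource. The sentinels errRewound and errStopped are hypothetical stand-ins for core.ErrSourceRewound and core.ErrSourceStopped, and scriptedWriter simulates the statements by returning those errors at fixed positions:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for core.ErrSourceRewound and core.ErrSourceStopped.
var (
	errRewound = errors.New("the source is rewound")
	errStopped = errors.New("the source is stopped")
)

// Writer is a hypothetical stand-in for core.Writer.
type Writer interface {
	Write(tick int64) error
}

// generate is the counterpart of Ticker.GenerateStream without the sleep.
// Its counter is a local variable, so calling it again starts from zero.
func generate(w Writer) error {
	for cnt := int64(0); ; cnt++ {
		if err := w.Write(cnt); err != nil {
			return err
		}
	}
}

// scriptedWriter records ticks. It signals a rewind once after rewindAt
// recorded ticks and a stop after stopAt recorded ticks.
type scriptedWriter struct {
	ticks    []int64
	rewindAt int
	stopAt   int
	rewound  bool
}

func (s *scriptedWriter) Write(tick int64) error {
	if len(s.ticks) >= s.stopAt {
		return errStopped
	}
	if !s.rewound && len(s.ticks) == s.rewindAt {
		s.rewound = true
		return errRewound
	}
	s.ticks = append(s.ticks, tick)
	return nil
}

// run calls generate again whenever it returns errRewound and gives up on
// any other error.
func run(w Writer) error {
	for {
		if err := generate(w); err != errRewound {
			return err
		}
	}
}

// ticksWithRewind returns the ticks recorded for the given script.
func ticksWithRewind(rewindAt, stopAt int) []int64 {
	w := &scriptedWriter{rewindAt: rewindAt, stopAt: stopAt}
	run(w)
	return w.ticks
}

func main() {
	fmt.Println(ticksWithRewind(3, 5))
}
```

This also shows why GenerateStream has to be safe to call again after it returns.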
bql.SourceCreatorFunc¶
bql.SourceCreatorFunc
is a function that converts a function having the
same signature as SourceCreator.CreateSource
to a SourceCreator
:
func SourceCreatorFunc(f func(*core.Context,
*IOParams, data.Map) (core.Source, error)) SourceCreator
For example, TickerCreator
above and its registration can be modified as
follows with this utility:
func CreateTicker(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&Ticker{
interval: interval,
}), nil
}
func init() {
bql.MustRegisterGlobalSourceCreator("ticker",
bql.SourceCreatorFunc(CreateTicker))
}
3.2.5.5. A Complete Example¶
A complete example of Ticker
is shown in this subsection. Assume that the
import path of the example is github.com/sensorbee/examples/ticker
, which
doesn’t actually exist. There’re two files in the repository:
- ticker.go
- plugin/plugin.go
The example uses core.NewRewindableSource
utility function.
ticker.go¶
package ticker
import (
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Ticker struct {
interval time.Duration
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
// This method will be implemented by utility functions.
return nil
}
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
rewindable := false
if v, ok := params["rewindable"]; ok {
b, err := data.AsBool(v)
if err != nil {
return nil, err
}
rewindable = b
}
src := &Ticker{
interval: interval,
}
if rewindable {
return core.NewRewindableSource(src), nil
}
return core.ImplementSourceStop(src), nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"github.com/sensorbee/examples/ticker"
)
func init() {
bql.MustRegisterGlobalSourceCreator("ticker", &ticker.TickerCreator{})
}
3.2.6. Sink Plugins¶
This section describes how to implement a sink as a plugin of SensorBee.
3.2.6.1. Implementing a Sink¶
A struct implementing the following interface can be a sink:
type Sink interface {
Write(ctx *Context, t *Tuple) error
Close(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core
package.
The Write
method writes a tuple to the destination of the sink. The argument
ctx
contains the information of the current processing context. t
is the
tuple to be written. The Close
method is called when the sink becomes
unnecessary. It must release all resources allocated for the sink.
The following example sink writes a tuple as JSON to stdout:
type StdoutSink struct {
}
func (s *StdoutSink) Write(ctx *core.Context, t *core.Tuple) error {
_, err := fmt.Fprintln(os.Stdout, t.Data)
return err
}
func (s *StdoutSink) Close(ctx *core.Context) error {
// nothing to release
return nil
}
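As a variation on the example above, a sink can write to an io.Writer rather than directly to os.Stdout, which makes it easier to exercise in a unit test. The following is only a minimal sketch: Context and Tuple are simplified local stand-ins for core.Context and core.Tuple (so the code compiles without the SensorBee packages), and WriterSink and writeToString are hypothetical names that are not part of SensorBee.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Context and Tuple are simplified stand-ins for core.Context and core.Tuple.
// They are assumptions made only to keep this sketch self-contained.
type Context struct{}

type Tuple struct {
	Data map[string]interface{}
}

// WriterSink (hypothetical) writes each tuple as one line of JSON to W.
type WriterSink struct {
	W io.Writer
}

// Write encodes the tuple's data as JSON and appends a newline.
func (s *WriterSink) Write(ctx *Context, t *Tuple) error {
	b, err := json.Marshal(t.Data)
	if err != nil {
		return err
	}
	_, err = s.W.Write(append(b, '\n'))
	return err
}

// Close has nothing to release because the sink does not own W.
func (s *WriterSink) Close(ctx *Context) error {
	return nil
}

// writeToString runs one tuple through a WriterSink backed by an in-memory
// buffer and returns what the sink wrote.
func writeToString(t *Tuple) (string, error) {
	var buf bytes.Buffer
	s := &WriterSink{W: &buf}
	if err := s.Write(&Context{}, t); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := writeToString(&Tuple{Data: map[string]interface{}{"tick": 1}})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

Passing os.Stdout as W gives the same behavior as a stdout sink, while a bytes.Buffer lets a test inspect the output.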
A sink is initialized by its SinkCreator, which is described later.
Note
SensorBee doesn’t provide buffering or retry capability for sinks.
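Because retries are not provided, a plugin that needs them has to implement them itself. One possible approach, sketched below, is a wrapper sink that retries a failed Write a fixed number of times. This is an illustration under stated assumptions, not a SensorBee API: Context, Tuple, and Sink are local stand-ins that mirror the core package types, and RetrySink and flakySink are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Context, Tuple, and Sink are local stand-ins for the types in the core
// package. They are assumptions made to keep this sketch self-contained.
type Context struct{}

type Tuple struct {
	Data map[string]interface{}
}

type Sink interface {
	Write(ctx *Context, t *Tuple) error
	Close(ctx *Context) error
}

// RetrySink (hypothetical) wraps another sink and retries failed writes.
type RetrySink struct {
	Inner    Sink
	Attempts int           // total attempts per tuple; values below 1 mean 1
	Backoff  time.Duration // pause between two attempts
}

// Write tries the inner sink up to Attempts times and returns the last error
// if every attempt fails.
func (r *RetrySink) Write(ctx *Context, t *Tuple) error {
	attempts := r.Attempts
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = r.Inner.Write(ctx, t); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(r.Backoff)
		}
	}
	return err
}

// Close releases the wrapped sink.
func (r *RetrySink) Close(ctx *Context) error {
	return r.Inner.Close(ctx)
}

// flakySink fails a fixed number of times before it starts succeeding.
// It exists only to demonstrate RetrySink.
type flakySink struct {
	failures int
	written  int
}

func (f *flakySink) Write(ctx *Context, t *Tuple) error {
	if f.failures > 0 {
		f.failures--
		return errors.New("temporary failure")
	}
	f.written++
	return nil
}

func (f *flakySink) Close(ctx *Context) error { return nil }

func main() {
	inner := &flakySink{failures: 2}
	s := &RetrySink{Inner: inner, Attempts: 3, Backoff: time.Millisecond}
	err := s.Write(&Context{}, &Tuple{Data: map[string]interface{}{"tick": 1}})
	fmt.Println(err, inner.written)
}
```

A fixed pause keeps the sketch short. Note that Write blocks the caller while it retries, so a real plugin would need to choose the attempt count and pause with the upstream data rate in mind.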
3.2.6.2. Registering a Sink¶
To register a sink to the SensorBee server, the sink needs to provide its SinkCreator. The SinkCreator interface is defined in the gopkg.in/sensorbee/sensorbee.v0/bql package as follows:
// SinkCreator is an interface which creates instances of a Sink.
type SinkCreator interface {
// CreateSink creates a new Sink instance using given parameters.
CreateSink(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Sink, error)
}
It only has one method: CreateSink. The CreateSink method is called when the CREATE SINK statement is executed.
The ctx argument contains the information of the current processing context. ioParams has the name and the type name of the sink, which are given in the CREATE SINK statement. params has parameters specified in the WITH clause of the CREATE SINK statement.
The creator can be registered with the RegisterGlobalSinkCreator or MustRegisterGlobalSinkCreator function. As an example, the creator of StdoutSink above can be implemented and registered as follows:
type StdoutSinkCreator struct {
}
func (s *StdoutSinkCreator) CreateSink(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &StdoutSink{}, nil
}
func init() {
bql.MustRegisterGlobalSinkCreator("my_stdout", &StdoutSinkCreator{})
}
This sink doesn’t take any parameters from the WITH clause of the CREATE SINK statement. Parameters for a sink are handled in the same way as parameters for a source. See Source Plugins for more details.
3.2.6.3. Utilities¶
There’s one utility function for sink plugins: SinkCreatorFunc:
func SinkCreatorFunc(f func(*core.Context,
*IOParams, data.Map) (core.Sink, error)) SinkCreator
This utility function is defined in the gopkg.in/sensorbee/sensorbee.v0/bql package. It converts a function having the same signature as SinkCreator.CreateSink to a SinkCreator. With this utility, for example, StdoutSinkCreator can be modified to:
func CreateStdoutSink(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &StdoutSink{}, nil
}
func init() {
bql.MustRegisterGlobalSinkCreator("stdout",
bql.SinkCreatorFunc(CreateStdoutSink))
}
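A utility like SinkCreatorFunc can be built with the same function-adapter pattern as http.HandlerFunc in Go's standard library: a function type is given a method that calls the function itself, so a plain function satisfies the interface. The sketch below illustrates that pattern only and is not SensorBee's actual implementation. All types are local stand-ins, the IOParams field names are assumed, and creatorFunc, namedSink, and createNamed are hypothetical names.

```go
package main

import "fmt"

// The following types are simplified stand-ins for the core, data, and bql
// types. They are assumptions made to keep this sketch self-contained.
type Context struct{}

type Map map[string]interface{}

type Tuple struct {
	Data Map
}

type IOParams struct {
	TypeName string // assumed field: the type name given in CREATE SINK
	Name     string // assumed field: the sink name given in CREATE SINK
}

type Sink interface {
	Write(ctx *Context, t *Tuple) error
	Close(ctx *Context) error
}

type SinkCreator interface {
	CreateSink(ctx *Context, ioParams *IOParams, params Map) (Sink, error)
}

// creatorFunc (hypothetical) is a function type whose CreateSink method
// calls the function itself, which makes it a SinkCreator.
type creatorFunc func(*Context, *IOParams, Map) (Sink, error)

func (f creatorFunc) CreateSink(ctx *Context, ioParams *IOParams, params Map) (Sink, error) {
	return f(ctx, ioParams, params)
}

// SinkCreatorFunc converts a plain function to a SinkCreator.
func SinkCreatorFunc(f func(*Context, *IOParams, Map) (Sink, error)) SinkCreator {
	return creatorFunc(f)
}

// namedSink is a trivial sink that only remembers the name it was given.
type namedSink struct {
	name string
}

func (s *namedSink) Write(ctx *Context, t *Tuple) error { return nil }
func (s *namedSink) Close(ctx *Context) error           { return nil }

// createNamed has the same signature as SinkCreator.CreateSink.
func createNamed(ctx *Context, ioParams *IOParams, params Map) (Sink, error) {
	return &namedSink{name: ioParams.Name}, nil
}

func main() {
	creator := SinkCreatorFunc(createNamed)
	s, err := creator.CreateSink(&Context{},
		&IOParams{TypeName: "my_stdout", Name: "out"}, Map{})
	if err != nil {
		panic(err)
	}
	fmt.Println(s.(*namedSink).name)
}
```

The benefit of the pattern is that a creator with no state of its own does not need an empty struct type just to carry one method.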
3.2.6.4. A Complete Example¶
A complete example of the sink is shown in this subsection. The package name for the sink is stdout and StdoutSink is renamed to Sink. Also, this example uses the SinkCreatorFunc utility for SinkCreator.
Assume that the import path of the example is github.com/sensorbee/examples/stdout, which doesn’t actually exist. The repository has two files:
- stdout.go
- plugin/plugin.go
stdout.go¶
package stdout
import (
"fmt"
"os"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Sink struct {
}
func (s *Sink) Write(ctx *core.Context, t *core.Tuple) error {
_, err := fmt.Fprintln(os.Stdout, t.Data)
return err
}
func (s *Sink) Close(ctx *core.Context) error {
return nil
}
func Create(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &Sink{}, nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"github.com/sensorbee/examples/stdout"
)
func init() {
bql.MustRegisterGlobalSinkCreator("my_stdout",
bql.SinkCreatorFunc(stdout.Create))
}