3. Server Programming¶
This part describes the extensibility of the SensorBee server. Topics covered in this part are advanced and should be read after understanding the basics of SensorBee and BQL.
Because SensorBee is mainly written in
The Go Programming Language, understanding the language
before reading this part is also recommended.
A Tour of Go is an official tutorial and is one
of the best tutorials of the language. It runs on web browsers and does not
require any additional software installation. After learning the language,
How to Write Go Code helps to understand
how to use the go tool and the standard way to develop Go packages and
applications.
This part assumes that the go tool is installed and the development environment
including Go’s environment variables like GOROOT or GOPATH is
appropriately set up. SensorBee requires Go 1.4 or later.
3.1. Extending the SensorBee Server and BQL¶
Many features of the server and BQL can be extended by plugins. This chapter describes what types of features can be extended by users. The following chapters in this part describe how to develop those features in specific programming languages.
3.1.1. User-Defined Functions¶
A user-defined function (UDF) is a function that is implemented by a user and registered in the SensorBee server. Once it is registered, it can be called from BQL statements:
SELECT RSTREAM my_udf(field) FROM src [RANGE 1 TUPLES];
A UDF behaves just like a built-in function. A UDF can take an arbitrary number
of arguments. Each argument can be any of the built-in types
and can receive multiple types of values. A UDF can also support a variable-length
argument. A UDF has a single return value of any built-in type. When multiple return
values are required, a UDF can return the value as an array.
Note
BQL currently does not support CREATE FUNCTION statements like well-known
RDBMSs. UDFs can only be added through Go programs.
3.1.2. User-Defined Aggregate Functions¶
A user-defined aggregate function (UDAF) is a function similar to a UDF but can take aggregation parameters (see Aggregate Expressions) as arguments in addition to regular arguments.
3.1.3. User-Defined Stream-Generating Functions¶
Stream-generating functions can also be user-defined; such a function is called a user-defined stream-generating function (UDSF). There are two types of UDSFs. The first type behaves like a source, which is not connected to any input stream and generates tuples proactively:
... FROM my_counter() [RANGE ...
my_counter above may emit tuples like {"count": 1}.
UDSFs of this type are called source-like UDSFs.
The other type is called a stream-like UDSF and behaves just like a stream, which receives tuples from one or more incoming streams or sources. It receives names of streams or sources as its arguments:
... FROM my_udsf("another_stream", "yet_another_stream", other_params) [RANGE ...
Note that there is no rule on how to define a UDSF's arguments. Thus, the order and
the use of arguments depend on each UDSF. For example, a UDSF might take an
array of strings containing names of input streams as its first argument:
my_union(["stream1", "stream2", "stream3"]). Names of input streams do not
even need to be located at the beginning of the argument list:
my_udsf2(1, "another_stream") is also possible.
Using UDSFs is a very powerful way to extend BQL since they can potentially do
anything that a SELECT statement cannot do.
3.1.4. User-Defined States¶
A user-defined state (UDS) can be provided to support stateful data processing
(see Stateful Data Processing). A UDS is usually provided with a set of UDFs that
manipulate the state. Those UDFs take the name of the UDS as a string
argument:
CREATE STATE event_id_seq TYPE snowflake_id WITH machine_id = 1;
CREATE STREAM events_with_id AS
SELECT snowflake_id("event_id_seq"), * FROM events [RANGE 1 TUPLES];
In the example above, a UDS event_id_seq is created with the type
snowflake_id. Then, the UDS is passed to the UDF snowflake_id, which
happens to have the same name as the type name of the UDS. The UDF looks up
the UDS event_id_seq and returns a value computed based on the state.
3.1.5. Source Plugins¶
A source type developed by a user can be added to the SensorBee server as a plugin
so that it can be used in CREATE SOURCE statements. A source type can have any
number of required and optional parameters. Each parameter can have any of
the built-in types.
3.1.6. Sink Plugins¶
A sink type developed by a user can be added to the SensorBee server as a plugin
so that it can be used in CREATE SINK statements. A sink type can have any
number of required and optional parameters. Each parameter can have any of
the built-in types.
3.2. Extensions in Go¶
This chapter describes how to extend the SensorBee server in the Go programming language.
3.2.1. Development Flow of Components in Go¶
The typical development flow of components like a UDF, a UDS type, a source type, or a sink type should be discussed before looking into details of each component.
The basic flow is as follows:
- Create a git repository for components
- Implement components
- Create a plugin subpackage in the repository
3.2.1.1. Create a Git Repository for Components¶
Components are written in Go, so they need to be in a valid git repository (or a repository of a different version control system). One repository may provide multiple types of components. For example, a repository could have 10 UDFs, and 5 UDS types, 2 source types, and 1 sink type. However, since Go is very well designed to provide packages in a fine-grained manner, each repository should only provide a minimum set of components that are logically related and make sense to be in the same repository.
3.2.1.2. Implement Components¶
The next step is to implement components. There is no restriction on which standard or 3rd party packages to depend on.
Functions or structs that are to be registered to the SensorBee server need to be referred to by the plugin subpackage, which is described in the next subsection. Thus, names of those symbols need to start with a capital letter.
In this step, components should not be registered to the SensorBee server yet.
3.2.1.3. Create a Plugin Subpackage in the Repository¶
It is highly recommended that the repository has a separate package (i.e. a
subdirectory) which only registers components to the SensorBee server. There is
usually one file named “plugin.go” in the plugin package and it only contains a
series of registration function calls in the init function. For instance, if the
repository only provides one UDF, the contents of plugin.go would look
like:
// in github.com/user/myudf/plugin/plugin.go
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/user/myudf"
)
func init() {
udf.MustRegisterGlobalUDF("my_udf", &myudf.MyUDF{})
}
There are two reasons to keep the plugin subpackage separate from the implementation of components. First, other Go packages can import the components and use the package as a library without registering them to SensorBee. Second, a separate plugin package allows a user to register a component under a different name. This is especially useful when names of components conflict with each other.
To use the example plugin above, the github.com/user/myudf/plugin package needs
to be added to the plugin path list of SensorBee.
3.2.1.4. Repository Organization¶
The typical organization of the repository is
- github.com/user/repo
  - README: description and the usage of components in the repository
  - .go files: implementation of components
  - plugin/: a subpackage for the plugin registration
    - plugin.go
  - othersubpackages/: there can be optional subpackages
3.2.2. User-Defined Functions¶
This section describes how to write a UDF in Go. It first shows the basic interface of defining UDFs, and then describes utilities around it, how to develop a UDF in a Go-ish manner, and a complete example.
3.2.2.1. Implementing a UDF¶
Note
This is a very low-level way to implement a UDF in Go. To learn about an easier way, see Generic UDFs.
Any struct implementing the following interface can be used as a UDF:
type UDF interface {
// Call calls the UDF.
Call(*core.Context, ...data.Value) (data.Value, error)
// Accept checks if the function accepts the given number of arguments
// excluding core.Context.
Accept(arity int) bool
// IsAggregationParameter returns true if the k-th parameter expects
// aggregated values. A UDF with Accept(n) == true is an aggregate
// function if and only if this function returns true for one or more
// values of k in the range 0, ..., n-1.
IsAggregationParameter(k int) bool
}
This interface is defined in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package.
A UDF can be registered via the RegisterGlobalUDF or MustRegisterGlobalUDF
functions from the same package. MustRegisterGlobalUDF is the same as
RegisterGlobalUDF but panics on failure instead of returning an error. These
functions are usually called from the init function in the UDF package’s
plugin subpackage. A typical implementation of a UDF looks as follows:
type MyUDF struct {
...
}
func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
...
}
func (m *MyUDF) Accept(arity int) bool {
...
}
func (m *MyUDF) IsAggregationParameter(k int) bool {
...
}
func init() {
// MyUDF can be used as my_udf in BQL statements.
udf.MustRegisterGlobalUDF("my_udf", &MyUDF{})
}
As can be inferred from this example, a UDF itself should be stateless, since only one instance of the struct is registered as a UDF and that instance is shared globally. Stateful data processing can be achieved by the combination of UDFs and UDSs, which is described in User-Defined States.
A UDF needs to implement three methods to satisfy the udf.UDF interface:
Call, Accept, and IsAggregationParameter.
The Call method receives a *core.Context and multiple data.Value as its
arguments. *core.Context contains the information of the current processing
context. Call's ...data.Value argument holds the values passed to the UDF.
data.Value represents a value used in BQL and can be any of the built-in
types.
SELECT RSTREAM my_udf(arg1, arg2) FROM stream [RANGE 1 TUPLES];
In this example, arg1 and arg2 are passed to the Call method:
func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
// When my_udf(arg1, arg2) is called, len(args) is 2.
// args[0] is arg1 and args[1] is arg2.
// It is guaranteed that m.Accept(len(args)) is always true.
}
Because data.Value is a semi-variant type, the Call method needs to check
the type of each data.Value and convert it to a desired type.
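The kind of per-argument check that Call performs can be sketched with plain Go type assertions. The Value type and the asString helper below are stand-ins invented for this illustration so that the sketch compiles without the library; they are not SensorBee's data.Value or its data.AsString function, and the *core.Context argument is left out.

```go
package main

import (
	"errors"
	"fmt"
)

// Value is a hypothetical stand-in for data.Value; it is not SensorBee's type.
type Value interface{}

// asString is a hypothetical strict accessor: it succeeds only when the
// value already holds a string and never converts other types.
func asString(v Value) (string, error) {
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("expected a string but got %T", v)
	}
	return s, nil
}

// concat mimics the body of a Call method that requires exactly two strings.
func concat(args ...Value) (Value, error) {
	if len(args) != 2 {
		return nil, errors.New("concat requires exactly two arguments")
	}
	a, err := asString(args[0])
	if err != nil {
		return nil, err
	}
	b, err := asString(args[1])
	if err != nil {
		return nil, err
	}
	return a + b, nil
}

func main() {
	fmt.Println(concat("sensor", "bee")) // both arguments are strings
	fmt.Println(concat("sensor", 1))     // rejected: the second argument is an int
}
```

Checking every argument before using it keeps a malformed call from panicking inside the UDF and turns it into an ordinary error instead.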
The Accept method verifies if the UDF accepts the specific number of arguments.
It can return true for multiple arities as long as it can receive the given
number of arguments. If a UDF only accepts two arguments, the method is implemented
as follows:
func (m *MyUDF) Accept(arity int) bool {
return arity == 2
}
When a UDF aims to support variadic parameters (a.k.a. variable-length
arguments) with two required arguments (e.g.
my_udf(arg1, arg2, optional1, optional2, ...)), the implementation would be:
func (m *MyUDF) Accept(arity int) bool {
return arity >= 2
}
Finally, IsAggregationParameter returns whether the k-th argument (starting
from 0) is an aggregation parameter. Aggregation parameters are passed as a
data.Array containing all values of a field in each group.
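As a rough sketch of how an aggregation parameter reaches a UDF, the following example sums the values of one group. Value, Array, and sumUDF are hypothetical stand-ins written for this illustration rather than SensorBee's data.Value, data.Array, or any built-in function, and Call omits the *core.Context argument to stay self-contained.

```go
package main

import "fmt"

// Value and Array are hypothetical stand-ins for data.Value and data.Array.
type Value interface{}
type Array []Value

// sumUDF sketches an aggregate function such as my_sum(field).
type sumUDF struct{}

// Accept allows exactly one argument.
func (sumUDF) Accept(arity int) bool { return arity == 1 }

// IsAggregationParameter marks the only parameter (k == 0) as aggregated.
func (sumUDF) IsAggregationParameter(k int) bool { return k == 0 }

// Call receives all values of the field in a group as a single Array.
// It assumes Accept(len(args)) was already checked by the caller.
func (sumUDF) Call(args ...Value) (Value, error) {
	values, ok := args[0].(Array)
	if !ok {
		return nil, fmt.Errorf("expected an array but got %T", args[0])
	}
	var total float64
	for _, v := range values {
		switch n := v.(type) {
		case int:
			total += float64(n)
		case float64:
			total += n
		default:
			return nil, fmt.Errorf("cannot sum a value of type %T", v)
		}
	}
	return total, nil
}

func main() {
	var f sumUDF
	fmt.Println(f.Call(Array{1, 2.5, 3})) // 6.5 <nil>
}
```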
All of these methods can be called concurrently from multiple goroutines and they must be thread-safe.
The registered UDF is looked up based on its name and the number of arguments passed to it.
SELECT RSTREAM my_udf(arg1, arg2) FROM stream [RANGE 1 TUPLES];
In this SELECT, a UDF having the name my_udf is looked up first. After
that, its Accept method is called with 2, and my_udf is actually selected
only if Accept(2) returns true. The IsAggregationParameter method is
additionally called on each argument to see if the argument needs to be an
aggregation parameter. Then, if there is no mismatch, my_udf is finally
called.
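The lookup order described above (name first, then arity) can be sketched as follows. The registry type, the reduced UDF interface, and fixedArity are assumptions made for this illustration; SensorBee's actual registry is not shown in this document and may be organized differently.

```go
package main

import "fmt"

// UDF is a reduced, hypothetical version of the interface; Call is omitted.
type UDF interface {
	Accept(arity int) bool
	IsAggregationParameter(k int) bool
}

// fixedArity is a toy UDF that accepts exactly n arguments, none aggregated.
type fixedArity struct{ n int }

func (f fixedArity) Accept(arity int) bool             { return arity == f.n }
func (f fixedArity) IsAggregationParameter(k int) bool { return false }

// registry maps function names to UDFs as a stand-in for the global registry.
type registry map[string]UDF

// lookup finds a UDF by name, then confirms that it accepts the given arity.
func (r registry) lookup(name string, arity int) (UDF, error) {
	f, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("function '%v' is not registered", name)
	}
	if !f.Accept(arity) {
		return nil, fmt.Errorf("function '%v' does not accept %v argument(s)", name, arity)
	}
	return f, nil
}

func main() {
	r := registry{"my_udf": fixedArity{n: 2}}
	_, err := r.lookup("my_udf", 2)
	fmt.Println(err) // <nil>
	_, err = r.lookup("my_udf", 3)
	fmt.Println(err) // reports that three arguments are not accepted
}
```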
Note
A UDF does not have a schema at the moment, so any error regarding types of arguments will not be reported until the statement calling the UDF actually processes a tuple.
3.2.2.2. Generic UDFs¶
SensorBee provides a helper function to register a regular Go function as a UDF
without implementing the UDF interface explicitly.
func Inc(v int) int {
return v + 1
}
This function Inc can be transformed into a UDF by the ConvertGeneric
or MustConvertGeneric function defined in the
gopkg.in/sensorbee/sensorbee.v0/bql/udf package. By combining it with
MustRegisterGlobalUDF, the Inc function can easily be registered as a UDF:
func init() {
udf.MustRegisterGlobalUDF("inc", udf.MustConvertGeneric(Inc))
}
So, a complete example of the UDF implementation and registration is as follows:
package inc
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
)
func Inc(v int) int {
return v + 1
}
func init() {
udf.MustRegisterGlobalUDF("inc", udf.MustConvertGeneric(Inc))
}
Note
The implementation and the registration of a UDF should actually be separated into different packages. See Development Flow of Components in Go for details.
Although this approach is handy, there is some small overhead compared to a UDF implemented in the regular way. Most of such overhead comes from type checking and conversions.
Functions passed to ConvertGeneric need to satisfy some restrictions on
the form of their argument and return value types. Each restriction is described
in the following subsections.
Form of Arguments¶
In terms of valid argument forms, there are some rules to follow:
- A Go function can receive *core.Context as the first argument, or can omit it.
- A function can have any number of arguments including 0 arguments as long as Go accepts them.
- A function can be variadic with or without non-variadic parameters.
There are basically eight (four times two, whether a function has
*core.Context or not) forms of arguments (return values are
intentionally omitted for clarity):
- Functions receiving no argument in BQL (e.g. my_udf())
  - func(*core.Context): A function only receiving *core.Context
  - func(): A function having no argument and not receiving *core.Context, either
- Functions having non-variadic arguments but no variadic arguments
  - func(*core.Context, T1, T2, ..., Tn)
  - func(T1, T2, ..., Tn)
- Functions having variadic arguments but no non-variadic arguments
  - func(*core.Context, ...T)
  - func(...T)
- Functions having both variadic and non-variadic arguments
  - func(*core.Context, T1, T2, ..., Tn, ...Tn+1)
  - func(T1, T2, ..., Tn, ...Tn+1)
Here are some examples of invalid function signatures:
- func(T, *core.Context): *core.Context must be the first argument.
- func(NonSupportedType): Only supported types, which will be explained later, can be used.
Although return values are omitted from all the examples above, they are actually required. The next subsection explains how to define valid return values.
Form of Return Values¶
All functions need to have return values. There are two forms of return values:
- func(...) R
- func(...) (R, error)
All other forms are invalid:
- func(...)
- func(...) error
- func(...) NonSupportedType
Valid types of return values are the same as the valid types of arguments, and they are listed in the following subsection.
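One way to check the two valid return forms is with Go's reflect package. The sketch below only illustrates the rule stated above; validReturnForm is a hypothetical helper, not part of SensorBee, and it does not verify that R is one of the supported value types.

```go
package main

import (
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// validReturnForm reports whether fn is a function returning either R or
// (R, error), where R itself is not error. Checking that R is a supported
// value type is deliberately left out of this sketch.
func validReturnForm(fn interface{}) bool {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return false
	}
	switch t.NumOut() {
	case 1:
		return t.Out(0) != errorType
	case 2:
		return t.Out(0) != errorType && t.Out(1) == errorType
	default:
		return false
	}
}

func main() {
	fmt.Println(validReturnForm(func(int) int { return 0 }))                   // true
	fmt.Println(validReturnForm(func(int) (string, error) { return "", nil })) // true
	fmt.Println(validReturnForm(func(int) {}))                                 // false
	fmt.Println(validReturnForm(func(int) error { return nil }))               // false
}
```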
Valid Value Types¶
The list of Go types that can be used for arguments and the return value is as follows:
- bool
- signed integers: int, int8, int16, int32, int64
- unsigned integers: uint, uint8, uint16, uint32, uint64
- float32, float64
- string
- time.Time
- data: data.Bool, data.Int, data.Float, data.String, data.Blob, data.Timestamp, data.Array, data.Map, data.Value
- A slice of any type above, including data.Value
data.Value can be used as a semi-variant type, which will receive all types
above.
When the argument type and the actual value type are different, weak type
conversions are applied to values. Conversions are basically done by
data.ToXXX functions (see godoc comments of each function in
data/type_conversions.go). For example, func inc(i int) int can be called by
inc("3") in a BQL statement and it will return 4. If a strict type checking
or custom type conversion is required, receive values as data.Value and
manually check or convert types, or define the UDF in the regular way.
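The effect of weak conversion can be imitated in plain Go. In the sketch below, toInt is a hypothetical helper that handles only three input types and truncates floats; it is not data.ToInt, whose exact rules are documented in data/type_conversions.go.

```go
package main

import (
	"fmt"
	"strconv"
)

// toInt is a hypothetical weak conversion; it is not data.ToInt and covers
// only three input types.
func toInt(v interface{}) (int, error) {
	switch x := v.(type) {
	case int:
		return x, nil
	case float64:
		return int(x), nil // truncates toward zero
	case string:
		return strconv.Atoi(x)
	default:
		return 0, fmt.Errorf("cannot convert %T to int", v)
	}
}

// inc is the strongly typed function a user would write.
func inc(i int) int { return i + 1 }

// weakInc converts its argument first and then calls inc, which is roughly
// what a generic UDF wrapper has to do before invoking the user's function.
func weakInc(v interface{}) (int, error) {
	i, err := toInt(v)
	if err != nil {
		return 0, err
	}
	return inc(i), nil
}

func main() {
	fmt.Println(weakInc("3"))  // 4 <nil>
	fmt.Println(weakInc(1.5))  // 2 <nil>
	fmt.Println(weakInc(true)) // bool is not handled, so an error is returned
}
```

The conversion step is also where most of the overhead of a generic UDF comes from, because it runs on every call.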
Examples of Valid Go Functions¶
The following functions can be converted to UDFs by the ConvertGeneric or
MustConvertGeneric function:
- func rand() int
- func pow(*core.Context, float32, float32) (float32, error)
- func join(*core.Context, ...string) string
- func format(string, ...data.Value) (string, error)
- func keys(data.Map) []string
3.2.2.3. Complete Examples¶
This subsection shows three example UDFs:
- my_inc
- my_join
- my_join2
Assume that these are in the repository github.com/sensorbee/examples/udfs
(which actually does not exist). The repository has three files:
- inc.go
- join.go
- plugin/plugin.go
inc.go¶
In inc.go, the Inc function is defined as a pure Go function with a standard
value type:
package udfs
func Inc(v int) int {
return v + 1
}
join.go¶
In join.go, the Join UDF is defined in a strict way. It also performs
strict type checking. It is designed to be called in one of two forms:
my_join("a", "b", "c", "separator") or
my_join(["a", "b", "c"], "separator"). Each argument and value in the array
must be a string. The UDF receives an arbitrary number of arguments.
package udfs
import (
"errors"
"strings"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Join struct {
}
func (j *Join) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
empty := data.String("")
if len(args) == 1 {
return empty, nil
}
switch args[0].Type() {
case data.TypeString: // my_join("a", "b", "c", "sep") form
var ss []string
for _, v := range args {
s, err := data.AsString(v)
if err != nil {
return empty, err
}
ss = append(ss, s)
}
return data.String(strings.Join(ss[:len(ss)-1], ss[len(ss)-1])), nil
case data.TypeArray: // my_join(["a", "b", "c"], "sep") form
if len(args) != 2 {
return empty, errors.New("wrong number of arguments for my_join(array, sep)")
}
sep, err := data.AsString(args[1])
if err != nil {
return empty, err
}
a, _ := data.AsArray(args[0])
var ss []string
for _, v := range a {
s, err := data.AsString(v)
if err != nil {
return empty, err
}
ss = append(ss, s)
}
return data.String(strings.Join(ss, sep)), nil
default:
return empty, errors.New("the first argument must be a string or an array")
}
}
func (j *Join) Accept(arity int) bool {
return arity >= 1
}
func (j *Join) IsAggregationParameter(k int) bool {
return false
}
plugin/plugin.go¶
In addition to Inc and Join, this file registers the standard Go
function strings.Join as my_join2. Because it’s converted to a UDF by
udf.MustConvertGeneric, arguments are weakly converted to given types.
For example, my_join2([1, 2.3, "4"], "-") is valid although strings.Join
itself is func([]string, string) string.
package plugin
import (
"strings"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/sensorbee/examples/udfs"
)
func init() {
udf.MustRegisterGlobalUDF("my_inc", udf.MustConvertGeneric(udfs.Inc))
udf.MustRegisterGlobalUDF("my_join", &udfs.Join{})
udf.MustRegisterGlobalUDF("my_join2", udf.MustConvertGeneric(strings.Join))
}
Evaluating Examples¶
Once the sensorbee command is built with those UDFs and a topology is
created on the server, the EVAL statement can be used to test them:
EVAL my_inc(1); -- => 2
EVAL my_inc(1.5); -- => 2
EVAL my_inc("10"); -- => 11
EVAL my_join("a", "b", "c", "-"); -- => "a-b-c"
EVAL my_join(["a", "b", "c"], ","); -- => "a,b,c"
EVAL my_join(1, "b", "c", "-"); -- => error
EVAL my_join([1, "b", "c"], ","); -- => error
EVAL my_join2(["a", "b", "c"], ","); -- => "a,b,c"
EVAL my_join2([1, "b", "c"], ","); -- => "1,b,c"
3.2.2.4. Dynamic Loading¶
Dynamic loading of UDFs written in Go is not supported at the moment because Go does not support loading packages dynamically.
3.2.3. User-Defined Stream-Generating Functions¶
This section describes how to write a UDSF in Go.
3.2.3.1. Implementing a UDSF¶
To provide a UDSF, two interfaces need to be implemented: UDSF and
UDSFCreator.
The interface UDSF is defined as follows in the
gopkg.in/sensorbee/sensorbee.v0/bql/udf package.
type UDSF interface {
Process(ctx *core.Context, t *core.Tuple, w core.Writer) error
Terminate(ctx *core.Context) error
}
The Process method processes an input tuple and emits computed tuples for
subsequent streams. ctx contains the processing context information. t
is the tuple to be processed in the UDSF. w is the destination to which
computed tuples are emitted. The Terminate method is called when the UDSF
becomes unnecessary. The method has to release all the resources the UDSF has.
How the Process method is called depends on the type of a UDSF. When a UDSF
is a stream-like UDSF (i.e. it has input from other streams), the Process
method is called every time a new tuple arrives. The argument t contains the
tuple emitted from another stream. Stream-like UDSFs have to return from
Process immediately after processing the input tuple. They must not
block in the method.
A stream-like UDSF is used mostly when multiple tuples need to be computed and emitted based on one input tuple:
type WordSplitter struct {
field string
}
func (w *WordSplitter) Process(ctx *core.Context, t *core.Tuple, writer core.Writer) error {
var kwd []string
if v, ok := t.Data[w.field]; !ok {
return fmt.Errorf("the tuple doesn't have the required field: %v", w.field)
} else if s, err := data.AsString(v); err != nil {
return fmt.Errorf("'%v' field must be string: %v", w.field, err)
} else {
kwd = strings.Split(s, " ")
}
for _, k := range kwd {
out := t.Copy()
out.Data[w.field] = data.String(k)
if err := writer.Write(ctx, out); err != nil {
return err
}
}
return nil
}
func (w *WordSplitter) Terminate(ctx *core.Context) error {
return nil
}
WordSplitter splits text in a specific field by space. For example, when
an input tuple is {"word": "a b c"} and WordSplitter.field is word,
the following three tuples will be emitted: {"word": "a"}, {"word": "b"},
and {"word": "c"}.
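The one-input, many-outputs behavior can be tried without the SensorBee library by swapping in a writer that simply collects tuples. Tuple, Writer, collector, and splitWords below are stand-ins written for this sketch; they simplify core.Tuple and core.Writer considerably (string values only, no context argument).

```go
package main

import (
	"fmt"
	"strings"
)

// Tuple and Writer are hypothetical stand-ins for core.Tuple and core.Writer.
type Tuple map[string]string

type Writer interface {
	Write(t Tuple) error
}

// collector is a Writer that records every tuple it receives.
type collector struct{ tuples []Tuple }

func (c *collector) Write(t Tuple) error {
	c.tuples = append(c.tuples, t)
	return nil
}

// splitWords emits one copy of t per space-separated word in t[field].
func splitWords(t Tuple, field string, w Writer) error {
	s, ok := t[field]
	if !ok {
		return fmt.Errorf("the tuple doesn't have the required field: %v", field)
	}
	for _, word := range strings.Split(s, " ") {
		out := Tuple{}
		for k, v := range t { // copy so that emitted tuples do not share a map
			out[k] = v
		}
		out[field] = word
		if err := w.Write(out); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	c := &collector{}
	if err := splitWords(Tuple{"word": "a b c"}, "word", c); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(c.tuples), c.tuples[0]["word"], c.tuples[2]["word"]) // 3 a c
}
```

Copying the tuple before modifying it matters because each emitted tuple may be consumed independently by downstream streams.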
When a UDSF is a source-like UDSF, the Process method is only called once
with a tuple that does not mean anything. Unlike a stream-like UDSF, the
Process method of a source-like UDSF does not have to return until it has
emitted all tuples, the Terminate method is called, or a fatal error occurs.
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
var i int64
for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
newTuple := core.NewTuple(data.Map{"tick": data.Int(i)})
if err := w.Write(ctx, newTuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Terminate(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
In this example, Ticker emits tuples having a tick field containing
a counter until the Terminate method is called.
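The stop-flag pattern used by Ticker can be exercised on its own. In this sketch the ticker type, its run and terminate methods, and the runUntil helper are all hypothetical names; they mirror Process and Terminate without using any SensorBee types, and an emit callback replaces core.Writer.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// ticker is a stand-in for the Ticker UDSF without any SensorBee types.
type ticker struct {
	interval time.Duration
	stopped  int32
}

// run calls emit with an increasing counter until terminate is called, and
// returns the number of values emitted.
func (t *ticker) run(emit func(int64)) int64 {
	var i int64
	for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
		emit(i)
		time.Sleep(t.interval)
	}
	return i
}

// terminate asks run to return after its current iteration.
func (t *ticker) terminate() { atomic.StoreInt32(&t.stopped, 1) }

// runUntil runs a ticker in its own goroutine, terminates it from the calling
// goroutine once n values have been emitted, and returns the total count.
// n must be positive.
func runUntil(n int64) int64 {
	t := &ticker{interval: time.Millisecond}
	reached := make(chan struct{})
	done := make(chan int64)
	go func() {
		done <- t.run(func(i int64) {
			if i == n-1 {
				close(reached)
			}
		})
	}()
	<-reached
	t.terminate()
	return <-done
}

func main() {
	fmt.Println("emitted at least 3:", runUntil(3) >= 3)
}
```

Because the flag is read and written atomically, the loop and the terminating goroutine do not need a mutex.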
Whether a UDSF is stream-like or source-like can be configured when it is
created by UDSFCreator. The interface UDSFCreator is defined as follows
in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package:
type UDSFCreator interface {
CreateUDSF(ctx *core.Context, decl UDSFDeclarer, args ...data.Value) (UDSF, error)
Accept(arity int) bool
}
The CreateUDSF method creates a new instance of a UDSF. The method is called
when evaluating a UDSF in the FROM clause of a SELECT statement. ctx
contains the processing context information. decl is used to customize the
behavior of the UDSF, which is explained later. args has arguments passed in the
SELECT statement. The Accept method verifies if the UDSF accepts the
specific number of arguments. This is the same as the UDF.Accept method (see
User-Defined Functions).
UDSFDeclarer is used in the CreateUDSF method to customize the
behavior of a UDSF:
type UDSFDeclarer interface {
Input(name string, config *UDSFInputConfig) error
ListInputs() map[string]*UDSFInputConfig
}
By calling its Input method, a UDSF will be able to receive tuples from
another stream with the name name. Because the name is given outside the
UDSF, it’s uncontrollable from the UDSF. However, there are cases that a UDSF
wants to know from which stream a tuple has come. For example, when providing
a UDSF performing a JOIN of two streams, a UDSF needs to distinguish which
stream emitted the tuple. If the UDSF was defined as
my_join(left_stream, right_stream), decl can be used as follows in
UDSFCreator.CreateUDSF:
decl.Input(args[0], &UDSFInputConfig{InputName: "left"})
decl.Input(args[1], &UDSFInputConfig{InputName: "right"})
By configuring the input stream in this way, a tuple passed to UDSF.Process has
the given name in its Tuple.InputName field:
func (m *MyJoin) Process(ctx *core.Context, t *core.Tuple, w core.Writer) error {
switch t.InputName {
case "left":
... process tuples from left_stream ...
case "right":
... process tuples from right_stream ...
}
...
}
If a UDSF is configured to have one or more input streams by decl.Input in
the UDSFCreator.CreateUDSF method, the UDSF is processed as a stream-like
UDSF. Otherwise, if a UDSF doesn’t have any input (i.e. decl.Input is not
called), the UDSF becomes a source-like UDSF.
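The rule above can be sketched with a toy declarer that records inputs. The declarer, inputConfig, kind, and createJoin names are assumptions made for this illustration, as is the rejection of duplicate input names; this is not SensorBee's implementation of UDSFDeclarer.

```go
package main

import "fmt"

// inputConfig and declarer are hypothetical stand-ins for
// udf.UDSFInputConfig and an implementation of udf.UDSFDeclarer.
type inputConfig struct{ InputName string }

type declarer struct {
	inputs map[string]*inputConfig
}

// Input records that the UDSF reads from the stream called name.
// Rejecting a name that was already declared is an assumption of this sketch.
func (d *declarer) Input(name string, config *inputConfig) error {
	if _, ok := d.inputs[name]; ok {
		return fmt.Errorf("input '%v' is already declared", name)
	}
	if d.inputs == nil {
		d.inputs = map[string]*inputConfig{}
	}
	d.inputs[name] = config
	return nil
}

// ListInputs returns the inputs declared so far.
func (d *declarer) ListInputs() map[string]*inputConfig { return d.inputs }

// kind classifies a UDSF after its creator has run against d.
func kind(d *declarer) string {
	if len(d.ListInputs()) == 0 {
		return "source-like"
	}
	return "stream-like"
}

// createJoin plays the role of CreateUDSF for my_join(left_stream, right_stream).
func createJoin(d *declarer, left, right string) error {
	if err := d.Input(left, &inputConfig{InputName: "left"}); err != nil {
		return err
	}
	return d.Input(right, &inputConfig{InputName: "right"})
}

func main() {
	join := &declarer{}
	if err := createJoin(join, "left_stream", "right_stream"); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(kind(join))        // stream-like
	fmt.Println(kind(&declarer{})) // source-like
}
```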
As an example, the UDSFCreator of WordSplitter is shown below:
type WordSplitterCreator struct {
}
func (w *WordSplitterCreator) CreateUDSF(ctx *core.Context,
decl udf.UDSFDeclarer, args ...data.Value) (udf.UDSF, error) {
input, err := data.AsString(args[0])
if err != nil {
return nil, fmt.Errorf("input stream name must be a string: %v", args[0])
}
field, err := data.AsString(args[1])
if err != nil {
return nil, fmt.Errorf("target field name must be a string: %v", args[1])
}
// This Input call makes the UDSF a stream-like UDSF.
if err := decl.Input(input, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
func (w *WordSplitterCreator) Accept(arity int) bool {
return arity == 2
}
Although the UDSF has not been registered to the SensorBee server yet, it could
appear like word_splitter(input_stream_name, target_field_name) if it was
registered with the name word_splitter.
For another example, the UDSFCreator of Ticker is shown below:
type TickerCreator struct {
}
func (t *TickerCreator) CreateUDSF(ctx *core.Context,
decl udf.UDSFDeclarer, args ...data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(args[0])
if err != nil {
return nil, err
}
// Since this is a source-like UDSF, there's no input.
return &Ticker{
interval: interval,
}, nil
}
func (t *TickerCreator) Accept(arity int) bool {
return arity == 1
}
Like word_splitter, its signature could be ticker(interval) if the UDSF
is registered as ticker.
The implementation of these UDSFs is now complete, and the next step is to register them to the SensorBee server.
3.2.3.2. Registering a UDSF¶
A UDSF can be used in BQL by registering its UDSFCreator interface to
the SensorBee server using the RegisterGlobalUDSFCreator or
MustRegisterGlobalUDSFCreator functions, which are defined in
gopkg.in/sensorbee/sensorbee.v0/bql/udf.
The following example registers WordSplitter and Ticker:
func init() {
udf.MustRegisterGlobalUDSFCreator("word_splitter", &WordSplitterCreator{})
udf.MustRegisterGlobalUDSFCreator("ticker", &TickerCreator{})
}
3.2.3.3. Generic UDSFs¶
Just as UDFs have the ConvertGeneric function, UDSFs have the
ConvertToUDSFCreator and MustConvertToUDSFCreator functions. They convert
a regular function satisfying some restrictions to the UDSFCreator interface.
The restrictions are the same as for generic UDFs except that a
function converted to the UDSFCreator interface has an additional argument
UDSFDeclarer. UDSFDeclarer is located after *core.Context and before
other arguments. Examples of valid function signatures are shown below:
- func(*core.Context, UDSFDeclarer, int)
- func(UDSFDeclarer, string)
- func(UDSFDeclarer)
- func(*core.Context, UDSFDeclarer, ...data.Value)
- func(UDSFDeclarer, ...float64)
- func(*core.Context, UDSFDeclarer, int, ...string)
- func(UDSFDeclarer, int, float64, ...time.Time)
Unlike *core.Context, UDSFDeclarer cannot be omitted. The argument types
that can be used are the same as those accepted by the ConvertGeneric
function.
WordSplitterCreator can be rewritten with the ConvertToUDSFCreator
function as follows:
func CreateWordSplitter(decl udf.UDSFDeclarer,
inputStream, field string) (udf.UDSF, error) {
if err := decl.Input(inputStream, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
func init() {
udf.MustRegisterGlobalUDSFCreator("word_splitter",
udf.MustConvertToUDSFCreator(CreateWordSplitter))
}
TickerCreator can be replaced with ConvertToUDSFCreator, too:
func CreateTicker(decl udf.UDSFDeclarer, i data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(i)
if err != nil {
return nil, err
}
return &Ticker{
interval: interval,
}, nil
}
func init() {
udf.MustRegisterGlobalUDSFCreator("ticker",
udf.MustConvertToUDSFCreator(CreateTicker))
}
3.2.3.4. A Complete Example¶
This subsection provides a complete example of UDSFs described in this section.
In addition to word_splitter and ticker, the example also includes the
lorem source, which periodically emits random texts as
{"text": "lorem ipsum dolor sit amet"}.
Assume that the import path of the example repository is
github.com/sensorbee/examples/udsfs, which doesn’t actually exist. The
repository has four files:
- lorem.go
- splitter.go
- ticker.go
- plugin/plugin.go
lorem.go¶
To learn how to implement a source plugin, see Source Plugins.
package udsfs
import (
"math/rand"
"strings"
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
var (
Lorem = strings.Split(strings.Replace(`lorem ipsum dolor sit amet
consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore
magna aliqua Ut enim ad minim veniam quis nostrud exercitation ullamco laboris
nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in reprehenderit
in voluptate velit esse cillum dolore eu fugiat nulla pariatur Excepteur sint
occaecat cupidatat non proident sunt in culpa qui officia deserunt mollit anim
id est laborum`, "\n", " ", -1), " ")
)
type LoremSource struct {
interval time.Duration
}
func (l *LoremSource) GenerateStream(ctx *core.Context, w core.Writer) error {
for {
var text []string
for l := rand.Intn(5) + 5; l > 0; l-- {
text = append(text, Lorem[rand.Intn(len(Lorem))])
}
t := core.NewTuple(data.Map{
"text": data.String(strings.Join(text, " ")),
})
if err := w.Write(ctx, t); err != nil {
return err
}
time.Sleep(l.interval)
}
}
func (l *LoremSource) Stop(ctx *core.Context) error {
return nil
}
func CreateLoremSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&LoremSource{
interval: interval,
}), nil
}
splitter.go¶
package udsfs
import (
"fmt"
"strings"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type WordSplitter struct {
field string
}
func (w *WordSplitter) Process(ctx *core.Context,
t *core.Tuple, writer core.Writer) error {
var kwd []string
if v, ok := t.Data[w.field]; !ok {
return fmt.Errorf("the tuple doesn't have the required field: %v", w.field)
} else if s, err := data.AsString(v); err != nil {
return fmt.Errorf("'%v' field must be string: %v", w.field, err)
} else {
kwd = strings.Split(s, " ")
}
for _, k := range kwd {
out := t.Copy()
out.Data[w.field] = data.String(k)
if err := writer.Write(ctx, out); err != nil {
return err
}
}
return nil
}
func (w *WordSplitter) Terminate(ctx *core.Context) error {
return nil
}
func CreateWordSplitter(decl udf.UDSFDeclarer,
inputStream, field string) (udf.UDSF, error) {
if err := decl.Input(inputStream, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
ticker.go¶
package udsfs
import (
"sync/atomic"
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
var i int64
for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
newTuple := core.NewTuple(data.Map{"tick": data.Int(i)})
if err := w.Write(ctx, newTuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Terminate(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
func CreateTicker(decl udf.UDSFDeclarer, i data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(i)
if err != nil {
return nil, err
}
return &Ticker{
interval: interval,
}, nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/sensorbee/examples/udsfs"
)
func init() {
bql.MustRegisterGlobalSourceCreator("lorem",
bql.SourceCreatorFunc(udsfs.CreateLoremSource))
udf.MustRegisterGlobalUDSFCreator("word_splitter",
udf.MustConvertToUDSFCreator(udsfs.CreateWordSplitter))
udf.MustRegisterGlobalUDSFCreator("ticker",
udf.MustConvertToUDSFCreator(udsfs.CreateTicker))
}
Example BQL Statements¶
CREATE SOURCE lorem TYPE lorem;
CREATE STREAM lorem_words AS
SELECT RSTREAM * FROM word_splitter("lorem", "text") [RANGE 1 TUPLES];
Results of word_splitter can be received by the following SELECT:
SELECT RSTREAM * FROM lorem_words [RANGE 1 TUPLES];
3.2.4. User-Defined States¶
This section describes how to write a UDS in Go.
3.2.4.1. Implementing a UDS¶
A struct implementing the following interface can be used as a UDS:
type SharedState interface {
Terminate(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core package.
Terminate method is called when the UDS becomes no longer in use. It should
release any resource that the UDS has allocated so far.
As an example, a UDS having a monotonically increasing counter can be implemented as follows:
type Counter struct {
c int64
}
func (c *Counter) Terminate(ctx *core.Context) error {
return nil
}
func (c *Counter) Next() int64 {
return atomic.AddInt64(&c.c, 1)
}
At the moment, there’s no way to manipulate the UDS from BQL statements. UDSs are usually provided with a set of UDFs that read or update the UDS. It’ll be described later in Manipulating a UDS via a UDF. Before looking into the UDS manipulation, registering and creating a UDS needs to be explained.
3.2.4.2. Registering a UDS¶
To register a UDS to the SensorBee server, the UDS needs to provide its
UDSCreator. UDSCreator is an interface defined in
gopkg.in/sensorbee/sensorbee.v0/bql/udf package as follows:
type UDSCreator interface {
CreateState(ctx *core.Context, params data.Map) (core.SharedState, error)
}
UDSCreator.CreateState method is called when executing a CREATE STATE
statement. The method creates a new instance of the UDS and initializes it with
the given parameters. The argument ctx has the processing context
information and params has parameters specified in the WITH clause of
the CREATE STATE.
The creator can be registered by RegisterGlobalUDSCreator or
MustRegisterGlobalUDSCreator function defined in
gopkg.in/sensorbee/sensorbee.v0/bql/udf package.
The following is the implementation and the registration of the creator for
Counter UDS above:
type CounterCreator struct {
}
func (c *CounterCreator) CreateState(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter", &CounterCreator{})
}
The creator in this example is registered with the UDS type name my_counter.
The creator supports start parameter which is used as the first value that
Counter.Next returns. The parameter can be specified in the CREATE STATE
as follows:
CREATE STATE my_counter_instance TYPE my_counter WITH start = 100;
Because the creator creates a new instance every time the CREATE STATE is
executed, there can be multiple instances of a specific UDS type:
CREATE STATE my_counter_instance1 TYPE my_counter;
CREATE STATE my_counter_instance2 TYPE my_counter;
CREATE STATE my_counter_instance3 TYPE my_counter;
...
Once an instance of the UDS is created by the CREATE STATE, UDFs can refer
to it and manipulate its state.
udf.UDSCreatorFunc¶
A function having the same signature as UDSCreator.CreateState can be
converted into UDSCreator by the udf.UDSCreatorFunc utility function:
func UDSCreatorFunc(f func(*core.Context, data.Map) (core.SharedState, error)) UDSCreator
For example, CounterCreator can be defined as a function and registered as
follows with this utility:
func CreateCounter(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter",
udf.UDSCreatorFunc(CreateCounter))
}
To support SAVE STATE and LOAD STATE statements, however, this utility
function cannot be used because the creator needs to have the LoadState
method. How to support saving and loading is described later.
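The adapter pattern behind such a utility can be sketched with the standard library alone. The following is a minimal sketch, not SensorBee's actual implementation: Context, Map, SharedState, creatorFunc, and stateLoader are local stand-ins introduced here for illustration, and the type assertion at the end is only one way a server could detect a missing LoadState method.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Context, Map, and SharedState are stand-ins for core.Context, data.Map,
// and core.SharedState so that this sketch compiles on its own.
type Context struct{}
type Map map[string]interface{}
type SharedState interface {
	Terminate(ctx *Context) error
}

// UDSCreator mirrors the interface shown in this section.
type UDSCreator interface {
	CreateState(ctx *Context, params Map) (SharedState, error)
}

// stateLoader is a hypothetical interface describing the optional LoadState method.
type stateLoader interface {
	LoadState(ctx *Context, r io.Reader, params Map) (SharedState, error)
}

// creatorFunc is a function type that satisfies UDSCreator by calling itself.
type creatorFunc func(*Context, Map) (SharedState, error)

func (f creatorFunc) CreateState(ctx *Context, params Map) (SharedState, error) {
	return f(ctx, params)
}

// UDSCreatorFunc wraps a plain function as a UDSCreator.
func UDSCreatorFunc(f func(*Context, Map) (SharedState, error)) UDSCreator {
	return creatorFunc(f)
}

type Counter struct{ c int64 }

func (c *Counter) Terminate(ctx *Context) error { return nil }

// CreateCounter follows the example above, with a plain int64 parameter.
func CreateCounter(ctx *Context, params Map) (SharedState, error) {
	cnt := &Counter{}
	if v, ok := params["start"]; ok {
		i, ok := v.(int64)
		if !ok {
			return nil, errors.New("start must be an integer")
		}
		cnt.c = i - 1
	}
	return cnt, nil
}

func main() {
	creator := UDSCreatorFunc(CreateCounter)
	s, err := creator.CreateState(&Context{}, Map{"start": int64(100)})
	if err != nil {
		panic(err)
	}
	fmt.Println(s.(*Counter).c) // 99

	// The wrapped function only provides CreateState, so it has no LoadState.
	_, canLoad := creator.(stateLoader)
	fmt.Println(canLoad) // false
}
```

Because the wrapper type has exactly one method, a creator built this way cannot carry the additional LoadState method, which is why a struct-based creator is needed for saving and loading.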
3.2.4.3. Manipulating a UDS via a UDF¶
To manipulate a UDS from BQL statements, a set of UDFs that read or update the UDS has to be provided with it:
func Next(ctx *core.Context, uds string) (int64, error) {
s, err := ctx.SharedStates.Get(uds)
if err != nil {
return 0, err
}
c, ok := s.(*Counter)
if !ok {
return 0, fmt.Errorf("the state isn't a counter: %v", uds)
}
return c.Next(), nil
}
func init() {
udf.MustRegisterGlobalUDF("my_next_count", udf.MustConvertGeneric(Next))
}
In this example, a UDF my_next_count is registered to the SensorBee server.
The UDF calls Counter.Next method to obtain the next count and returns it.
The UDF receives one argument uds that is the name of the UDS to be updated.
CREATE STATE my_counter_instance TYPE my_counter;
CREATE STREAM events_with_id AS
SELECT RSTREAM my_next_count("my_counter_instance") AS id, *
FROM events [RANGE 1 TUPLES];
The BQL statements above add IDs to tuples emitted from a stream events. The
state my_counter_instance is created with the type my_counter. Then,
my_next_count UDF is called with the name. Every time the UDF is called,
the state of my_counter_instance is updated by its Next method.
my_next_count (i.e. Next function in Go) can look up the instance of
the UDS by its name through core.Context.SharedStates. SharedStates
manages all the UDSs created in a topology. SharedStates.Get returns the
instance of the UDS having the given name. It returns an error if it couldn’t
find the instance. In the example above, my_next_count("my_counter_instance")
will look up an instance of the UDS having the name my_counter_instance,
which was previously created by the CREATE STATE. The UDS returned from
Get method has the type core.SharedState and cannot directly be used as
Counter. Therefore, it has to be converted to *Counter with a type assertion.
Since the state can be any type satisfying core.SharedState, a UDS can
potentially have any information such as machine learning models,
dictionaries for natural language processing, or even an in-memory database.
Note
As UDFs are concurrently called from multiple goroutines, UDSs also need to be thread-safe.
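The thread-safety requirement can be checked in isolation. The following is a minimal sketch that keeps only the counting logic of Counter and calls Next from several goroutines; countConcurrently is a hypothetical helper introduced for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Counter keeps only the counting logic of the UDS from this section.
type Counter struct {
	c int64
}

// Next atomically increments the counter and returns the new value.
func (c *Counter) Next() int64 {
	return atomic.AddInt64(&c.c, 1)
}

// countConcurrently calls Next from several goroutines and returns the
// final counter value. It is a hypothetical helper for this sketch.
func countConcurrently(c *Counter, goroutines, callsPerGoroutine int) int64 {
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < callsPerGoroutine; i++ {
				c.Next()
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&c.c)
}

func main() {
	c := &Counter{}
	fmt.Println(countConcurrently(c, 8, 1000)) // 8000: no increment is lost
}
```

If Next used a plain `c.c++` instead of the atomic operation, increments could be lost under concurrent calls.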
3.2.4.4. Saving and Loading a UDS¶
Counter implemented so far doesn’t support saving and loading its state.
Thus, its count will be reset every time the server restarts. To save the
state and load it later on, the UDS and its creator need to provide some
methods. After providing those methods, the state can be saved by the
SAVE STATE statement and loaded by the LOAD STATE statement.
Supporting SAVE STATE¶
By adding Save method having the following signature to a UDS, the UDS can
be saved by the SAVE STATE statement:
Save(ctx *core.Context, w io.Writer, params data.Map) error
Save method writes all the data that the state has to w io.Writer.
The data can be written in any format as long as corresponding loading methods
can reconstruct the state from it. It can be in JSON, msgpack, Protocol Buffer,
and so on.
Warning
Providing forward/backward compatibility or version controlling of the saved data is the responsibility of the author of the UDS.
*core.Context has the processing context information. params argument
is not used at the moment and reserved for the future use.
Once Save method is provided, the UDS can be saved by SAVE STATE statement:
SAVE STATE my_counter_instance;
The SAVE STATE doesn’t take any parameters now. The location and the
physical format of the saved UDS data depend on the configuration of the
SensorBee server or program running BQL statements. However, it is guaranteed
that the saved data can be loaded by the same program via the LOAD STATE
statement, which is described later.
Save method of previously implemented Counter can be as follows:
func (c *Counter) Save(ctx *core.Context, w io.Writer, params data.Map) error {
return binary.Write(w, binary.LittleEndian, atomic.LoadInt64(&c.c))
}
Note
Because this counter is very simple, there’s no version controlling logic in the method. As the minimum solution, having a version number at the beginning of the data is sufficient for most cases.
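A version number at the beginning of the data could look like the following minimal sketch. The one-byte header, the formatVersion constant, and the helper names saveCount, loadCount, roundTrip, and loadFromBytes are assumptions made for this illustration; they are not part of SensorBee.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// formatVersion is a hypothetical version number of the saved layout.
const formatVersion uint8 = 1

// saveCount writes a one-byte version followed by the counter value.
func saveCount(w io.Writer, count int64) error {
	if err := binary.Write(w, binary.LittleEndian, formatVersion); err != nil {
		return err
	}
	return binary.Write(w, binary.LittleEndian, count)
}

// loadCount reads the version first and rejects layouts it doesn't know.
func loadCount(r io.Reader) (int64, error) {
	var ver uint8
	if err := binary.Read(r, binary.LittleEndian, &ver); err != nil {
		return 0, err
	}
	switch ver {
	case 1:
		var count int64
		if err := binary.Read(r, binary.LittleEndian, &count); err != nil {
			return 0, err
		}
		return count, nil
	default:
		return 0, fmt.Errorf("unsupported format version: %d", ver)
	}
}

// roundTrip saves a value into memory and loads it back.
func roundTrip(count int64) (int64, error) {
	var buf bytes.Buffer
	if err := saveCount(&buf, count); err != nil {
		return 0, err
	}
	return loadCount(&buf)
}

// loadFromBytes loads a value from raw bytes, e.g. data written by another version.
func loadFromBytes(b []byte) (int64, error) {
	return loadCount(bytes.NewReader(b))
}

func main() {
	count, err := roundTrip(42)
	fmt.Println(count, err) // 42 <nil>

	_, err = loadFromBytes([]byte{2, 0, 0, 0, 0, 0, 0, 0, 0})
	fmt.Println(err) // unsupported format version: 2
}
```

When the layout changes later, a new case can be added to the switch while the old case keeps reading data saved by earlier versions.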
Supporting LOAD STATE¶
To support the LOAD STATE statement, a UDSCreator needs to have
LoadState method having the following signature:
LoadState(ctx *core.Context, r io.Reader, params data.Map) (core.SharedState, error)
Note
LoadState method needs to be defined in a UDSCreator, not in the
UDS itself.
LoadState method reads data from r io.Reader. The data has exactly the
same format as the one previously written by Save method of a UDS.
params has parameters specified in the SET clause in the LOAD STATE
statement.
Note
Parameters specified in the SET clause don’t have to be the same as the ones
given in the WITH clause of the CREATE STATE statement. See
LOAD STATE for details.
When LoadState method returns an error, the LOAD STATE statement with
CREATE IF NOT SAVED doesn’t fall back to CREATE STATE, but it just fails.
Once LoadState method is added to the UDSCreator, the saved state can be
loaded by LOAD STATE statement.
LoadState method of previously implemented CounterCreator can be as
follows:
func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if err := binary.Read(r, binary.LittleEndian, &cnt.c); err != nil {
return nil, err
}
return cnt, nil
}
Providing Load method in a UDS¶
In addition to implementing LoadState method in a UDS’s creator, a UDS
itself can provide Load method. While LoadState method creates a new
state instance and replaces the previous instance with it, Load method
dynamically modifies the existing instance. Therefore, Load method can
potentially be more efficient than LoadState method, although it has to
provide appropriate failure handling and concurrency control so that (1) the
UDS doesn’t become invalid on failure (i.e. Load method is “exception
safe”) or by concurrent calls, and (2) other operations on the UDS don’t block
for a long time.
The signature of Load method is almost the same as LoadState method
except that Load method doesn’t return a new core.SharedState but
updates the UDS itself instead:
Load(ctx *core.Context, r io.Reader, params data.Map) error
Load method of previously implemented Counter can be as follows:
func (c *Counter) Load(ctx *core.Context, r io.Reader, params data.Map) error {
var cnt int64
if err := binary.Read(r, binary.LittleEndian, &cnt); err != nil {
return err
}
atomic.StoreInt64(&c.c, cnt)
return nil
}
How Loading is Processed¶
SensorBee tries to use these two loading methods LoadState and Load
in the following rule:
- When a UDS’s creator doesn’t provide LoadState method, the LOAD STATE statement fails.
  - The LOAD STATE statement fails even if the UDS implements its Load method. To support the statement, LoadState method is always required in its creator. This is because Load method only works when an instance of the UDS is already created or loaded, and it cannot be used for a nonexistent instance.
  - The LOAD STATE CREATE IF NOT SAVED statement also fails if LoadState method isn’t provided. The statement calls CreateState method when the state hasn’t previously been saved. Otherwise, it’ll try to load the saved data. Therefore, if the data is previously saved and an instance of the UDS hasn’t been created yet, the statement cannot create a new instance without LoadState method in the creator. To be consistent on various conditions, the LOAD STATE CREATE IF NOT SAVED statement fails if LoadState method isn’t provided regardless of whether the state has been saved before or not.
- When a UDS’s creator provides LoadState method and the UDS doesn’t provide Load method, the LOAD STATE statement tries to load a model through LoadState method.
  - It will create a new instance so that it consumes twice as much memory.
- When a UDS’s creator provides LoadState method and the UDS also provides Load method,
  - Load method will be used when the instance has already been created or loaded. LoadState method wouldn’t be used even if Load method failed.
  - LoadState method will be used otherwise.
Note
This is already mentioned in the list above, but LoadState method always
needs to be provided even if a UDS implements Load method.
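The rules above can be summarized as a small decision function. This is only a sketch of the documented behavior, not the server's code; loadAction and chooseLoadAction are hypothetical names introduced here.

```go
package main

import "fmt"

// loadAction names what the LOAD STATE statement does in this sketch.
type loadAction string

const (
	actionFail      loadAction = "fail"
	actionLoadState loadAction = "LoadState"
	actionLoad      loadAction = "Load"
)

// chooseLoadAction encodes the loading rules:
//   - without LoadState in the creator, loading always fails;
//   - Load is used only when the UDS provides it and an instance already exists;
//   - LoadState is used in all remaining cases.
func chooseLoadAction(creatorHasLoadState, udsHasLoad, instanceExists bool) loadAction {
	if !creatorHasLoadState {
		return actionFail
	}
	if udsHasLoad && instanceExists {
		return actionLoad
	}
	return actionLoadState
}

func main() {
	fmt.Println(chooseLoadAction(false, true, true))  // fail
	fmt.Println(chooseLoadAction(true, false, true))  // LoadState
	fmt.Println(chooseLoadAction(true, true, true))   // Load
	fmt.Println(chooseLoadAction(true, true, false))  // LoadState
}
```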
3.2.4.5. A Complete Example¶
A complete example of the state is shown in this subsection. Assume that the
import path of the example repository is
github.com/sensorbee/examples/counter, which doesn’t actually exist. The
repository has two files:
- counter.go
- plugin/plugin.go
counter.go¶
package counter
import (
"encoding/binary"
"fmt"
"io"
"sync/atomic"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Counter struct {
c int64
}
func (c *Counter) Terminate(ctx *core.Context) error {
return nil
}
func (c *Counter) Next() int64 {
return atomic.AddInt64(&c.c, 1)
}
func (c *Counter) Save(ctx *core.Context, w io.Writer, params data.Map) error {
return binary.Write(w, binary.LittleEndian, atomic.LoadInt64(&c.c))
}
func (c *Counter) Load(ctx *core.Context, r io.Reader, params data.Map) error {
var cnt int64
if err := binary.Read(r, binary.LittleEndian, &cnt); err != nil {
return err
}
atomic.StoreInt64(&c.c, cnt)
return nil
}
type CounterCreator struct {
}
func (c *CounterCreator) CreateState(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if err := binary.Read(r, binary.LittleEndian, &cnt.c); err != nil {
return nil, err
}
return cnt, nil
}
func Next(ctx *core.Context, uds string) (int64, error) {
s, err := ctx.SharedStates.Get(uds)
if err != nil {
return 0, err
}
c, ok := s.(*Counter)
if !ok {
return 0, fmt.Errorf("the state isn't a counter: %v", uds)
}
return c.Next(), nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/sensorbee/examples/counter"
)
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter",
&counter.CounterCreator{})
udf.MustRegisterGlobalUDF("my_next_count",
udf.MustConvertGeneric(counter.Next))
}
3.2.4.6. Writing Tuples to a UDS¶
When a UDS implements core.Writer, the INSERT INTO statement can
insert tuples into the UDS via the uds sink:
type Writer interface {
Write(*Context, *Tuple) error
}
The following is the example of using the uds sink:
CREATE STATE my_state TYPE my_state_type;
CREATE SINK my_state_sink TYPE uds WITH name = "my_state";
INSERT INTO my_state_sink FROM some_stream;
If my_state_type doesn’t implement core.Writer, the CREATE SINK
statement fails. Every time some_stream emits a tuple, the Write
method of my_state is called.
Example¶
Models provided by Jubatus machine learning plugin for SensorBee implement
the Write method. When tuples are inserted into a UDS, it trains the model
it has.
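A UDS that accepts tuples could be sketched as follows. Context and Tuple are simplified stand-ins for the core types, and WordCounts with its "word" field is a hypothetical state invented for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Context and Tuple are simplified stand-ins for core.Context and core.Tuple.
type Context struct{}
type Tuple struct {
	Data map[string]interface{}
}

// WordCounts is a hypothetical UDS that counts the words written to it.
type WordCounts struct {
	mu     sync.Mutex
	counts map[string]int
}

func NewWordCounts() *WordCounts {
	return &WordCounts{counts: map[string]int{}}
}

// Terminate satisfies the SharedState interface; there is nothing to release.
func (s *WordCounts) Terminate(ctx *Context) error {
	return nil
}

// Write is the method that would be called for every inserted tuple.
func (s *WordCounts) Write(ctx *Context, t *Tuple) error {
	w, ok := t.Data["word"].(string)
	if !ok {
		return errors.New("the tuple doesn't have a string field: word")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counts[w]++
	return nil
}

// Count returns how many times the word has been written so far.
func (s *WordCounts) Count(word string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.counts[word]
}

func main() {
	s := NewWordCounts()
	for _, w := range []string{"lorem", "ipsum", "lorem"} {
		if err := s.Write(&Context{}, &Tuple{Data: map[string]interface{}{"word": w}}); err != nil {
			panic(err)
		}
	}
	fmt.Println(s.Count("lorem")) // 2
}
```

A UDF that reads such a state, like my_next_count in the counter example, could then expose the accumulated counts to BQL statements.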
3.2.5. Source Plugins¶
This section describes how to implement a source as a plugin of SensorBee.
3.2.5.1. Implementing a Source¶
A struct implementing the following interface can be a source:
type Source interface {
GenerateStream(ctx *Context, w Writer) error
Stop(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core package.
The GenerateStream method actually generates tuples for subsequent streams.
The argument ctx contains the information of the current processing context.
w is the destination to which generated tuples are emitted. The Stop
method stops GenerateStream. It should wait until the GenerateStream
method call returns, but it isn’t mandatory.
Once the GenerateStream method is called, a source can emit as many tuples
as it requires. A source basically needs to return from its
GenerateStream method when:
- it emitted all the tuples it has
- the Stop method was called
- a fatal error occurred
The Stop method can be called concurrently while the GenerateStream
method is working and it must be thread-safe. As long as a source is used by
components defined in SensorBee, it’s guaranteed that its Stop method is
called only once and it doesn’t have to be idempotent. However, it is
recommended that a source provide a termination check in its Stop method
to avoid a double free problem.
A typical implementation of a source is shown below:
func (s *MySource) GenerateStream(ctx *core.Context, w core.Writer) error {
<initialization>
defer func() {
<clean up>
}()
for <check stop> {
t := <create a new tuple>
if err := w.Write(ctx, t); err != nil {
return err
}
}
return nil
}
func (s *MySource) Stop(ctx *core.Context) error {
<turn on a stop flag>
<wait until GenerateStream stops>
return nil
}
The following example source emits tuples periodically:
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
if atomic.LoadInt32(&t.stopped) != 0 {
break
}
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
The interval field is initialized in SourceCreator, which is described
later. This is the source version of the example in the UDSF section. This
implementation is not entirely correct because the Stop method doesn’t wait until
the GenerateStream method actually returns. Because implementing a
thread-safe source which stops correctly is a difficult task, core package
provides a utility function that implements a source’s Stop method on behalf
of the source itself. See Utilities for
details.
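For comparison, a hand-written Stop that does wait could look like the following minimal sketch. It uses local stand-ins for the core types and assumes that GenerateStream is called exactly once per Ticker; the done channel, countingWriter, and runAndStop are hypothetical names introduced here. It is not how the core package's utility is implemented.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Context, Tuple, and Writer are stand-ins for the corresponding core types.
type Context struct{}
type Tuple struct {
	Data map[string]interface{}
}
type Writer interface {
	Write(*Context, *Tuple) error
}

type Ticker struct {
	interval time.Duration
	stopped  int32
	done     chan struct{} // closed when GenerateStream returns
}

func NewTicker(interval time.Duration) *Ticker {
	return &Ticker{interval: interval, done: make(chan struct{})}
}

// GenerateStream must be called only once in this sketch because it closes done.
func (t *Ticker) GenerateStream(ctx *Context, w Writer) error {
	defer close(t.done)
	for cnt := int64(0); atomic.LoadInt32(&t.stopped) == 0; cnt++ {
		tuple := &Tuple{Data: map[string]interface{}{"tick": cnt}}
		if err := w.Write(ctx, tuple); err != nil {
			return err
		}
		time.Sleep(t.interval)
	}
	return nil
}

// Stop turns on the stop flag and blocks until GenerateStream has returned.
func (t *Ticker) Stop(ctx *Context) error {
	atomic.StoreInt32(&t.stopped, 1)
	<-t.done
	return nil
}

// countingWriter counts the tuples it receives.
type countingWriter struct{ n int64 }

func (c *countingWriter) Write(ctx *Context, t *Tuple) error {
	atomic.AddInt64(&c.n, 1)
	return nil
}

// runAndStop runs a Ticker for the given number of milliseconds, stops it,
// and returns the tuple count right after Stop and again a little later.
func runAndStop(millis int) (before, after int64) {
	t := NewTicker(time.Millisecond)
	w := &countingWriter{}
	go t.GenerateStream(&Context{}, w)
	time.Sleep(time.Duration(millis) * time.Millisecond)
	t.Stop(&Context{})
	before = atomic.LoadInt64(&w.n)
	time.Sleep(10 * time.Millisecond)
	after = atomic.LoadInt64(&w.n)
	return before, after
}

func main() {
	before, after := runAndStop(20)
	fmt.Println(before == after) // true: nothing is written after Stop returns
}
```

Even this small version has restrictions (a single GenerateStream call, no error reporting from Stop), which illustrates why using the provided utility is usually preferable.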
3.2.5.2. Registering a Source¶
To register a source to the SensorBee server, the source needs to provide its
SourceCreator. The SourceCreator interface is defined in
gopkg.in/sensorbee/sensorbee.v0/bql package as follows:
type SourceCreator interface {
CreateSource(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Source, error)
}
It only has one method: CreateSource. The CreateSource method is called
when the CREATE SOURCE statement is executed.
The ctx argument contains the information of the current processing context.
ioParams has the name and the type name of the source, which are given in
the CREATE SOURCE statement. params has parameters specified in the
WITH clause of the CREATE SOURCE statement.
The creator can be registered by RegisterGlobalSourceCreator or
MustRegisterGlobalSourceCreator function. As an example, the creator of
Ticker above can be implemented and registered as follows:
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return &Ticker{
interval: interval,
}, nil
}
func init() {
bql.MustRegisterGlobalSourceCreator("ticker", &TickerCreator{})
}
In this example, the source has a parameter interval which can be specified
in the WITH clause of the CREATE SOURCE statement:
CREATE SOURCE my_ticker TYPE ticker WITH interval = 0.1;
my_ticker emits tuples that look like {"tick": 123} every 100 ms.
Without the interval parameter, my_ticker will emit tuples every
second by default.
3.2.5.3. Types of a Source¶
In addition to a regular source, there’re two more types of sources: a resumable source and a rewindable source. This subsection describes those sources in detail.
Resumable Sources¶
A source that supports the PAUSE SOURCE and the RESUME SOURCE statements is called a resumable source.
Although all sources support them by default, which is done by the core
package, a source can explicitly implement core.Resumable interface so that
it can provide more efficient pause and resume capability:
type Resumable interface {
Pause(ctx *Context) error
Resume(ctx *Context) error
}
The Pause method is called when PAUSE SOURCE statement is executed and
the Resume method is called by RESUME SOURCE. The Pause method may
be called even when the source is already paused, so is the Resume method.
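Because Pause and Resume may be called repeatedly, an implementation should tolerate redundant calls. The following is a minimal sketch of one way to do that with a mutex and a condition variable; pauseGate and its methods are hypothetical, and Context is a stand-in for core.Context.

```go
package main

import (
	"fmt"
	"sync"
)

// Context stands in for core.Context in this sketch.
type Context struct{}

// pauseGate is a hypothetical helper that a source could embed to
// implement Pause and Resume.
type pauseGate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
}

func newPauseGate() *pauseGate {
	g := &pauseGate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Pause sets the flag; calling it while already paused changes nothing.
func (g *pauseGate) Pause(ctx *Context) error {
	g.mu.Lock()
	g.paused = true
	g.mu.Unlock()
	return nil
}

// Resume clears the flag and wakes up every waiting goroutine.
func (g *pauseGate) Resume(ctx *Context) error {
	g.mu.Lock()
	g.paused = false
	g.mu.Unlock()
	g.cond.Broadcast()
	return nil
}

// waitWhilePaused blocks the caller, typically GenerateStream, until resumed.
func (g *pauseGate) waitWhilePaused() {
	g.mu.Lock()
	for g.paused {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

// isPaused reports the current state of the gate.
func (g *pauseGate) isPaused() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.paused
}

func main() {
	g := newPauseGate()
	g.Pause(&Context{})
	g.Pause(&Context{}) // a repeated call is harmless
	fmt.Println(g.isPaused()) // true

	g.Resume(&Context{})
	g.Resume(&Context{}) // so is a repeated Resume
	g.waitWhilePaused()  // returns immediately because the gate is open
	fmt.Println(g.isPaused()) // false
}
```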
A source can be non-resumable by implementing these methods to return an error:
type MyNonResumableSource struct {
...
}
...
func (m *MyNonResumableSource) Pause(ctx *core.Context) error {
return errors.New("my_non_resumable_source doesn't support pause")
}
func (m *MyNonResumableSource) Resume(ctx *core.Context) error {
return errors.New("my_non_resumable_source doesn't support resume")
}
Rewindable Sources¶
A rewindable source can re-generate the same tuples again from the beginning after it emits all tuples or while it’s emitting tuples. A rewindable source supports the REWIND SOURCE statement.
A source can become rewindable by implementing the core.RewindableSource
interface:
type RewindableSource interface {
Source
Resumable
Rewind(ctx *Context) error
}
A rewindable source also needs to implement core.Resumable to be rewindable.
Note
The reason that a rewindable source also needs to be resumable is due to
the internal implementation of the default pause/resume support. While a
source is paused, it blocks core.Writer.Write called in the
GenerateStream method. The Rewind method could also be blocked while
the Write call is being blocked until the Resume method is called.
It, of course, depends on the implementation of a source, but it’s very
error-prone. Therefore, implementing the Resumable interface is required
to be rewindable at the moment.
Unlike a regular source, the GenerateStream method of a rewindable source
must not return after it emits all tuples. Instead, it needs to wait until the
Rewind method or the Stop method is called. Once it returns, the source
is considered stopped and no further operation, including the REWIND SOURCE
statement, will work on the source.
Due to its nature, a stream isn’t often rewindable. A rewindable source is
mostly used for relatively static data sources such as relations or files.
Also, because implementing the RewindableSource interface is even harder
than implementing the Resumable interface, utilities are usually used.
3.2.5.4. Utilities¶
There’re some utilities to support implementing sources and their creators. This subsection describes each utility.
core.ImplementSourceStop¶
core.ImplementSourceStop is a function that implements the Stop method
of a source in a thread-safe manner:
func ImplementSourceStop(s Source) Source
A source returned from this function is resumable, but not rewindable even if
the original source implements the core.RewindableSource interface. In
addition, although a source passed to core.ImplementSourceStop can
explicitly implement the core.Resumable interface, its Pause and
Resume methods will never be called because the source returned from
core.ImplementSourceStop also implements those methods and controls
pause and resume.
To apply this function, a source must satisfy following restrictions:
- The GenerateStream method must be implemented in a way that it can safely be called again after it returns.
- The GenerateStream method must return when the core.Writer.Write returned core.ErrSourceStopped. The method must return exactly the same error variable that the writer returned.
- The Stop method just returns nil.
  - This means all resource allocation and deallocation must be done within the GenerateStream method.
A typical implementation of a source passed to core.ImplementSourceStop is
shown below:
func (s *MySource) GenerateStream(ctx *core.Context, w core.Writer) error {
<initialization>
defer func() {
<clean up>
}()
for {
t := <create a new tuple>
if err := w.Write(ctx, t); err != nil {
return err
}
}
return nil
}
func (s *MySource) Stop(ctx *core.Context) error {
return nil
}
If a source wants to ignore errors returned from core.Writer.Write other
than core.ErrSourceStopped, the GenerateStream method can be modified
as:
if err := w.Write(ctx, t); err != nil {
if err == core.ErrSourceStopped {
return err
}
}
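The effect of this error handling can be tried out with stand-in types. In the following sketch, ErrSourceStopped is a local substitute for core.ErrSourceStopped, and flakyWriter and generate are hypothetical names: the writer fails on every second call and reports the stop after a fixed number of calls.

```go
package main

import (
	"errors"
	"fmt"
)

// Context, Tuple, and Writer are stand-ins for the corresponding core types.
type Context struct{}
type Tuple struct {
	Data map[string]interface{}
}
type Writer interface {
	Write(*Context, *Tuple) error
}

// ErrSourceStopped is a local substitute for core.ErrSourceStopped.
var ErrSourceStopped = errors.New("source stopped")

// flakyWriter fails on every even call and reports the stop once the
// number of calls exceeds limit.
type flakyWriter struct {
	calls, limit int
}

func (f *flakyWriter) Write(ctx *Context, t *Tuple) error {
	f.calls++
	if f.calls > f.limit {
		return ErrSourceStopped
	}
	if f.calls%2 == 0 {
		return errors.New("temporary failure")
	}
	return nil
}

// generate emits tuples until the writer reports the stop. Other errors
// are only counted, so the tuples that caused them are dropped.
func generate(ctx *Context, w Writer) (dropped int, err error) {
	for cnt := int64(0); ; cnt++ {
		t := &Tuple{Data: map[string]interface{}{"tick": cnt}}
		if err := w.Write(ctx, t); err != nil {
			if err == ErrSourceStopped {
				return dropped, err // return the writer's error value unchanged
			}
			dropped++
		}
	}
}

func main() {
	dropped, err := generate(&Context{}, &flakyWriter{limit: 5})
	fmt.Println(dropped, err == ErrSourceStopped) // 2 true
}
```

Returning the error value unchanged matters because the caller compares it with the sentinel by identity; wrapping it in a new error would break that comparison.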
By applying core.ImplementSourceStop, the Ticker above can be
implemented as follows:
type Ticker struct {
interval time.Duration
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
return nil
}
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&Ticker{
interval: interval,
}), nil
}
There’s no stopped flag now. In this version, the Stop method of the
source returned by core.ImplementSourceStop waits until the
GenerateStream method returns.
core.NewRewindableSource¶
core.NewRewindableSource is a function that converts a regular source into
a rewindable source:
func NewRewindableSource(s Source) RewindableSource
A source returned from this function is resumable and rewindable. A source
passed to the function needs to satisfy the same restrictions as
core.ImplementSourceStop. In addition to that, there’s one more restriction
for core.NewRewindableSource:
- The GenerateStream method must return when the core.Writer.Write returned core.ErrSourceRewound. The method must return exactly the same error variable that the writer returned.
Although the GenerateStream method of a rewindable source must not return
after it emits all tuples, a source passed to the core.NewRewindableSource
function needs to return in that situation. For example, let’s assume there’s a
source that generates tuples from each line in a file. To implement the source
without the help of the utility function, its GenerateStream must wait for
the Rewind method to be called after it processes all lines in the file.
However, with the utility, its GenerateStream can just return once it emits
all tuples. Therefore, a typical implementation of a source passed to the
utility can be the same as a source for core.ImplementSourceStop.
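A line-based source of that kind could be sketched as follows. The types are local stand-ins for the core package, LineSource and its open field are hypothetical, and calling GenerateStream twice in collectTwice only imitates what a rewind might trigger; how core.NewRewindableSource actually drives the source is not shown here.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// Context, Tuple, and Writer are stand-ins for the corresponding core types.
type Context struct{}
type Tuple struct {
	Data map[string]interface{}
}
type Writer interface {
	Write(*Context, *Tuple) error
}

// LineSource emits one tuple per line. The open field is a hypothetical hook
// that returns a fresh reader, so GenerateStream can safely be called again.
type LineSource struct {
	open func() io.Reader
}

// GenerateStream simply returns after the last line has been emitted.
func (s *LineSource) GenerateStream(ctx *Context, w Writer) error {
	sc := bufio.NewScanner(s.open())
	for sc.Scan() {
		t := &Tuple{Data: map[string]interface{}{"line": sc.Text()}}
		if err := w.Write(ctx, t); err != nil {
			return err // includes stop or rewind errors from the writer
		}
	}
	return sc.Err()
}

// Stop just returns nil, as required by the utility functions.
func (s *LineSource) Stop(ctx *Context) error {
	return nil
}

// collector keeps the lines it receives.
type collector struct{ lines []string }

func (c *collector) Write(ctx *Context, t *Tuple) error {
	c.lines = append(c.lines, t.Data["line"].(string))
	return nil
}

// collectTwice runs GenerateStream twice over the same text.
func collectTwice(text string) []string {
	src := &LineSource{open: func() io.Reader { return strings.NewReader(text) }}
	c := &collector{}
	for i := 0; i < 2; i++ {
		if err := src.GenerateStream(&Context{}, c); err != nil {
			panic(err)
		}
	}
	return c.lines
}

func main() {
	fmt.Println(collectTwice("a\nb")) // [a b a b]
}
```

All resources, here the reader and the scanner, are created inside GenerateStream, which is what allows the method to be called again after it returns.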
As it will be shown later, a source that infinitely emits tuples can also be rewindable in some sense.
The following is an example of TickerCreator modified from the example for
core.ImplementSourceStop:
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
rewindable := false
if v, ok := params["rewindable"]; ok {
b, err := data.AsBool(v)
if err != nil {
return nil, err
}
rewindable = b
}
src := &Ticker{
interval: interval,
}
if rewindable {
return core.NewRewindableSource(src), nil
}
return core.ImplementSourceStop(src), nil
}
In this example, Ticker has the rewindable parameter. If it is true,
the source becomes rewindable:
CREATE SOURCE my_rewindable_ticker TYPE ticker WITH rewindable = true;
By issuing the REWIND SOURCE statement, my_rewindable_ticker resets
the value of tick field:
REWIND SOURCE my_rewindable_ticker;
-- output examples of SELECT RSTREAM * FROM my_rewindable_ticker [RANGE 1 TUPLES];
{"tick":0}
{"tick":1}
{"tick":2}
...
{"tick":123}
-- REWIND SOURCE is executed here
{"tick":0}
{"tick":1}
...
bql.SourceCreatorFunc¶
bql.SourceCreatorFunc is a function that converts a function having the
same signature as SourceCreator.CreateSource to a SourceCreator:
func SourceCreatorFunc(f func(*core.Context,
*IOParams, data.Map) (core.Source, error)) SourceCreator
For example, TickerCreator above and its registration can be modified as
follows with this utility:
func CreateTicker(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&Ticker{
interval: interval,
}), nil
}
func init() {
bql.MustRegisterGlobalSourceCreator("ticker",
bql.SourceCreatorFunc(CreateTicker))
}
3.2.5.5. A Complete Example¶
A complete example of Ticker is shown in this subsection. Assume that the
import path of the example is github.com/sensorbee/examples/ticker, which
doesn’t actually exist. There’re two files in the repository:
- ticker.go
- plugin/plugin.go
The example uses core.NewRewindableSource utility function.
ticker.go¶
package ticker
import (
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Ticker struct {
interval time.Duration
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
// This method will be implemented by utility functions.
return nil
}
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
rewindable := false
if v, ok := params["rewindable"]; ok {
b, err := data.AsBool(v)
if err != nil {
return nil, err
}
rewindable = b
}
src := &Ticker{
interval: interval,
}
if rewindable {
return core.NewRewindableSource(src), nil
}
return core.ImplementSourceStop(src), nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"github.com/sensorbee/examples/ticker"
)
func init() {
bql.MustRegisterGlobalSourceCreator("ticker", &ticker.TickerCreator{})
}
3.2.6. Sink Plugins¶
This section describes how to implement a sink as a plugin of SensorBee.
3.2.6.1. Implementing a Sink¶
A struct implementing the following interface can be a sink:
type Sink interface {
Write(ctx *Context, t *Tuple) error
Close(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core package.
The Write method writes a tuple to the destination of the sink. The argument
ctx contains the information of the current processing context. t is the
tuple to be written. The Close method is called when the sink becomes
unnecessary. It must release all resources allocated for the sink.
The following example sink writes a tuple as JSON to stdout:
type StdoutSink struct {
}
func (s *StdoutSink) Write(ctx *core.Context, t *core.Tuple) error {
_, err := fmt.Fprintln(os.Stdout, t.Data)
return err
}
func (s *StdoutSink) Close(ctx *core.Context) error {
// nothing to release
return nil
}
A sink is initialized by its SinkCreator, which is described later.
Note
SensorBee doesn’t provide buffering or retry capability for sinks.
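Because of this, a sink that needs retries has to implement them itself. One possible approach is to wrap a sink in another sink that retries failed writes. The sketch below shows the idea under stated assumptions: Tuple, Context and Sink are simplified stand-ins that mirror the interface shown above rather than the real core package, and RetrySink and flakySink are hypothetical names that are not part of SensorBee.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Tuple, Context and Sink are simplified stand-ins for the types in the
// core package. They are assumptions made for this sketch only.
type Tuple struct {
	Data map[string]interface{}
}

type Context struct{}

type Sink interface {
	Write(ctx *Context, t *Tuple) error
	Close(ctx *Context) error
}

// RetrySink (hypothetical) retries a failed Write on the wrapped sink
// up to Attempts times, sleeping for Backoff between attempts.
type RetrySink struct {
	Inner    Sink
	Attempts int
	Backoff  time.Duration
}

func (r *RetrySink) Write(ctx *Context, t *Tuple) error {
	attempts := r.Attempts
	if attempts < 1 {
		attempts = 1 // always try at least once
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = r.Inner.Write(ctx, t); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(r.Backoff)
		}
	}
	return err // the error of the last attempt
}

func (r *RetrySink) Close(ctx *Context) error {
	return r.Inner.Close(ctx)
}

// flakySink fails the first `failures` writes and then succeeds.
type flakySink struct {
	failures int
	written  int
}

func (f *flakySink) Write(ctx *Context, t *Tuple) error {
	if f.failures > 0 {
		f.failures--
		return errors.New("temporary failure")
	}
	f.written++
	return nil
}

func (f *flakySink) Close(ctx *Context) error { return nil }

func main() {
	inner := &flakySink{failures: 2}
	s := &RetrySink{Inner: inner, Attempts: 3, Backoff: 10 * time.Millisecond}
	err := s.Write(&Context{}, &Tuple{})
	fmt.Println("error:", err, "written:", inner.written)
}
```

Note that this sketch blocks the caller while it sleeps and gives up after a fixed number of attempts. Whether that is acceptable depends on the destination and on how much delay the topology can tolerate.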
3.2.6.2. Registering a Sink¶
To register a sink with the SensorBee server, the sink needs to provide its
SinkCreator. The SinkCreator interface is defined in the
gopkg.in/sensorbee/sensorbee.v0/bql package as follows:
// SinkCreator is an interface which creates instances of a Sink.
type SinkCreator interface {
// CreateSink creates a new Sink instance using given parameters.
CreateSink(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Sink, error)
}
It only has one method: CreateSink. The CreateSink method is called
when the CREATE SINK statement is executed.
The ctx argument contains the information of the current processing context.
ioParams has the name and the type name of the sink, which are given in
the CREATE SINK statement. params has parameters specified in the
WITH clause of the CREATE SINK statement.
The creator can be registered by the RegisterGlobalSinkCreator or
MustRegisterGlobalSinkCreator function. As an example, the creator of
StdoutSink above can be implemented and registered as follows:
type StdoutSinkCreator struct {
}
func (s *StdoutSinkCreator) CreateSink(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &StdoutSink{}, nil
}
func init() {
bql.MustRegisterGlobalSinkCreator("my_stdout", &StdoutSinkCreator{})
}
This sink doesn’t take any parameters from the WITH clause of the
CREATE SINK statement. Parameters for a sink are handled in the same way
as parameters for a source. See Source Plugins for more details.
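As a rough illustration of a creator that reads a parameter, the following sketch reads an optional value, falls back to a default when it is absent, and rejects a value of the wrong type. It is self-contained and does not use the SensorBee packages: params is a plain Go map standing in for data.Map, Tuple, Context and Sink are simplified stand-ins, and PrefixSink and its prefix parameter are hypothetical. A real creator would also receive ioParams and would presumably convert values with helpers from the data package, as the Ticker example does with data.ToDuration and data.AsBool.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// Tuple, Context and Sink are simplified stand-ins for the types in the
// core package. They are assumptions made for this sketch only.
type Tuple struct {
	Data map[string]interface{}
}

type Context struct{}

type Sink interface {
	Write(ctx *Context, t *Tuple) error
	Close(ctx *Context) error
}

// PrefixSink (hypothetical) prints every tuple with a configurable prefix.
type PrefixSink struct {
	out    io.Writer
	prefix string
}

func (s *PrefixSink) Write(ctx *Context, t *Tuple) error {
	_, err := fmt.Fprintf(s.out, "%s%v\n", s.prefix, t.Data)
	return err
}

func (s *PrefixSink) Close(ctx *Context) error { return nil }

// CreatePrefixSink reads the optional "prefix" parameter. A missing
// parameter keeps the default; a non-string value is reported as an error.
func CreatePrefixSink(ctx *Context, params map[string]interface{}) (Sink, error) {
	prefix := "" // default when the parameter is not given
	if v, ok := params["prefix"]; ok {
		p, ok := v.(string)
		if !ok {
			return nil, fmt.Errorf("prefix must be a string, got %T", v)
		}
		prefix = p
	}
	return &PrefixSink{out: os.Stdout, prefix: prefix}, nil
}

func main() {
	// Parameters as they might arrive from a WITH clause.
	s, err := CreatePrefixSink(&Context{}, map[string]interface{}{"prefix": "tuple: "})
	if err != nil {
		fmt.Println(err)
		return
	}
	if err := s.Write(&Context{}, &Tuple{Data: map[string]interface{}{"tick": 1}}); err != nil {
		fmt.Println(err)
	}

	// A value of the wrong type is rejected when the sink is created.
	_, err = CreatePrefixSink(&Context{}, map[string]interface{}{"prefix": 42})
	fmt.Println(err)
}
```

Validating parameters in the creator means that a malformed statement fails when the sink is created rather than when the first tuple is written.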
3.2.6.3. Utilities¶
There’s one utility function for sink plugins, SinkCreatorFunc:
func SinkCreatorFunc(f func(*core.Context,
*IOParams, data.Map) (core.Sink, error)) SinkCreator
This utility function is defined in the gopkg.in/sensorbee/sensorbee.v0/bql
package. It converts a function having the same signature as
SinkCreator.CreateSink to a SinkCreator. With this utility, for example,
StdoutSinkCreator can be modified to:
func CreateStdoutSink(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &StdoutSink{}, nil
}
func init() {
bql.MustRegisterGlobalSinkCreator("stdout",
bql.SinkCreatorFunc(CreateStdoutSink))
}
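How bql.SinkCreatorFunc is implemented is not shown here. A common Go idiom for this kind of conversion, also used by http.HandlerFunc in the standard library, is a function type that has a method calling itself. The sketch below illustrates that idiom with stand-in types. Tuple, Context, IOParams (including its field names), Sink and SinkCreator are simplified assumptions that only mirror the shapes described above, and creatorFunc and countingSink are hypothetical names.

```go
package main

import "fmt"

// The types below are simplified stand-ins for the ones in the core,
// bql and data packages. They are assumptions made for this sketch only.
type Tuple struct {
	Data map[string]interface{}
}

type Context struct{}

type IOParams struct {
	Name     string
	TypeName string
}

type Sink interface {
	Write(ctx *Context, t *Tuple) error
	Close(ctx *Context) error
}

type SinkCreator interface {
	CreateSink(ctx *Context, ioParams *IOParams, params map[string]interface{}) (Sink, error)
}

// creatorFunc is a function type that satisfies SinkCreator by
// calling itself from its CreateSink method.
type creatorFunc func(*Context, *IOParams, map[string]interface{}) (Sink, error)

func (f creatorFunc) CreateSink(ctx *Context, ioParams *IOParams,
	params map[string]interface{}) (Sink, error) {
	return f(ctx, ioParams, params)
}

// SinkCreatorFunc converts a plain function into a SinkCreator.
func SinkCreatorFunc(f func(*Context, *IOParams, map[string]interface{}) (Sink, error)) SinkCreator {
	return creatorFunc(f)
}

// countingSink only counts the tuples it receives.
type countingSink struct {
	n int
}

func (s *countingSink) Write(ctx *Context, t *Tuple) error {
	s.n++
	return nil
}

func (s *countingSink) Close(ctx *Context) error { return nil }

func createCounting(ctx *Context, ioParams *IOParams,
	params map[string]interface{}) (Sink, error) {
	return &countingSink{}, nil
}

func main() {
	// No dedicated creator struct is needed; the function is enough.
	var c SinkCreator = SinkCreatorFunc(createCounting)
	s, err := c.CreateSink(&Context{}, &IOParams{Name: "out", TypeName: "counting"}, nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	_ = s.Write(&Context{}, &Tuple{})
	fmt.Println("tuples written:", s.(*countingSink).n)
}
```

The benefit of the idiom is that a creator without any state does not need a dedicated struct type such as StdoutSinkCreator.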
3.2.6.4. A Complete Example¶
A complete example of the sink is shown in this subsection. The package name for
the sink is stdout and StdoutSink is renamed to Sink. Also, this
example uses the SinkCreatorFunc utility for SinkCreator.
Assume that the import path of the example is
github.com/sensorbee/examples/stdout, which doesn’t actually exist. The
repository has two files:
- stdout.go
- plugin/plugin.go
stdout.go¶
package stdout
import (
"fmt"
"os"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Sink struct {
}
func (s *Sink) Write(ctx *core.Context, t *core.Tuple) error {
_, err := fmt.Fprintln(os.Stdout, t.Data)
return err
}
func (s *Sink) Close(ctx *core.Context) error {
return nil
}
func Create(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &Sink{}, nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"github.com/sensorbee/examples/stdout"
)
func init() {
bql.MustRegisterGlobalSinkCreator("my_stdout",
bql.SinkCreatorFunc(stdout.Create))
}