3. Server Programming

This part describes the extensibility of the SensorBee server. Topics covered in this part are advanced and should be read after understanding the basics of SensorBee and BQL.

Because SensorBee is mainly written in The Go Programming Language, understanding the language before reading this part is also recommended. A Tour of Go is an official tutorial and is one of the best tutorials of the language. It runs on web browsers and does not require any additional software installation. After learning the language, How to Write Go Code helps to understand how to use the go tool and the standard way to develop Go packages and applications.

This part assumes that the go tool is installed and the development environment including Go’s environment variables like GOROOT or GOPATH is appropriately set up. SensorBee requires Go 1.4 or later.

3.1. Extending the SensorBee Server and BQL

Many features of the server and BQL can be extended by plugins. This chapter describes what types of features can be extended by users. The following chapters in this part describe how to develop those features in specific programming languages.

3.1.1. User-Defined Functions

A user-defined function (UDF) is a function that is implemented by a user and registered in the SensorBee server. Once it is registered, it can be called from BQL statements:

SELECT RSTREAM my_udf(field) FROM src [RANGE 1 TUPLES];

A UDF behaves just like a built-in function. A UDF can take an arbitrary number of arguments. Each argument can be of any built-in type, and a single argument can accept values of more than one type. A UDF can also support variable-length arguments. A UDF has a single return value of any built-in type. When multiple return values are required, a UDF can return them as an array.

Note

BQL currently does not support CREATE FUNCTION statements like well-known RDBMSs. UDFs can only be added through Go programs.

3.1.2. User-Defined Aggregate Functions

A user-defined aggregate function (UDAF) is a function similar to a UDF but can take aggregation parameters (see Aggregate Expressions) as arguments in addition to regular arguments.

3.1.3. User-Defined Stream-Generating Functions

Stream-generating functions can also be user-defined; such a function is called a user-defined stream-generating function (UDSF). There are two types of UDSFs. The first type behaves like a source, which is not connected to any input stream and generates tuples proactively:

... FROM my_counter() [RANGE ...

my_counter above may emit tuples like {"count": 1}.

This type of UDSF is called a source-like UDSF.

The other type is called a stream-like UDSF and behaves just like a stream, which receives tuples from one or more incoming streams or sources. It receives names of streams or sources as its arguments:

... FROM my_udsf("another_stream", "yet_another_stream", other_params) [RANGE ...

Note that there is no rule on how to define a UDSF’s arguments. Thus, the order and the use of arguments depend on each UDSF. For example, a UDSF might take an array of strings containing names of input streams as its first argument: my_union(["stream1", "stream2", "stream3"]). Names of input streams do not even need to be located at the beginning of the argument list: my_udsf2(1, "another_stream") is also possible.

Using UDSFs is a very powerful way to extend BQL since they can potentially do anything that the SELECT cannot do.

3.1.4. User-Defined States

A user-defined state (UDS) can be provided to support stateful data processing (see Stateful Data Processing). A UDS is usually provided with a set of UDFs that manipulate the state. Those UDFs take the name of the UDS as a string argument:

CREATE STATE event_id_seq TYPE snowflake_id WITH machine_id = 1;
CREATE STREAM events_with_id AS
    SELECT snowflake_id("event_id_seq"), * FROM events [RANGE 1 TUPLES];

In the example above, a UDS event_id_seq is created with the type snowflake_id. Then, the UDS is passed to the UDF snowflake_id, which happens to have the same name as the type name of the UDS. The UDF looks up the UDS event_id_seq and returns a value computed based on the state.
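The lookup-by-name pattern described above can be sketched in plain Go. This is only an illustration using the standard library: stateRegistry, sequence, and nextID are hypothetical stand-ins rather than SensorBee APIs, and the ID formula is made up and is not how snowflake_id computes its values.

```go
package main

import (
	"fmt"
	"sync"
)

// sequence is a hypothetical state holding a counter. A mutex protects it
// because functions using the state may run concurrently.
type sequence struct {
	mu        sync.Mutex
	machineID int64
	next      int64
}

// stateRegistry is a hypothetical name-to-state table standing in for the
// place where a server keeps states created by CREATE STATE.
type stateRegistry struct {
	mu     sync.RWMutex
	states map[string]*sequence
}

func newStateRegistry() *stateRegistry {
	return &stateRegistry{states: map[string]*sequence{}}
}

// create registers a new state under the given name.
func (r *stateRegistry) create(name string, machineID int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.states[name] = &sequence{machineID: machineID}
}

// nextID plays the role of the UDF: it receives the state name as a string,
// looks the state up, and returns a value computed from the state.
func (r *stateRegistry) nextID(name string) (int64, error) {
	r.mu.RLock()
	s, ok := r.states[name]
	r.mu.RUnlock()
	if !ok {
		return 0, fmt.Errorf("state %q is not found", name)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	id := s.machineID*1000 + s.next // made-up formula for illustration
	s.next++
	return id, nil
}

func main() {
	reg := newStateRegistry()
	reg.create("event_id_seq", 1)
	for i := 0; i < 3; i++ {
		fmt.Println(reg.nextID("event_id_seq"))
	}
	fmt.Println(reg.nextID("missing"))
}
```

The function itself holds no data; everything that changes between calls lives in the named state, which is why the state name has to be passed as an argument.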

3.1.5. Source Plugins

A source type developed by a user can be added to the SensorBee server as a plugin so that it can be used in CREATE SOURCE statements. A source type can have any number of required and optional parameters. Each parameter can be of any built-in type.

3.1.6. Sink Plugins

A sink type developed by a user can be added to the SensorBee server as a plugin so that it can be used in CREATE SINK statements. A sink type can have any number of required and optional parameters. Each parameter can be of any built-in type.

3.2. Extensions in Go

This chapter describes how to extend the SensorBee server in the Go programming language.

3.2.1. Development Flow of Components in Go

The typical development flow of components like a UDF, a UDS type, a source type, or a sink type should be discussed before looking into details of each component.

The basic flow is as follows:

  1. Create a git repository for components
  2. Implement components
  3. Create a plugin subpackage in the repository

3.2.1.1. Create a Git Repository for Components

Components are written in Go, so they need to be in a valid git repository (or a repository of a different version control system). One repository may provide multiple types of components. For example, a repository could have 10 UDFs, 5 UDS types, 2 source types, and 1 sink type. However, since Go is designed to provide packages in a fine-grained manner, each repository should only provide a minimum set of components that are logically related and make sense to be in the same repository.

3.2.1.2. Implement Components

The next step is to implement components. There is no restriction on which standard or 3rd party packages to depend on.

Functions or structs that are to be registered to the SensorBee server need to be referred to by the plugin subpackage, which is described in the next subsection. Thus, names of those symbols need to start with a capital letter.

In this step, components should not be registered to the SensorBee server yet.

3.2.1.3. Create a Plugin Subpackage in the Repository

It is highly recommended that the repository has a separate package (i.e. a subdirectory) which only registers components to the SensorBee server. There is usually one file named “plugin.go” in the plugin package, and it only contains a series of registration function calls in the init function. For instance, if the repository only provides one UDF, the contents of plugin.go would look like:

// in github.com/user/myudf/plugin/plugin.go
package plugin

import (
    "gopkg.in/sensorbee/sensorbee.v0/bql/udf"
    "github.com/user/myudf"
)

func init() {
    udf.MustRegisterGlobalUDF("my_udf", &myudf.MyUDF{})
}

There are two reasons to have a plugin subpackage separated from the implementation of components. First, by separating them, other Go packages can import the components and use the package as a library without registering them to SensorBee. Second, having a separate plugin package allows a user to register a component with a different name. This is especially useful when names of components conflict with each other.

To use the example plugin above, the github.com/user/myudf/plugin package needs to be added to the plugin path list of SensorBee.

3.2.1.4. Repository Organization

The typical organization of the repository is

  • github.com/user/repo

    • README: description and the usage of components in the repository

    • .go files: implementation of components

    • plugin/: a subpackage for the plugin registration

      • plugin.go
    • othersubpackages/: there can be optional subpackages

3.2.2. User-Defined Functions

This section describes how to write a UDF in Go. It first shows the basic interface of defining UDFs, and then describes utilities around it, how to develop a UDF in a Go-ish manner, and a complete example.

3.2.2.1. Implementing a UDF

Note

This is a very low-level way to implement a UDF in Go. To learn about an easier way, see Generic UDFs.

Any struct implementing the following interface can be used as a UDF:

type UDF interface {
    // Call calls the UDF.
    Call(*core.Context, ...data.Value) (data.Value, error)

    // Accept checks if the function accepts the given number of arguments
    // excluding core.Context.
    Accept(arity int) bool

    // IsAggregationParameter returns true if the k-th parameter expects
    // aggregated values. A UDF with Accept(n) == true is an aggregate
    // function if and only if this function returns true for one or more
    // values of k in the range 0, ..., n-1.
    IsAggregationParameter(k int) bool
}

This interface is defined in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package.

A UDF can be registered via the RegisterGlobalUDF or MustRegisterGlobalUDF functions from the same package. MustRegisterGlobalUDF is the same as RegisterGlobalUDF but panics on failure instead of returning an error. These functions are usually called from the init function in the UDF package’s plugin subpackage. A typical implementation of a UDF looks as follows:

type MyUDF struct {
    ...
}

func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
    ...
}

func (m *MyUDF) Accept(arity int) bool {
    ...
}

func (m *MyUDF) IsAggregationParameter(k int) bool {
    ...
}

func init() {
    // MyUDF can be used as my_udf in BQL statements.
    udf.MustRegisterGlobalUDF("my_udf", &MyUDF{})
}

As can be inferred from this example, a UDF itself should be stateless, because only one instance of the struct is registered as a UDF and that instance is shared globally. Stateful data processing can be achieved by the combination of UDFs and UDSs, which is described in User-Defined States.

A UDF needs to implement three methods to satisfy the udf.UDF interface: Call, Accept, and IsAggregationParameter.

The Call method receives a *core.Context and multiple data.Value as its arguments. *core.Context contains the information of the current processing context. Call's ...data.Value argument holds the values passed to the UDF. data.Value represents a value used in BQL and can be of any built-in type.

SELECT RSTREAM my_udf(arg1, arg2) FROM stream [RANGE 1 TUPLES];

In this example, arg1 and arg2 are passed to the Call method:

func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
    // When my_udf(arg1, arg2) is called, len(args) is 2.
    // args[0] is arg1 and args[1] is arg2.
    // It is guaranteed that m.Accept(len(args)) is always true.
}

Because data.Value is a semi-variant type, the Call method needs to check the type of each data.Value and convert it to a desired type.
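The type check described above can be sketched as follows. To keep the example runnable with only the standard library, interface{} stands in for data.Value, and my_repeat is a hypothetical function; a real Call method would use the data package's own type inspection and conversion functions instead of Go type assertions.

```go
package main

import (
	"errors"
	"fmt"
)

// value is a stand-in for data.Value so that this sketch compiles with only
// the standard library.
type value interface{}

// repeat mimics the body of a Call method for a hypothetical
// my_repeat(text, count) function. It rejects arguments of unexpected types
// instead of converting them.
func repeat(args ...value) (value, error) {
	if len(args) != 2 {
		return nil, errors.New("my_repeat requires exactly two arguments")
	}
	text, ok := args[0].(string)
	if !ok {
		return nil, fmt.Errorf("the first argument must be a string: %v", args[0])
	}
	count, ok := args[1].(int64)
	if !ok {
		return nil, fmt.Errorf("the second argument must be an integer: %v", args[1])
	}
	if count < 0 {
		return nil, errors.New("the second argument must not be negative")
	}
	out := ""
	for i := int64(0); i < count; i++ {
		out += text
	}
	return out, nil
}

func main() {
	fmt.Println(repeat("ab", int64(3)))
	fmt.Println(repeat("ab", "3")) // rejected: the count is a string
}
```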

The Accept method verifies if the UDF accepts the specific number of arguments. It can return true for multiple arities as long as it can receive the given number of arguments. If a UDF only accepts two arguments, the method is implemented as follows:

func (m *MyUDF) Accept(arity int) bool {
    return arity == 2
}

When a UDF aims to support variadic parameters (a.k.a. variable-length arguments) with two required arguments (e.g. my_udf(arg1, arg2, optional1, optional2, ...)), the implementation would be:

func (m *MyUDF) Accept(arity int) bool {
    return arity >= 2
}
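When the number of optional arguments is bounded, Accept can check a range instead. The following is a minimal sketch; RangeUDF is a hypothetical type that shows only the Accept method, with two required and at most two optional arguments.

```go
package main

import "fmt"

// RangeUDF is a hypothetical type that only shows the Accept method.
type RangeUDF struct{}

// Accept reports true for my_udf(a, b), my_udf(a, b, c), and
// my_udf(a, b, c, d), and false for any other number of arguments.
func (r *RangeUDF) Accept(arity int) bool {
	return 2 <= arity && arity <= 4
}

func main() {
	r := &RangeUDF{}
	for arity := 0; arity <= 5; arity++ {
		fmt.Printf("Accept(%d) = %v\n", arity, r.Accept(arity))
	}
}
```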

Finally, IsAggregationParameter returns whether the k-th argument (starting from 0) is an aggregation parameter. Aggregation parameters are passed as a data.Array containing all values of a field in each group.
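As an illustration, the sketch below models a hypothetical aggregate my_weighted_sum(values, weight) whose first parameter is an aggregation parameter and whose second is a regular one. WeightedSum and its call method are assumptions made for this example, and a []float64 is used in place of the data.Array that the real Call method would receive.

```go
package main

import "fmt"

// WeightedSum is a hypothetical aggregate function whose first parameter
// receives all values of a group and whose second parameter is a regular
// (non-aggregated) value.
type WeightedSum struct{}

// Accept allows exactly two arguments.
func (w *WeightedSum) Accept(arity int) bool {
	return arity == 2
}

// IsAggregationParameter marks only the first parameter (k == 0) as an
// aggregation parameter.
func (w *WeightedSum) IsAggregationParameter(k int) bool {
	return k == 0
}

// call stands in for the Call method. A []float64 is used in place of the
// data.Array that an aggregation parameter would carry.
func (w *WeightedSum) call(values []float64, weight float64) float64 {
	sum := 0.0
	for _, v := range values {
		sum += v * weight
	}
	return sum
}

func main() {
	w := &WeightedSum{}
	fmt.Println(w.IsAggregationParameter(0), w.IsAggregationParameter(1))
	fmt.Println(w.call([]float64{1, 2, 3}, 2))
}
```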

All of these methods can be called concurrently from multiple goroutines and they must be thread-safe.

The registered UDF is looked up based on its name and the number of arguments passed to it.

SELECT RSTREAM my_udf(arg1, arg2) FROM stream [RANGE 1 TUPLES];

In this SELECT, a UDF having the name my_udf is looked up first. After that, its Accept method is called with 2, and my_udf is actually selected if Accept(2) returns true. The IsAggregationParameter method is additionally called for each argument to see if the argument needs to be an aggregation parameter. Then, if there is no mismatch, my_udf is finally called.
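The lookup steps can be sketched in plain Go. This is not the server's implementation: the function interface, fixedArity, and lookup are hypothetical names that only mirror the order of checks described above.

```go
package main

import (
	"errors"
	"fmt"
)

// function mirrors the two methods of the UDF interface that matter for
// lookup; Call is left out to keep the sketch short.
type function interface {
	Accept(arity int) bool
	IsAggregationParameter(k int) bool
}

// fixedArity is a hypothetical function accepting exactly n regular arguments.
type fixedArity struct{ n int }

func (f fixedArity) Accept(arity int) bool { return arity == f.n }

func (f fixedArity) IsAggregationParameter(k int) bool { return false }

// lookup finds a function by name, checks the arity with Accept, and
// collects which arguments must be aggregation parameters.
func lookup(reg map[string]function, name string, arity int) (function, []bool, error) {
	f, ok := reg[name]
	if !ok {
		return nil, nil, fmt.Errorf("function %q is not registered", name)
	}
	if !f.Accept(arity) {
		return nil, nil, errors.New("the function does not accept the given number of arguments")
	}
	agg := make([]bool, arity)
	for k := range agg {
		agg[k] = f.IsAggregationParameter(k)
	}
	return f, agg, nil
}

func main() {
	reg := map[string]function{"my_udf": fixedArity{n: 2}}
	_, agg, err := lookup(reg, "my_udf", 2)
	fmt.Println(agg, err)
	_, _, err = lookup(reg, "my_udf", 3)
	fmt.Println(err)
}
```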

Note

A UDF does not have a schema at the moment, so any error regarding types of arguments will not be reported until the statement calling the UDF actually processes a tuple.

3.2.2.2. Generic UDFs

SensorBee provides a helper function to register a regular Go function as a UDF without implementing the UDF interface explicitly.

func Inc(v int) int {
    return v + 1
}

The Inc function can be transformed into a UDF by the ConvertGeneric or MustConvertGeneric function defined in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package. By combining it with RegisterGlobalUDF or MustRegisterGlobalUDF, the Inc function can easily be registered as a UDF:

func init() {
    udf.MustRegisterGlobalUDF("inc", udf.MustConvertGeneric(Inc))
}

So, a complete example of the UDF implementation and registration is as follows:

package inc

import (
    "gopkg.in/sensorbee/sensorbee.v0/bql/udf"
)

func Inc(v int) int {
    return v + 1
}

func init() {
    udf.MustRegisterGlobalUDF("inc", udf.MustConvertGeneric(Inc))
}

Note

The implementation and the registration of a UDF should actually be separated into different packages. See Development Flow of Components in Go for details.

Although this approach is handy, there is some small overhead compared to a UDF implemented in the regular way. Most of such overhead comes from type checking and conversions.

Functions passed to ConvertGeneric need to satisfy some restrictions on the form of their argument and return value types. Each restriction is described in the following subsections.

Form of Arguments

In terms of valid argument forms, there are some rules to follow:

  1. A Go function can receive *core.Context as the first argument, or can omit it.
  2. A function can have any number of arguments including 0 arguments as long as Go accepts them.
  3. A function can be variadic with or without non-variadic parameters.

There are basically eight (four times two, whether a function has *core.Context or not) forms of arguments (return values are intentionally omitted for clarity):

  • Functions receiving no argument in BQL (e.g. my_udf())

    1. func(*core.Context): A function only receiving *core.Context
    2. func(): A function having no argument and not receiving *core.Context, either
  • Functions having non-variadic arguments but no variadic arguments

    1. func(*core.Context, T1, T2, ..., Tn)
    2. func(T1, T2, ..., Tn)
  • Functions having variadic arguments but no non-variadic arguments

    1. func(*core.Context, ...T)
    2. func(...T)
  • Functions having both variadic and non-variadic arguments

    1. func(*core.Context, T1, T2, ..., Tn, ...Tn+1)
    2. func(T1, T2, ..., Tn, ...Tn+1)

Here are some examples of invalid function signatures:

  • func(T, *core.Context): *core.Context must be the first argument.
  • func(NonSupportedType): Only supported types, which will be explained later, can be used.

Although return values are omitted from all the examples above, they are actually required. The next subsection explains how to define valid return values.

Form of Return Values

All functions need to have return values. There are two forms of return values:

  • func(...) R
  • func(...) (R, error)

All other forms are invalid:

  • func(...)
  • func(...) error
  • func(...) NonSupportedType

Valid types of return values are the same as the valid types of arguments, and they are listed in the following subsection.
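The two valid forms look as follows in plain Go. Half and Div are hypothetical functions written for this illustration; given the rules in this section, functions of these shapes would be expected to satisfy the restrictions on return values.

```go
package main

import (
	"errors"
	"fmt"
)

// Half has the form func(...) R.
func Half(v float64) float64 {
	return v / 2
}

// Div has the form func(...) (R, error). Returning an error lets the
// function reject inputs that it cannot handle.
func Div(a, b int) (int, error) {
	if b == 0 {
		return 0, errors.New("division by zero")
	}
	return a / b, nil
}

func main() {
	fmt.Println(Half(3))
	fmt.Println(Div(7, 2))
	fmt.Println(Div(1, 0))
}
```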

Valid Value Types

The list of Go types that can be used for arguments and the return value is as follows:

  • bool
  • signed integers: int, int8, int16, int32, int64
  • unsigned integers: uint, uint8, uint16, uint32, uint64
  • float32, float64
  • string
  • time.Time
  • data: data.Bool, data.Int, data.Float, data.String, data.Blob, data.Timestamp, data.Array, data.Map, data.Value
  • A slice of any type above, including data.Value

data.Value can be used as a semi-variant type, which will receive all types above.

When the argument type and the actual value type are different, weak type conversions are applied to values. Conversions are basically done by data.ToXXX functions (see godoc comments of each function in data/type_conversions.go). For example, func inc(i int) int can be called by inc("3") in a BQL statement and it will return 4. If strict type checking or a custom type conversion is required, receive values as data.Value and manually check or convert types, or define the UDF in the regular way.
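The idea of weak conversion can be illustrated with a small standalone sketch. The toInt helper below is hypothetical: it is not the data package's implementation, it covers only three Go types, and its rules are merely assumed to resemble the data.ToXXX functions.

```go
package main

import (
	"fmt"
	"strconv"
)

// toInt is a hypothetical helper imitating weak conversion to an integer.
// It is not the data package's implementation and covers only a few types.
func toInt(v interface{}) (int, error) {
	switch x := v.(type) {
	case int:
		return x, nil
	case float64:
		return int(x), nil // the fractional part is dropped
	case string:
		return strconv.Atoi(x)
	default:
		return 0, fmt.Errorf("cannot convert %T to int", v)
	}
}

// inc accepts any value, converts it weakly, and adds one.
func inc(v interface{}) (int, error) {
	i, err := toInt(v)
	if err != nil {
		return 0, err
	}
	return i + 1, nil
}

func main() {
	for _, v := range []interface{}{1, 1.5, "10", []int{1}} {
		fmt.Println(inc(v))
	}
}
```

The last input is rejected because the helper has no rule for slices. A function that needs that kind of strictness for all inputs would skip the conversion step and inspect the value's type directly.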

Examples of Valid Go Functions

The following functions can be converted to UDFs by the ConvertGeneric or MustConvertGeneric function:

  • func rand() int
  • func pow(*core.Context, float32, float32) (float32, error)
  • func join(*core.Context, ...string) string
  • func format(string, ...data.Value) (string, error)
  • func keys(data.Map) []string

3.2.2.3. Complete Examples

This subsection shows three example UDFs:

  • my_inc
  • my_join
  • my_join2

Assume that these are in the repository github.com/sensorbee/examples/udfs (which actually does not exist). The repository has three files:

  • inc.go
  • join.go
  • plugin/plugin.go

inc.go

In inc.go, the Inc function is defined as a pure Go function with a standard value type:

package udfs

func Inc(v int) int {
    return v + 1
}

join.go

In join.go, the Join UDF is defined in a strict way. It also performs strict type checking. It is designed to be called in one of two forms: my_join("a", "b", "c", "separator") or my_join(["a", "b", "c"], "separator"). Each argument and value in the array must be a string. The UDF receives an arbitrary number of arguments.

package udfs

import (
    "errors"
    "strings"

    "gopkg.in/sensorbee/sensorbee.v0/core"
    "gopkg.in/sensorbee/sensorbee.v0/data"
)

type Join struct {
}

func (j *Join) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
    empty := data.String("")
    if len(args) == 1 {
        return empty, nil
    }

    switch args[0].Type() {
    case data.TypeString: // my_join("a", "b", "c", "sep") form
        var ss []string
        for _, v := range args {
            s, err := data.AsString(v)
            if err != nil {
                return empty, err
            }
            ss = append(ss, s)
        }
        return data.String(strings.Join(ss[:len(ss)-1], ss[len(ss)-1])), nil

    case data.TypeArray: // my_join(["a", "b", "c"], "sep") form
        if len(args) != 2 {
            return empty, errors.New("wrong number of arguments for my_join(array, sep)")
        }
        sep, err := data.AsString(args[1])
        if err != nil {
            return empty, err
        }

        a, _ := data.AsArray(args[0])
        var ss []string
        for _, v := range a {
            s, err := data.AsString(v)
            if err != nil {
                return empty, err
            }
            ss = append(ss, s)
        }
        return data.String(strings.Join(ss, sep)), nil

    default:
        return empty, errors.New("the first argument must be a string or an array")
    }
}

func (j *Join) Accept(arity int) bool {
    return arity >= 1
}

func (j *Join) IsAggregationParameter(k int) bool {
    return false
}

plugin/plugin.go

In addition to Inc and Join, this file registers the standard Go function strings.Join as my_join2. Because it’s converted to a UDF by udf.MustConvertGeneric, arguments are weakly converted to given types. For example, my_join2([1, 2.3, "4"], "-") is valid although strings.Join itself is func([]string, string) string.

package plugin

import (
    "strings"

    "gopkg.in/sensorbee/sensorbee.v0/bql/udf"

    "github.com/sensorbee/examples/udfs"
)

func init() {
    udf.MustRegisterGlobalUDF("my_inc", udf.MustConvertGeneric(udfs.Inc))
    udf.MustRegisterGlobalUDF("my_join", &udfs.Join{})
    udf.MustRegisterGlobalUDF("my_join2", udf.MustConvertGeneric(strings.Join))
}

Evaluating Examples

Once the sensorbee command is built with those UDFs and a topology is created on the server, the EVAL statement can be used to test them:

EVAL my_inc(1); -- => 2
EVAL my_inc(1.5); -- => 2
EVAL my_inc("10"); -- => 11

EVAL my_join("a", "b", "c", "-"); -- => "a-b-c"
EVAL my_join(["a", "b", "c"], ","); -- => "a,b,c"
EVAL my_join(1, "b", "c", "-"); -- => error
EVAL my_join([1, "b", "c"], ","); -- => error

EVAL my_join2(["a", "b", "c"], ","); -- => "a,b,c"
EVAL my_join2([1, "b", "c"], ","); -- => "1,b,c"

3.2.2.4. Dynamic Loading

Dynamic loading of UDFs written in Go is not supported at the moment because Go does not support loading packages dynamically.

3.2.3. User-Defined Stream-Generating Functions

This section describes how to write a UDSF in Go.

3.2.3.1. Implementing a UDSF

To provide a UDSF, two interfaces need to be implemented: UDSF and UDSFCreator.

The interface UDSF is defined as follows in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package.

type UDSF interface {
    Process(ctx *core.Context, t *core.Tuple, w core.Writer) error
    Terminate(ctx *core.Context) error
}

The Process method processes an input tuple and emits computed tuples for subsequent streams. ctx contains the processing context information. t is the tuple to be processed in the UDSF. w is the destination to which computed tuples are emitted. The Terminate method is called when the UDSF becomes unnecessary. The method has to release all the resources the UDSF has.

How the Process method is called depends on the type of a UDSF. When a UDSF is a stream-like UDSF (i.e. it has input from other streams), the Process method is called every time a new tuple arrives. The argument t contains the tuple emitted from another stream. Stream-like UDSFs have to return from Process immediately after processing the input tuple. They must not block in the method.

A stream-like UDSF is used mostly when multiple tuples need to be computed and emitted based on one input tuple:

type WordSplitter struct {
    field string
}

func (w *WordSplitter) Process(ctx *core.Context, t *core.Tuple, writer core.Writer) error {
    var kwd []string
    if v, ok := t.Data[w.field]; !ok {
        return fmt.Errorf("the tuple doesn't have the required field: %v", w.field)
    } else if s, err := data.AsString(v); err != nil {
        return fmt.Errorf("'%v' field must be string: %v", w.field, err)
    } else {
        kwd = strings.Split(s, " ")
    }

    for _, k := range kwd {
        out := t.Copy()
        out.Data[w.field] = data.String(k)
        if err := writer.Write(ctx, out); err != nil {
            return err
        }
    }
    return nil
}

func (w *WordSplitter) Terminate(ctx *core.Context) error {
    return nil
}

WordSplitter splits text in a specific field by space. For example, when an input tuple is {"word": "a b c"} and WordSplitter.field is word, the following three tuples will be emitted: {"word": "a"}, {"word": "b"}, and {"word": "c"}.
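The one-to-many behavior can be tried without a SensorBee server using the sketch below. The tuple and writer types and the splitWords function are simplified stand-ins introduced for this example; they are not core.Tuple, core.Writer, or the Process method itself.

```go
package main

import (
	"fmt"
	"strings"
)

// tuple and writer are simplified stand-ins for core.Tuple and core.Writer.
type tuple map[string]string

type writer func(t tuple) error

// splitWords emits one copy of the input tuple per word found in the field.
func splitWords(in tuple, field string, w writer) error {
	s, ok := in[field]
	if !ok {
		return fmt.Errorf("the tuple doesn't have the required field: %v", field)
	}
	for _, word := range strings.Split(s, " ") {
		out := tuple{}
		for k, v := range in { // copy so that the input tuple is not modified
			out[k] = v
		}
		out[field] = word
		if err := w(out); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var got []string
	collect := func(t tuple) error {
		got = append(got, t["word"])
		return nil
	}
	if err := splitWords(tuple{"word": "a b c"}, "word", collect); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```

Note that strings.Split produces empty strings when the text contains consecutive spaces. If empty words should not be emitted, strings.Fields is one possible replacement.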

When a UDSF is a source-like UDSF, the Process method is only called once with a tuple that does not mean anything. Unlike a stream-like UDSF, the Process method of a source-like UDSF does not have to return until it has emitted all tuples, the Terminate method is called, or a fatal error occurs.

type Ticker struct {
    interval time.Duration
    stopped  int32
}

func (t *Ticker) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
    var i int64
    for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
        newTuple := core.NewTuple(data.Map{"tick": data.Int(i)})
        if err := w.Write(ctx, newTuple); err != nil {
            return err
        }
        time.Sleep(t.interval)
    }
    return nil
}

func (t *Ticker) Terminate(ctx *core.Context) error {
    atomic.StoreInt32(&t.stopped, 1)
    return nil
}

In this example, Ticker emits tuples having a tick field containing a counter until the Terminate method is called.

Whether a UDSF is stream-like or source-like can be configured when it is created by UDSFCreator. The interface UDSFCreator is defined as follows in gopkg.in/sensorbee/sensorbee.v0/bql/udf package:

type UDSFCreator interface {
    CreateUDSF(ctx *core.Context, decl UDSFDeclarer, args ...data.Value) (UDSF, error)
    Accept(arity int) bool
}

The CreateUDSF method creates a new instance of a UDSF. The method is called when evaluating a UDSF in the FROM clause of a SELECT statement. ctx contains the processing context information. decl is used to customize the behavior of the UDSF, which is explained later. args has arguments passed in the SELECT statement. The Accept method verifies whether the UDSF accepts the specific number of arguments. This is the same as the UDF.Accept method (see User-Defined Functions).

UDSFDeclarer is used in the CreateUDSF method to customize the behavior of a UDSF:

type UDSFDeclarer interface {
    Input(name string, config *UDSFInputConfig) error
    ListInputs() map[string]*UDSFInputConfig
}

By calling its Input method, a UDSF will be able to receive tuples from another stream with the name name. Because the name is given outside the UDSF, it cannot be controlled from the UDSF. However, there are cases where a UDSF wants to know from which stream a tuple has come. For example, when providing a UDSF performing a JOIN of two streams, the UDSF needs to distinguish which stream emitted the tuple. If the UDSF was defined as my_join(left_stream, right_stream), decl can be used as follows in UDSFCreator.CreateUDSF:

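// leftName and rightName are the stream names taken from args[0] and args[1],
// for example with data.AsString.
decl.Input(leftName, &udf.UDSFInputConfig{InputName: "left"})
decl.Input(rightName, &udf.UDSFInputConfig{InputName: "right"})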

By configuring the input stream in this way, a tuple passed to UDSF.Process has the given name in its Tuple.InputName field:

func (m *MyJoin) Process(ctx *core.Context, t *core.Tuple, w core.Writer) error {
    switch t.InputName {
    case "left":
        ... process tuples from left_stream ...
    case "right":
        ... process tuples from right_stream ...
    }
    ...
}

If a UDSF is configured to have one or more input streams by decl.Input in the UDSFCreator.CreateUDSF method, the UDSF is processed as a stream-like UDSF. Otherwise, if a UDSF doesn’t have any input (i.e. decl.Input is not called), the UDSF becomes a source-like UDSF.
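The rule above can be summarized with a small model. The declarer and inputConfig types below are simplified stand-ins for an implementation of UDSFDeclarer and for UDSFInputConfig, and kind is a hypothetical helper; the server's actual bookkeeping is not shown in this document and may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// inputConfig and declarer are simplified stand-ins for UDSFInputConfig and
// an implementation of UDSFDeclarer.
type inputConfig struct {
	InputName string
}

type declarer struct {
	inputs map[string]*inputConfig
}

// Input records that the UDSF wants to receive tuples from the named stream.
func (d *declarer) Input(name string, config *inputConfig) error {
	if name == "" {
		return errors.New("the input stream name must not be empty")
	}
	if d.inputs == nil {
		d.inputs = map[string]*inputConfig{}
	}
	if config == nil {
		config = &inputConfig{}
	}
	d.inputs[name] = config
	return nil
}

// ListInputs returns all inputs declared so far.
func (d *declarer) ListInputs() map[string]*inputConfig {
	return d.inputs
}

// kind classifies a UDSF by whether any input has been declared.
func kind(d *declarer) string {
	if len(d.ListInputs()) > 0 {
		return "stream-like"
	}
	return "source-like"
}

func main() {
	join := &declarer{}
	_ = join.Input("left_stream", &inputConfig{InputName: "left"})
	_ = join.Input("right_stream", &inputConfig{InputName: "right"})
	fmt.Println(kind(join))

	tick := &declarer{} // Input is never called
	fmt.Println(kind(tick))
}
```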

As an example, the UDSFCreator of WordSplitter is shown below:

type WordSplitterCreator struct {
}

func (w *WordSplitterCreator) CreateUDSF(ctx *core.Context,
    decl udf.UDSFDeclarer, args ...data.Value) (udf.UDSF, error) {
    input, err := data.AsString(args[0])
    if err != nil {
        return nil, fmt.Errorf("input stream name must be a string: %v", args[0])
    }
    field, err := data.AsString(args[1])
    if err != nil {
        return nil, fmt.Errorf("target field name must be a string: %v", args[1])
    }
    // This Input call makes the UDSF a stream-like UDSF.
    if err := decl.Input(input, nil); err != nil {
        return nil, err
    }
    return &WordSplitter{
        field: field,
    }, nil
}

func (w *WordSplitterCreator) Accept(arity int) bool {
    return arity == 2
}

Although the UDSF has not been registered to the SensorBee server yet, it could appear like word_splitter(input_stream_name, target_field_name) if it was registered with the name word_splitter.

For another example, the UDSFCreator of Ticker is shown below:

type TickerCreator struct {
}

func (t *TickerCreator) CreateUDSF(ctx *core.Context,
    decl udf.UDSFDeclarer, args ...data.Value) (udf.UDSF, error) {
    interval, err := data.ToDuration(args[0])
    if err != nil {
        return nil, err
    }
    // Since this is a source-like UDSF, there's no input.
    return &Ticker{
        interval: interval,
    }, nil
}

func (t *TickerCreator) Accept(arity int) bool {
    return arity == 1
}

Like word_splitter, its signature could be ticker(interval) if the UDSF is registered as ticker.

The implementation of this UDSF is completed and the next step is to register it to the SensorBee server.

3.2.3.2. Registering a UDSF

A UDSF can be used in BQL by registering its UDSFCreator interface to the SensorBee server using the RegisterGlobalUDSFCreator or MustRegisterGlobalUDSFCreator functions, which are defined in gopkg.in/sensorbee/sensorbee.v0/bql/udf.

The following example registers WordSplitter and Ticker:

func init() {
    udf.RegisterGlobalUDSFCreator("word_splitter", &WordSplitterCreator{})
    udf.RegisterGlobalUDSFCreator("ticker", &TickerCreator{})
}

3.2.3.3. Generic UDSFs

Just as UDFs have the ConvertGeneric function, UDSFs have the ConvertToUDSFCreator and MustConvertToUDSFCreator functions. They convert a regular function satisfying some restrictions to the UDSFCreator interface.

The restrictions are the same as for generic UDFs except that a function converted to the UDSFCreator interface has an additional argument UDSFDeclarer. UDSFDeclarer is located after *core.Context and before other arguments. Examples of valid function signatures are shown below:

  • func(*core.Context, UDSFDeclarer, int)
  • func(UDSFDeclarer, string)
  • func(UDSFDeclarer)
  • func(*core.Context, UDSFDeclarer, ...data.Value)
  • func(UDSFDeclarer, ...float64)
  • func(*core.Context, UDSFDeclarer, int, ...string)
  • func(UDSFDeclarer, int, float64, ...time.Time)

Unlike *core.Context, UDSFDeclarer cannot be omitted. The same set of types that the ConvertGeneric function accepts can be used for arguments.

WordSplitterCreator can be rewritten with the ConvertToUDSFCreator function as follows:

func CreateWordSplitter(decl udf.UDSFDeclarer,
    inputStream, field string) (udf.UDSF, error) {
    if err := decl.Input(inputStream, nil); err != nil {
        return nil, err
    }
    return &WordSplitter{
        field: field,
    }, nil
}

func init() {
    // Pass the CreateWordSplitter function, not the WordSplitterCreator type.
    udf.MustRegisterGlobalUDSFCreator("word_splitter",
        udf.MustConvertToUDSFCreator(CreateWordSplitter))
}

TickerCreator can be replaced with ConvertToUDSFCreator, too:

func CreateTicker(decl udf.UDSFDeclarer, i data.Value) (udf.UDSF, error) {
    interval, err := data.ToDuration(i)
    if err != nil {
        return nil, err
    }
    return &Ticker{
        interval: interval,
    }, nil
}

func init() {
    udf.MustRegisterGlobalUDSFCreator("ticker",
        udf.MustConvertToUDSFCreator(CreateTicker))
}

3.2.3.4. A Complete Example

This subsection provides a complete example of UDSFs described in this section. In addition to word_splitter and ticker, the example also includes the lorem source, which periodically emits random texts as {"text": "lorem ipsum dolor sit amet"}.

Assume that the import path of the example repository is github.com/sensorbee/examples/udsfs, which doesn’t actually exist. The repository has four files:

  • lorem.go
  • splitter.go
  • ticker.go
  • plugin/plugin.go

lorem.go

To learn how to implement a source plugin, see Source Plugins.

package udsfs

import (
    "math/rand"
    "strings"
    "time"

    "gopkg.in/sensorbee/sensorbee.v0/bql"
    "gopkg.in/sensorbee/sensorbee.v0/core"
    "gopkg.in/sensorbee/sensorbee.v0/data"
)

var (
    Lorem = strings.Split(strings.Replace(`lorem ipsum dolor sit amet
consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore
magna aliqua Ut enim ad minim veniam quis nostrud exercitation ullamco laboris
nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in reprehenderit
in voluptate velit esse cillum dolore eu fugiat nulla pariatur Excepteur sint
occaecat cupidatat non proident sunt in culpa qui officia deserunt mollit anim
id est laborum`, "\n", " ", -1), " ")
)

type LoremSource struct {
    interval time.Duration
}

func (l *LoremSource) GenerateStream(ctx *core.Context, w core.Writer) error {
    for {
        var text []string
        for n := rand.Intn(5) + 5; n > 0; n-- {
            text = append(text, Lorem[rand.Intn(len(Lorem))])
        }

        t := core.NewTuple(data.Map{
            "text": data.String(strings.Join(text, " ")),
        })
        if err := w.Write(ctx, t); err != nil {
            return err
        }

        time.Sleep(l.interval)
    }
}

func (l *LoremSource) Stop(ctx *core.Context) error {
    return nil
}

func CreateLoremSource(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Source, error) {
    interval := 1 * time.Second
    if v, ok := params["interval"]; ok {
        i, err := data.ToDuration(v)
        if err != nil {
            return nil, err
        }
        interval = i
    }
    return core.ImplementSourceStop(&LoremSource{
        interval: interval,
    }), nil
}
splitter.go
package udsfs

import (
    "fmt"
    "strings"

    "gopkg.in/sensorbee/sensorbee.v0/bql/udf"
    "gopkg.in/sensorbee/sensorbee.v0/core"
    "gopkg.in/sensorbee/sensorbee.v0/data"
)

type WordSplitter struct {
    field string
}

func (w *WordSplitter) Process(ctx *core.Context,
    t *core.Tuple, writer core.Writer) error {
    var kwd []string
    if v, ok := t.Data[w.field]; !ok {
        return fmt.Errorf("the tuple doesn't have the required field: %v", w.field)
    } else if s, err := data.AsString(v); err != nil {
        return fmt.Errorf("'%v' field must be string: %v", w.field, err)
    } else {
        kwd = strings.Split(s, " ")
    }

    for _, k := range kwd {
        out := t.Copy()
        out.Data[w.field] = data.String(k)
        if err := writer.Write(ctx, out); err != nil {
            return err
        }
    }
    return nil
}

func (w *WordSplitter) Terminate(ctx *core.Context) error {
    return nil
}

func CreateWordSplitter(decl udf.UDSFDeclarer,
    inputStream, field string) (udf.UDSF, error) {
    if err := decl.Input(inputStream, nil); err != nil {
        return nil, err
    }
    return &WordSplitter{
        field: field,
    }, nil
}
ticker.go
package udsfs

import (
    "sync/atomic"
    "time"

    "gopkg.in/sensorbee/sensorbee.v0/bql/udf"
    "gopkg.in/sensorbee/sensorbee.v0/core"
    "gopkg.in/sensorbee/sensorbee.v0/data"
)

type Ticker struct {
    interval time.Duration
    stopped  int32
}

func (t *Ticker) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
    var i int64
    for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
        newTuple := core.NewTuple(data.Map{"tick": data.Int(i)})
        if err := w.Write(ctx, newTuple); err != nil {
            return err
        }
        time.Sleep(t.interval)
    }
    return nil
}

func (t *Ticker) Terminate(ctx *core.Context) error {
    atomic.StoreInt32(&t.stopped, 1)
    return nil
}

func CreateTicker(decl udf.UDSFDeclarer, i data.Value) (udf.UDSF, error) {
    interval, err := data.ToDuration(i)
    if err != nil {
        return nil, err
    }
    return &Ticker{
        interval: interval,
    }, nil
}
plugin/plugin.go
package plugin

import (
    "gopkg.in/sensorbee/sensorbee.v0/bql"
    "gopkg.in/sensorbee/sensorbee.v0/bql/udf"

    "github.com/sensorbee/examples/udsfs"
)

func init() {
    bql.MustRegisterGlobalSourceCreator("lorem",
        bql.SourceCreatorFunc(udsfs.CreateLoremSource))
    udf.MustRegisterGlobalUDSFCreator("word_splitter",
        udf.MustConvertToUDSFCreator(udsfs.CreateWordSplitter))
    udf.MustRegisterGlobalUDSFCreator("ticker",
        udf.MustConvertToUDSFCreator(udsfs.CreateTicker))
}
Example BQL Statements
CREATE SOURCE lorem TYPE lorem;
CREATE STREAM lorem_words AS
    SELECT RSTREAM * FROM word_splitter("lorem", "text") [RANGE 1 TUPLES];

Results of word_splitter can be received by the following SELECT:

SELECT RSTREAM * FROM lorem_words [RANGE 1 TUPLES];

3.2.4. User-Defined States

This section describes how to write a UDS in Go.

3.2.4.1. Implementing a UDS

A struct implementing the following interface can be used as a UDS:

type SharedState interface {
    Terminate(ctx *Context) error
}

This interface is defined in the gopkg.in/sensorbee/sensorbee.v0/core package.

The Terminate method is called when the UDS is no longer in use. It should release any resources that the UDS has allocated.

As an example, a UDS having a monotonically increasing counter can be implemented as follows:

type Counter struct {
    c int64
}

func (c *Counter) Terminate(ctx *core.Context) error {
    return nil
}

func (c *Counter) Next() int64 {
    return atomic.AddInt64(&c.c, 1)
}

At the moment, there’s no way to manipulate the UDS from BQL statements. UDSs are usually provided with a set of UDFs that read or update the UDS. It’ll be described later in Manipulating a UDS via a UDF. Before looking into the UDS manipulation, registering and creating a UDS needs to be explained.

3.2.4.2. Registering a UDS

To register a UDS to the SensorBee server, the UDS needs to provide its UDSCreator. UDSCreator is an interface defined in the gopkg.in/sensorbee/sensorbee.v0/bql/udf package as follows:

type UDSCreator interface {
    CreateState(ctx *core.Context, params data.Map) (core.SharedState, error)
}

UDSCreator.CreateState method is called when executing a CREATE STATE statement. The method creates a new instance of the UDS and initializes it with the given parameters. The argument ctx has the processing context information and params has parameters specified in the WITH clause of the CREATE STATE.

The creator can be registered by RegisterGlobalUDSCreator or MustRegisterGlobalUDSCreator function defined in gopkg.in/sensorbee/sensorbee.v0/bql/udf package.

The following is the implementation and the registration of the creator for Counter UDS above:

type CounterCreator struct {
}

func (c *CounterCreator) CreateState(ctx *core.Context,
    params data.Map) (core.SharedState, error) {
    cnt := &Counter{}
    if v, ok := params["start"]; ok {
        i, err := data.ToInt(v)
        if err != nil {
            return nil, err
        }
        cnt.c = i - 1
    }
    return cnt, nil
}

func init() {
    udf.MustRegisterGlobalUDSCreator("my_counter", &CounterCreator{})
}

The creator in this example is registered with the UDS type name my_counter. The creator supports the start parameter, which is used as the first value that Counter.Next returns. The parameter can be specified in the CREATE STATE statement as follows:

CREATE STATE my_counter_instance TYPE my_counter WITH start = 100;

Because the creator creates a new instance every time the CREATE STATE is executed, there can be multiple instances of a specific UDS type:

CREATE STATE my_counter_instance1 TYPE my_counter;
CREATE STATE my_counter_instance2 TYPE my_counter;
CREATE STATE my_counter_instance3 TYPE my_counter;
...

Once an instance of the UDS is created by the CREATE STATE statement, UDFs can refer to it and manipulate its state.

udf.UDSCreatorFunc

A function having the same signature as UDSCreator.CreateState can be converted into a UDSCreator by the udf.UDSCreatorFunc utility function:

func UDSCreatorFunc(f func(*core.Context, data.Map) (core.SharedState, error)) UDSCreator

For example, CounterCreator can be defined as a function and registered as follows with this utility:

func CreateCounter(ctx *core.Context,
    params data.Map) (core.SharedState, error) {
    cnt := &Counter{}
    if v, ok := params["start"]; ok {
        i, err := data.ToInt(v)
        if err != nil {
            return nil, err
        }
        cnt.c = i - 1
    }
    return cnt, nil
}

func init() {
    udf.MustRegisterGlobalUDSCreator("my_counter",
        udf.UDSCreatorFunc(CreateCounter))
}

To support SAVE STATE and LOAD STATE statements, however, this utility function cannot be used because the creator needs to have the LoadState method. How to support saving and loading is described later.

3.2.4.3. Manipulating a UDS via a UDF

To manipulate a UDS from BQL statements, a set of UDFs that read or update the UDS has to be provided with it:

func Next(ctx *core.Context, uds string) (int64, error) {
    s, err := ctx.SharedStates.Get(uds)
    if err != nil {
        return 0, err
    }

    c, ok := s.(*Counter)
    if !ok {
        return 0, fmt.Errorf("the state isn't a counter: %v", uds)
    }
    return c.Next(), nil
}

func init() {
    udf.MustRegisterGlobalUDF("my_next_count", udf.MustConvertGeneric(Next))
}

In this example, a UDF my_next_count is registered to the SensorBee server. The UDF calls Counter.Next method to obtain the next count and returns it. The UDF receives one argument uds that is the name of the UDS to be updated.

CREATE STATE my_counter_instance TYPE my_counter;
CREATE STREAM events_with_id AS
    SELECT RSTREAM my_next_count("my_counter_instance") AS id, *
    FROM events [RANGE 1 TUPLES];

The BQL statements above add IDs to tuples emitted from a stream events. The state my_counter_instance is created with the type my_counter. Then, my_next_count UDF is called with the name. Every time the UDF is called, the state of my_counter_instance is updated by its Next method.

my_next_count (i.e. the Next function in Go) can look up the instance of the UDS by its name through core.Context.SharedStates. SharedStates manages all the UDSs created in a topology. SharedStates.Get returns the instance of the UDS having the given name. It returns an error if it cannot find the instance. In the example above, my_next_count("my_counter_instance") looks up an instance of the UDS having the name my_counter_instance, which was previously created by the CREATE STATE statement. The UDS returned from the Get method has the type core.SharedState and cannot directly be used as a Counter. Therefore, it has to be converted to *Counter with a type assertion.

Since the state can be any type satisfying core.SharedState, a UDS can potentially have any information such as machine learning models, dictionaries for natural language processing, or even an in-memory database.

Note

As UDFs are concurrently called from multiple goroutines, UDSs also need to be thread-safe.

3.2.4.4. Saving and Loading a UDS

The Counter implemented so far doesn’t support saving and loading its state. Thus, its count will be reset every time the server restarts. To save the state and load it later on, the UDS and its creator need to provide some methods. After providing those methods, the state can be saved by the SAVE STATE statement and loaded by the LOAD STATE statement.

Supporting SAVE STATE

By adding Save method having the following signature to a UDS, the UDS can be saved by the SAVE STATE statement:

Save(ctx *core.Context, w io.Writer, params data.Map) error

The Save method writes all the data that the state holds to w, an io.Writer. The data can be written in any format as long as the corresponding loading methods can reconstruct the state from it. It can be JSON, msgpack, Protocol Buffers, and so on.

Warning

Providing forward/backward compatibility or version controlling of the saved data is the responsibility of the author of the UDS.

*core.Context has the processing context information. The params argument is not used at the moment and is reserved for future use.

Once Save method is provided, the UDS can be saved by SAVE STATE statement:

SAVE STATE my_counter_instance;

The SAVE STATE doesn’t take any parameters now. The location and the physical format of the saved UDS data depend on the configuration of the SensorBee server or program running BQL statements. However, it is guaranteed that the saved data can be loaded by the same program via the LOAD STATE statement, which is described later.

Save method of previously implemented Counter can be as follows:

func (c *Counter) Save(ctx *core.Context, w io.Writer, params data.Map) error {
    return binary.Write(w, binary.LittleEndian, atomic.LoadInt64(&c.c))
}

Note

Because this counter is very simple, there’s no version controlling logic in the method. As the minimum solution, having a version number at the beginning of the data is sufficient for most cases.

Supporting LOAD STATE

To support the LOAD STATE statement, a UDSCreator needs to have LoadState method having the following signature:

LoadState(ctx *core.Context, r io.Reader, params data.Map) (core.SharedState, error)

Note

LoadState method needs to be defined in a UDSCreator, not in the UDS itself.

LoadState method reads data from r io.Reader. The data has exactly the same format as the one previously written by Save method of a UDS. params has parameters specified in the SET clause in the LOAD STATE statement.

Note

Parameters specified in the SET clause don’t have to be the same as the ones given in the WITH clause of the CREATE STATE statement. See LOAD STATE for details.

When the LoadState method returns an error, the LOAD STATE statement with CREATE IF NOT SAVED doesn’t fall back to CREATE STATE. It just fails.

Once LoadState method is added to the UDSCreator, the saved state can be loaded by LOAD STATE statement.

LoadState method of previously implemented CounterCreator can be as follows:

func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
    params data.Map) (core.SharedState, error) {
    cnt := &Counter{}
    if err := binary.Read(r, binary.LittleEndian, &cnt.c); err != nil {
        return nil, err
    }
    return cnt, nil
}
Providing Load method in a UDS

In addition to implementing the LoadState method in a UDS’s creator, a UDS itself can provide a Load method. While the LoadState method creates a new state instance and replaces the previous instance with it, the Load method dynamically modifies the existing instance. Therefore, the Load method can potentially be more efficient than the LoadState method, although it has to provide appropriate failure handling and concurrency control so that (1) the UDS doesn’t become invalid on failure (i.e. the Load method is “exception safe”) or by concurrent calls, and (2) other operations on the UDS don’t block for a long time.

The signature of the Load method is almost the same as that of the LoadState method, except that the Load method doesn’t return a new core.SharedState but updates the UDS itself instead:

Load(ctx *core.Context, r io.Reader, params data.Map) error

Load method of previously implemented Counter can be as follows:

func (c *Counter) Load(ctx *core.Context, r io.Reader, params data.Map) error {
    var cnt int64
    if err := binary.Read(r, binary.LittleEndian, &cnt); err != nil {
        return err
    }
    atomic.StoreInt64(&c.c, cnt)
    return nil
}
How Loading is Processed

SensorBee chooses between the two loading methods, LoadState and Load, according to the following rules:

  1. When a UDS’s creator doesn’t provide LoadState method, the LOAD STATE statement fails.

    • The LOAD STATE statement fails even if the UDS implements its Load method. To support the statement, LoadState method is always required in its creator. This is because Load method only works when an instance of the UDS is already created or loaded, and it cannot be used for a nonexistent instance.
    • The LOAD STATE CREATE IF NOT SAVED statement also fails if LoadState method isn’t provided. The statement calls CreateState method when the state hasn’t previously been saved. Otherwise, it’ll try to load the saved data. Therefore, if the data is previously saved and an instance of the UDS hasn’t been created yet, the statement cannot create a new instance without LoadState method in the creator. To be consistent on various conditions, the LOAD STATE CREATE IF NOT SAVED statement fails if LoadState method isn’t provided regardless of whether the state has been saved before or not.
  2. When a UDS’s creator provides the LoadState method and the UDS doesn’t provide the Load method, the LOAD STATE statement tries to load the state through the LoadState method.

    • It creates a new instance, so it can consume twice as much memory.
  3. When a UDS’s creator provides LoadState method and the UDS also provides Load method,

    • Load method will be used when the instance has already been created or loaded.

      • The LoadState method isn’t used even if the Load method fails.
    • LoadState method will be used otherwise.

Note

This is already mentioned in the list above, but LoadState method always needs to be provided even if a UDS implements Load method.
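
The rules above can be restated as a small decision function. This is only an illustration of the rules as written here, not SensorBee’s actual code. The function chooseLoader and its boolean parameters are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// chooseLoader returns the name of the method selected by the rules.
//   hasLoadState: the creator provides the LoadState method
//   hasLoad:      the UDS provides the Load method
//   exists:       an instance has already been created or loaded
func chooseLoader(hasLoadState, hasLoad, exists bool) (string, error) {
	if !hasLoadState {
		// Rule 1: the statement fails even if the UDS implements Load.
		return "", errors.New("the creator doesn't provide LoadState")
	}
	if hasLoad && exists {
		// Rule 3: the existing instance is updated in place.
		return "Load", nil
	}
	// Rule 2, or rule 3 when no instance exists yet.
	return "LoadState", nil
}

func main() {
	fmt.Println(chooseLoader(true, true, true))
	fmt.Println(chooseLoader(true, true, false))
	fmt.Println(chooseLoader(false, true, true))
}
```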

3.2.4.5. A Complete Example

A complete example of the state is shown in this subsection. Assume that the import path of the example repository is github.com/sensorbee/examples/counter, which doesn’t actually exist. The repository has two files:

  • counter.go
  • plugin/plugin.go
counter.go
package counter

import (
    "encoding/binary"
    "fmt"
    "io"
    "sync/atomic"

    "gopkg.in/sensorbee/sensorbee.v0/core"
    "gopkg.in/sensorbee/sensorbee.v0/data"
)

type Counter struct {
    c int64
}

func (c *Counter) Terminate(ctx *core.Context) error {
    return nil
}

func (c *Counter) Next() int64 {
    return atomic.AddInt64(&c.c, 1)
}

func (c *Counter) Save(ctx *core.Context, w io.Writer, params data.Map) error {
    return binary.Write(w, binary.LittleEndian, atomic.LoadInt64(&c.c))
}

func (c *Counter) Load(ctx *core.Context, r io.Reader, params data.Map) error {
    var cnt int64
    if err := binary.Read(r, binary.LittleEndian, &cnt); err != nil {
        return err
    }
    atomic.StoreInt64(&c.c, cnt)
    return nil
}

type CounterCreator struct {
}

func (c *CounterCreator) CreateState(ctx *core.Context,
    params data.Map) (core.SharedState, error) {
    cnt := &Counter{}
    if v, ok := params["start"]; ok {
        i, err := data.ToInt(v)
        if err != nil {
            return nil, err
        }
        cnt.c = i - 1
    }
    return cnt, nil
}

func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
    params data.Map) (core.SharedState, error) {
    cnt := &Counter{}
    if err := binary.Read(r, binary.LittleEndian, &cnt.c); err != nil {
        return nil, err
    }
    return cnt, nil
}

func Next(ctx *core.Context, uds string) (int64, error) {
    s, err := ctx.SharedStates.Get(uds)
    if err != nil {
        return 0, err
    }

    c, ok := s.(*Counter)
    if !ok {
        return 0, fmt.Errorf("the state isn't a counter: %v", uds)
    }
    return c.Next(), nil
}
plugin/plugin.go
package plugin

import (
    "gopkg.in/sensorbee/sensorbee.v0/bql/udf"

    "github.com/sensorbee/examples/counter"
)

func init() {
    udf.MustRegisterGlobalUDSCreator("my_counter",
        &counter.CounterCreator{})
    udf.MustRegisterGlobalUDF("my_next_count",
        udf.MustConvertGeneric(counter.Next))
}

3.2.4.6. Writing Tuples to a UDS

When a UDS implements core.Writer, the INSERT INTO statement can insert tuples into the UDS via the uds sink:

type Writer interface {
    Write(*Context, *Tuple) error
}

The following is the example of using the uds sink:

CREATE STATE my_state TYPE my_state_type;
CREATE SINK my_state_sink TYPE uds WITH name = "my_state";
INSERT INTO my_state_sink FROM some_stream;

If my_state_type doesn’t implement core.Writer, the CREATE SINK statement fails. Every time some_stream emits a tuple, the Write method of my_state is called.

Example

Models provided by Jubatus machine learning plugin for SensorBee implement the Write method. When tuples are inserted into a UDS, it trains the model it has.

3.2.5. Source Plugins

This section describes how to implement a source as a plugin of SensorBee.

3.2.5.1. Implementing a Source

A struct implementing the following interface can be a source:

type Source interface {
    GenerateStream(ctx *Context, w Writer) error
    Stop(ctx *Context) error
}

This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core package.

The GenerateStream method actually generates tuples for subsequent streams. The argument ctx contains the information of the current processing context. w is the destination to which generated tuples are emitted. The Stop method stops GenerateStream. It should wait until the GenerateStream method call returns, but it isn’t mandatory.

Once the GenerateStream method is called, a source can emit as many tuples as it requires. A source basically needs to return from its GenerateStream method when:

  1. it emitted all the tuples it has
  2. the Stop method was called
  3. a fatal error occurred

The Stop method can be called concurrently while the GenerateStream method is working and it must be thread-safe. As long as a source is used by components defined in SensorBee, it’s guaranteed that its Stop method is called only once and it doesn’t have to be idempotent. However, it is recommended that a source provide a termination check in its Stop method to avoid a double free problem.

A typical implementation of a source is shown below:

func (s *MySource) GenerateStream(ctx *core.Context, w core.Writer) error {
    <initialization>
    defer func() {
        <clean up>
    }()

    for <check stop> {
        t := <create a new tuple>
        if err := w.Write(ctx, t); err != nil {
            return err
        }
    }
    return nil
}

func (s *MySource) Stop(ctx *core.Context) error {
    <turn on a stop flag>
    <wait until GenerateStream stops>
    return nil
}

The following example source emits tuples periodically:

type Ticker struct {
    interval time.Duration
    stopped  int32
}

func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
    var cnt int64
    for ; ; cnt++ {
        if atomic.LoadInt32(&t.stopped) != 0 {
            break
        }

        tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
        if err := w.Write(ctx, tuple); err != nil {
            return err
        }
        time.Sleep(t.interval)
    }
    return nil
}

func (t *Ticker) Stop(ctx *core.Context) error {
    atomic.StoreInt32(&t.stopped, 1)
    return nil
}

The interval field is initialized in SourceCreator, which is described later. This is the source version of the example in the UDSF section. This implementation is slightly incorrect because the Stop method doesn’t wait until the GenerateStream method actually returns. Because implementing a thread-safe source that stops correctly is a difficult task, the core package provides a utility function that implements a source’s Stop method on behalf of the source itself. See Utilities for details.

3.2.5.2. Registering a Source

To register a source to the SensorBee server, the source needs to provide its SourceCreator. The SourceCreator interface is defined in the gopkg.in/sensorbee/sensorbee.v0/bql package as follows:

type SourceCreator interface {
    CreateSource(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Source, error)
}

It only has one method: CreateSource. The CreateSource method is called when the CREATE SOURCE statement is executed. The ctx argument contains the information of the current processing context. ioParams has the name and the type name of the source, which are given in the CREATE SOURCE statement. params has parameters specified in the WITH clause of the CREATE SOURCE statement.

The creator can be registered by RegisterGlobalSourceCreator or MustRegisterGlobalSourceCreator function. As an example, the creator of Ticker above can be implemented and registered as follows:

type TickerCreator struct {
}

func (t *TickerCreator) CreateSource(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Source, error) {
    interval := 1 * time.Second
    if v, ok := params["interval"]; ok {
        i, err := data.ToDuration(v)
        if err != nil {
            return nil, err
        }
        interval = i
    }
    return &Ticker{
        interval: interval,
    }, nil
}

func init() {
    bql.MustRegisterGlobalSourceCreator("ticker", &TickerCreator{})
}

In this example, the source has a parameter interval which can be specified in the WITH clause of the CREATE SOURCE statement:

CREATE SOURCE my_ticker TYPE ticker WITH interval = 0.1;

my_ticker emits tuples that look like {"tick": 123} every 100 ms. Without the interval parameter, my_ticker emits a tuple every second by default.

3.2.5.3. Types of a Source

In addition to a regular source, there’re two more types of sources: a resumable source and a rewindable source. This subsection describes those sources in detail.

Resumable Sources

A source that supports the PAUSE SOURCE and RESUME SOURCE statements is called a resumable source.

Although all sources support them by default, which is done by the core package, a source can explicitly implement core.Resumable interface so that it can provide more efficient pause and resume capability:

type Resumable interface {
    Pause(ctx *Context) error
    Resume(ctx *Context) error
}

The Pause method is called when the PAUSE SOURCE statement is executed and the Resume method is called by RESUME SOURCE. The Pause method may be called even when the source is already paused. Likewise, the Resume method may be called even when the source isn’t paused.
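
Because both methods can be called repeatedly, it helps to make them idempotent. The following is a minimal sketch using a condition variable. The pauser type is hypothetical and omits the ctx argument. A source would call waitIfPaused before writing each tuple.

```go
package main

import (
	"fmt"
	"sync"
)

// pauser is a hypothetical helper holding the pause state of a source.
type pauser struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
}

func newPauser() *pauser {
	p := &pauser{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// Pause only sets a flag, so pausing a paused source is a no-op.
func (p *pauser) Pause() error {
	p.mu.Lock()
	p.paused = true
	p.mu.Unlock()
	return nil
}

// Resume clears the flag and wakes up blocked callers. Resuming a
// source that isn't paused is a no-op as well.
func (p *pauser) Resume() error {
	p.mu.Lock()
	p.paused = false
	p.mu.Unlock()
	p.cond.Broadcast()
	return nil
}

// waitIfPaused blocks the caller while the source is paused.
func (p *pauser) waitIfPaused() {
	p.mu.Lock()
	for p.paused {
		p.cond.Wait()
	}
	p.mu.Unlock()
}

func main() {
	p := newPauser()
	p.Pause()
	p.Pause() // already paused: nothing changes
	done := make(chan struct{})
	go func() {
		p.waitIfPaused()
		close(done)
	}()
	p.Resume()
	<-done
	fmt.Println("resumed")
}
```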

A source can be made non-resumable by implementing these methods to return an error:

type MyNonResumableSource struct {
    ...
}

...

func (m *MyNonResumableSource) Pause(ctx *core.Context) error {
    return errors.New("my_non_resumable_source doesn't support pause")
}

func (m *MyNonResumableSource) Resume(ctx *core.Context) error {
    return errors.New("my_non_resumable_source doesn't support resume")
}
Rewindable Sources

A rewindable source can re-generate the same tuples again from the beginning after it emits all tuples or while it’s emitting tuples. A rewindable source supports the REWIND SOURCE statement.

A source can become rewindable by implementing the core.RewindableSource interface:

type RewindableSource interface {
    Source
    Resumable

    Rewind(ctx *Context) error
}

A rewindable source also needs to implement core.Resumable to be rewindable.

Note

The reason that a rewindable source also needs to be resumable is due to the internal implementation of the default pause/resume support. While a source is paused, it blocks core.Writer.Write called in the GenerateStream method. The Rewind method could also be blocked while the Write call is being blocked until the Resume method is called. It, of course, depends on the implementation of a source, but it’s very error-prone. Therefore, implementing the Resumable interface is required to be rewindable at the moment.

Unlike a regular source, the GenerateStream method of a rewindable source must not return after it emits all tuples. Instead, it needs to wait until the Rewind method or the Stop method is called. Once it returns, the source is considered stopped and no further operation, including the REWIND SOURCE statement, works on the source.

Due to its nature, a stream isn’t often rewindable. A rewindable source is mostly used for relatively static data sources such as relations or files. Also, because implementing the RewindableSource interface is even harder than implementing the Resumable interface, utilities are usually used.

3.2.5.4. Utilities

There’re some utilities to support implementing sources and their creators. This subsection describes each utility.

core.ImplementSourceStop

core.ImplementSourceStop is a function that implements the Stop method of a source in a thread-safe manner:

func ImplementSourceStop(s Source) Source

A source returned from this function is resumable, but not rewindable even if the original source implements the core.RewindableSource interface. In addition, although a source passed to core.ImplementSourceStop can explicitly implement the core.Resumable interface, its Pause and Resume methods will never be called because the source returned from core.ImplementSourceStop also implements those methods and controls pause and resume.

To apply this function, a source must satisfy the following restrictions:

  1. The GenerateStream method must be implemented in a way that it can safely be called again after it returns.

  2. The GenerateStream method must return when core.Writer.Write returns core.ErrSourceStopped. The method must return exactly the same error variable that the writer returned.

  3. The Stop method just returns nil.

    • This means all resource allocation and deallocation must be done within the GenerateStream method.

A typical implementation of a source passed to core.ImplementSourceStop is shown below:

func (s *MySource) GenerateStream(ctx *core.Context, w core.Writer) error {
    <initialization>
    defer func() {
        <clean up>
    }()

    for {
        t := <create a new tuple>
        if err := w.Write(ctx, t); err != nil {
            return err
        }
    }
    return nil
}

func (s *MySource) Stop(ctx *core.Context) error {
    return nil
}

If a source wants to ignore errors returned from core.Writer.Write other than core.ErrSourceStopped, the GenerateStream method can be modified as:

if err := w.Write(ctx, t); err != nil {
    if err == core.ErrSourceStopped {
        return err
    }
}
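
The requirement to return exactly the same error variable matters when an error is compared by identity. The sketch below assumes that the utility recognizes the error with such a comparison, which the restriction above suggests but this section doesn’t state. errSourceStopped is a local stand-in for core.ErrSourceStopped, and the other names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errSourceStopped is a local stand-in for core.ErrSourceStopped.
var errSourceStopped = errors.New("source stopped")

// writeStopped simulates a writer whose source has been stopped.
func writeStopped() error {
	return errSourceStopped
}

// good returns the writer's error unchanged.
func good() error {
	if err := writeStopped(); err != nil {
		return err
	}
	return nil
}

// bad decorates the error, which creates a different error value.
func bad() error {
	if err := writeStopped(); err != nil {
		return fmt.Errorf("write failed: %v", err)
	}
	return nil
}

func main() {
	fmt.Println(good() == errSourceStopped) // true
	fmt.Println(bad() == errSourceStopped)  // false
}
```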

By applying core.ImplementSourceStop, the Ticker above can be implemented as follows:

type Ticker struct {
    interval time.Duration
}

func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
    var cnt int64
    for ; ; cnt++ {
        tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
        if err := w.Write(ctx, tuple); err != nil {
            return err
        }
        time.Sleep(t.interval)
    }
    return nil
}

func (t *Ticker) Stop(ctx *core.Context) error {
    return nil
}

type TickerCreator struct {
}

func (t *TickerCreator) CreateSource(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Source, error) {
    interval := 1 * time.Second
    if v, ok := params["interval"]; ok {
        i, err := data.ToDuration(v)
        if err != nil {
            return nil, err
        }
        interval = i
    }
    return core.ImplementSourceStop(&Ticker{
        interval: interval,
    }), nil
}

There’s no stopped flag now. In this version, the Stop method of the source returned by core.ImplementSourceStop waits until the GenerateStream method returns.

core.NewRewindableSource

core.NewRewindableSource is a function that converts a regular source into a rewindable source:

func NewRewindableSource(s Source) RewindableSource

A source returned from this function is resumable and rewindable. A source passed to the function needs to satisfy the same restrictions as core.ImplementSourceStop. In addition to that, there’s one more restriction for core.NewRewindableSource:

  1. The GenerateStream method must return when core.Writer.Write returns core.ErrSourceRewound. The method must return exactly the same error variable that the writer returned.

Although the GenerateStream method of a rewindable source must not return after it emits all tuples, a source passed to the core.NewRewindableSource function needs to return in that situation. For example, assume there’s a source that generates tuples from each line in a file. To implement the source without the help of the utility function, its GenerateStream method must wait for the Rewind method to be called after it processes all lines in the file. However, with the utility, its GenerateStream method can just return once it emits all tuples. Therefore, a typical implementation of a source passed to the utility can be the same as that of a source for core.ImplementSourceStop.

As it will be shown later, a source that infinitely emits tuples can also be rewindable in some sense.

The following is an example of TickerCreator modified from the example for core.ImplementSourceStop:

func (t *TickerCreator) CreateSource(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Source, error) {
    interval := 1 * time.Second
    if v, ok := params["interval"]; ok {
        i, err := data.ToDuration(v)
        if err != nil {
            return nil, err
        }
        interval = i
    }

    rewindable := false
    if v, ok := params["rewindable"]; ok {
        b, err := data.AsBool(v)
        if err != nil {
            return nil, err
        }
        rewindable = b
    }

    src := &Ticker{
        interval: interval,
    }
    if rewindable {
        return core.NewRewindableSource(src), nil
    }
    return core.ImplementSourceStop(src), nil
}

In this example, Ticker has the rewindable parameter. If it is true, the source becomes rewindable:

CREATE SOURCE my_rewindable_ticker TYPE ticker WITH rewindable = true;

By issuing the REWIND SOURCE statement, my_rewindable_ticker resets the value of tick field:

REWIND SOURCE my_rewindable_ticker;

-- output examples of SELECT RSTREAM * FROM my_rewindable_ticker [RANGE 1 TUPLES];
{"tick":0}
{"tick":1}
{"tick":2}
...
{"tick":123}
-- REWIND SOURCE is executed here
{"tick":0}
{"tick":1}
...

bql.SourceCreatorFunc

bql.SourceCreatorFunc is a function that converts a function having the same signature as SourceCreator.CreateSource to a SourceCreator:

func SourceCreatorFunc(f func(*core.Context,
    *IOParams, data.Map) (core.Source, error)) SourceCreator

For example, TickerCreator above and its registration can be rewritten as follows with this utility:

func CreateTicker(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Source, error) {
    interval := 1 * time.Second
    if v, ok := params["interval"]; ok {
        i, err := data.ToDuration(v)
        if err != nil {
            return nil, err
        }
        interval = i
    }
    return core.ImplementSourceStop(&Ticker{
        interval: interval,
    }), nil
}

func init() {
    bql.MustRegisterGlobalSourceCreator("ticker",
        bql.SourceCreatorFunc(CreateTicker))
}
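The adapter idea behind SourceCreatorFunc is a common Go pattern (the standard library's http.HandlerFunc works the same way): a named function type gets a method that calls the function itself, so a plain function satisfies a one-method interface. The following is a minimal sketch of that pattern, not SensorBee's actual implementation. The source, creator, creatorFunc, and ticker names are hypothetical stand-ins, and the parameter map is simplified to map[string]string.

```go
package main

import "fmt"

// source and creator are simplified stand-ins for core.Source and
// bql.SourceCreator (assumptions made for this sketch).
type source interface {
	Name() string
}

type creator interface {
	CreateSource(params map[string]string) (source, error)
}

// creatorFunc adapts a plain function to the creator interface.
type creatorFunc func(params map[string]string) (source, error)

// CreateSource calls the underlying function.
func (f creatorFunc) CreateSource(params map[string]string) (source, error) {
	return f(params)
}

// ticker is a hypothetical source used only for demonstration.
type ticker struct {
	interval string
}

func (t *ticker) Name() string { return "ticker every " + t.interval }

// createTicker has the same shape as creator.CreateSource.
func createTicker(params map[string]string) (source, error) {
	interval, ok := params["interval"]
	if !ok {
		interval = "1s" // default when the parameter is omitted
	}
	return &ticker{interval: interval}, nil
}

func main() {
	// The conversion creatorFunc(createTicker) yields a value that
	// implements creator without declaring a separate struct.
	var c creator = creatorFunc(createTicker)
	src, err := c.CreateSource(map[string]string{"interval": "5s"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(src.Name())
}
```

The benefit is purely ergonomic: a stateless creator no longer needs an empty struct type such as TickerCreator.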

3.2.5.5. A Complete Example

A complete example of Ticker is shown in this subsection. Assume that the import path of the example is github.com/sensorbee/examples/ticker, which doesn’t actually exist. There are two files in the repository:

  • ticker.go
  • plugin/plugin.go

The example uses the core.NewRewindableSource utility function.

ticker.go
package ticker

import (
    "time"

    "gopkg.in/sensorbee/sensorbee.v0/bql"
    "gopkg.in/sensorbee/sensorbee.v0/core"
    "gopkg.in/sensorbee/sensorbee.v0/data"
)

type Ticker struct {
    interval time.Duration
}

func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
    var cnt int64
    for ; ; cnt++ {
        tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
        if err := w.Write(ctx, tuple); err != nil {
            return err
        }
        time.Sleep(t.interval)
    }
    return nil
}

func (t *Ticker) Stop(ctx *core.Context) error {
    // This method will be implemented by utility functions.
    return nil
}

type TickerCreator struct {
}

func (t *TickerCreator) CreateSource(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Source, error) {
    interval := 1 * time.Second
    if v, ok := params["interval"]; ok {
        i, err := data.ToDuration(v)
        if err != nil {
            return nil, err
        }
        interval = i
    }

    rewindable := false
    if v, ok := params["rewindable"]; ok {
        b, err := data.AsBool(v)
        if err != nil {
            return nil, err
        }
        rewindable = b
    }

    src := &Ticker{
        interval: interval,
    }
    if rewindable {
        return core.NewRewindableSource(src), nil
    }
    return core.ImplementSourceStop(src), nil
}

plugin/plugin.go
package plugin

import (
    "gopkg.in/sensorbee/sensorbee.v0/bql"

    "github.com/sensorbee/examples/ticker"
)

func init() {
    bql.MustRegisterGlobalSourceCreator("ticker", &ticker.TickerCreator{})
}

3.2.6. Sink Plugins

This section describes how to implement a sink as a plugin of SensorBee.

3.2.6.1. Implementing a Sink

A struct implementing the following interface can be a sink:

type Sink interface {
    Write(ctx *Context, t *Tuple) error
    Close(ctx *Context) error
}

This interface is defined in the gopkg.in/sensorbee/sensorbee.v0/core package.

The Write method writes a tuple to the destination of the sink. The argument ctx contains the information of the current processing context. t is the tuple to be written. The Close method is called when the sink becomes unnecessary. It must release all resources allocated for the sink.

The following example sink writes a tuple as JSON to stdout:

type StdoutSink struct {
}

func (s *StdoutSink) Write(ctx *core.Context, t *core.Tuple) error {
    _, err := fmt.Fprintln(os.Stdout, t.Data)
    return err
}

func (s *StdoutSink) Close(ctx *core.Context) error {
    // nothing to release
    return nil
}

A sink is initialized by its SinkCreator, which is described later.

Note

SensorBee doesn’t provide buffering or retry capability for sinks.
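Because of this, a sink that needs retries has to implement them itself. One possible approach is a wrapper that retries Write a fixed number of times before giving up. The sketch below is only an illustration under stated assumptions: the sink interface is a simplified stand-in for core.Sink (the real methods also take a *core.Context, and Write takes a *core.Tuple), and retrySink and flakySink are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// sink is a simplified stand-in for core.Sink (assumption).
type sink interface {
	Write(t string) error
	Close() error
}

// retrySink is a hypothetical wrapper that retries failed writes.
type retrySink struct {
	next     sink
	attempts int // total number of tries per tuple; values below 1 mean 1
}

// Write tries the wrapped sink up to attempts times and returns the last
// error if every try fails.
func (r *retrySink) Write(t string) error {
	n := r.attempts
	if n < 1 {
		n = 1
	}
	var err error
	for i := 0; i < n; i++ {
		if err = r.next.Write(t); err == nil {
			return nil
		}
	}
	return err
}

// Close releases the wrapped sink.
func (r *retrySink) Close() error { return r.next.Close() }

// flakySink fails a fixed number of times before succeeding (test double).
type flakySink struct {
	failures int
	written  []string
}

func (f *flakySink) Write(t string) error {
	if f.failures > 0 {
		f.failures--
		return errors.New("temporary failure")
	}
	f.written = append(f.written, t)
	return nil
}

func (f *flakySink) Close() error { return nil }

func main() {
	inner := &flakySink{failures: 2}
	s := &retrySink{next: inner, attempts: 3}
	fmt.Println(s.Write(`{"tick":0}`), inner.written)
}
```

A production sink would usually add a delay between tries and distinguish temporary from permanent errors; both are omitted here to keep the sketch short.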

3.2.6.2. Registering a Sink

To register a sink with the SensorBee server, the sink needs to provide its SinkCreator. The SinkCreator interface is defined in the gopkg.in/sensorbee/sensorbee.v0/bql package as follows:

// SinkCreator is an interface which creates instances of a Sink.
type SinkCreator interface {
    // CreateSink creates a new Sink instance using given parameters.
    CreateSink(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Sink, error)
}

It only has one method: CreateSink. The CreateSink method is called when the CREATE SINK statement is executed. The ctx argument contains the information of the current processing context. ioParams has the name and the type name of the sink, which are given in the CREATE SINK statement. params has parameters specified in the WITH clause of the CREATE SINK statement.

The creator can be registered with the RegisterGlobalSinkCreator or MustRegisterGlobalSinkCreator function. As an example, the creator of StdoutSink above can be implemented and registered as follows:

type StdoutSinkCreator struct {
}

func (s *StdoutSinkCreator) CreateSink(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
    return &StdoutSink{}, nil
}

func init() {
    bql.MustRegisterGlobalSinkCreator("my_stdout", &StdoutSinkCreator{})
}

This sink doesn’t have parameters specified in the WITH clause of the CREATE SINK statement. Parameters for a sink are handled in the same way as parameters for a source. See Source Plugins for more details.

3.2.6.3. Utilities

There’s one utility function for sink plugins: SinkCreatorFunc:

func SinkCreatorFunc(f func(*core.Context,
    *IOParams, data.Map) (core.Sink, error)) SinkCreator

This utility function is defined in the gopkg.in/sensorbee/sensorbee.v0/bql package. It converts a function having the same signature as SinkCreator.CreateSink to a SinkCreator. With this utility, for example, StdoutSinkCreator can be modified to:

func CreateStdoutSink(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
    return &StdoutSink{}, nil
}

func init() {
    bql.MustRegisterGlobalSinkCreator("my_stdout",
        bql.SinkCreatorFunc(CreateStdoutSink))
}

3.2.6.4. A Complete Example

A complete example of the sink is shown in this subsection. The package name for the sink is stdout and StdoutSink is renamed to Sink. Also, this example uses the SinkCreatorFunc utility for SinkCreator.

Assume that the import path of the example is github.com/sensorbee/examples/stdout, which doesn’t actually exist. The repository has two files:

  • stdout.go
  • plugin/plugin.go

stdout.go
package stdout

import (
    "fmt"
    "os"

    "gopkg.in/sensorbee/sensorbee.v0/bql"
    "gopkg.in/sensorbee/sensorbee.v0/core"
    "gopkg.in/sensorbee/sensorbee.v0/data"
)

type Sink struct {
}

func (s *Sink) Write(ctx *core.Context, t *core.Tuple) error {
    _, err := fmt.Fprintln(os.Stdout, t.Data)
    return err
}

func (s *Sink) Close(ctx *core.Context) error {
    return nil
}

func Create(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
    return &Sink{}, nil
}

plugin/plugin.go
package plugin

import (
    "gopkg.in/sensorbee/sensorbee.v0/bql"

    "github.com/sensorbee/examples/stdout"
)

func init() {
    bql.MustRegisterGlobalSinkCreator("my_stdout",
        bql.SinkCreatorFunc(stdout.Create))
}