A Stream Processing API for Go
With stream processing, data is decomposed into sequential elements that are made available over time, allowing large (and possibly unbounded) data sets to be processed incrementally. This post introduces Automi, a Go package that lets developers build programs that process data using a stream processing approach.
Automi on GitHub: https://github.com/vladimirvivien/automi
At a high level, Automi offers the following features:
- Go semantics to compose powerful multi-stage data stream processors
- Support for functional programming style operations
- Use of native Go types as data sources and sinks (e.g. channels, slices, etc.)
- Operators to transform and filter streamed data
- Support for batched operations (Sort, GroupBy, Sum, etc.)
To get started, let us consider an example where a source emits rune values onto a stream. Over time, each rune is processed along the stream. First, a filter removes unwanted runes, then a mapper maps each rune to its string equivalent, and lastly the streamed items are collected. Below, the stream and its operations are depicted as a factory line which shows the multi-stage processing of the stream.
Next, let us see how Automi can be used to realize the same example in code, streaming and processing rune values. The following snippet creates a stream from a slice of runes ([]rune). Then, operations are added to the stream to filter and map the runes as they move downstream.
package main

import (
	"log"
	"os"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	// create new stream with a slice of runes as emitter
	strm := stream.New(
		[]rune("B世!ぽ@opqDQRS#$%^...O6PTnVWXѬYZbcef7ghijCklrAstvw"),
	)
	// filter out non-capitalized alpha chars
	strm.Filter(func(item rune) bool {
		return item >= 65 && item < (65+26)
	})
	// map each rune to its string value
	strm.Map(func(item rune) string {
		return string(item)
	})
	// route string characters to Stdout using a collector
	strm.Into(collectors.Writer(os.Stdout))
	// open the stream
	if err := <-strm.Open(); err != nil {
		log.Fatal(err)
	}
}
Let us unpack the previous code to understand what is going on.
- Stream strm is created from slice []rune. Each item in the slice is streamed out sequentially.
- Method strm.Filter applies user-defined function func(item rune) bool to every rune streamed. It filters out unwanted runes by returning true when the rune represents a capitalized alpha character and false when it does not.
- Method strm.Map receives the filtered rune items from the previous operator and applies user-defined function func(item rune) string to convert each rune value to a string value for downstream consumption.
- Lastly, method strm.Into specifies a collector which accumulates the string items. In this example, the strings are collected and written to Stdout to be printed on the terminal.
Note that up to this point, the stream is merely declared and will not do anything until it is opened. For the stream to run, method strm.Open() must be invoked to start the stream and apply the attached operations. When this program is executed, it prints the following to standard output:
> go run .
BDQRSUEFGHIJKLMNOPTVWXYZCA
Automi Concepts
Automi uses a pipeline metaphor, implemented with Go channels as the conduit for data flow (as described by Sameer Ajmani in his blog). As depicted below, Automi uses four main components, including the stream itself: a data emitter that emits data onto the stream, stream operators that manipulate each streamed item, and a collector that accumulates streamed items.
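To make the metaphor concrete, here is a minimal sketch of the same emitter → operator → collector pipeline built with plain Go channels. This is an illustration of the pattern, not Automi's actual internals; the function names (emit, filterCaps, collect) are made up for the example.

```go
package main

import "fmt"

// emit sends each rune from the slice onto a channel, then closes it.
func emit(runes []rune) <-chan rune {
	out := make(chan rune)
	go func() {
		defer close(out)
		for _, r := range runes {
			out <- r
		}
	}()
	return out
}

// filterCaps forwards only capitalized ASCII letters downstream.
func filterCaps(in <-chan rune) <-chan rune {
	out := make(chan rune)
	go func() {
		defer close(out)
		for r := range in {
			if r >= 'A' && r <= 'Z' {
				out <- r
			}
		}
	}()
	return out
}

// collect drains the stream and accumulates the items into a string.
func collect(in <-chan rune) string {
	var s []rune
	for r := range in {
		s = append(s, r)
	}
	return string(s)
}

func main() {
	fmt.Println(collect(filterCaps(emit([]rune("aBcDeF")))))
}
```

Each stage runs in its own goroutine and hands items to the next stage over an unbuffered channel, which is where the backpressure comes from: a fast emitter simply blocks until the downstream stage is ready.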
Stream
A stream represents the conduit within which data elements can flow. Automi uses Go channels internally as the conduit for streamed elements. This means streams support features such as buffering, automatic synchronous backpressure, and concurrency safety.
An Automi stream is created with the New function, which takes as parameter either an emitter (see below) or a value that can be wrapped into an emitter:
strm := stream.New(<emitter or value>)
The Stream type (returned by New) uses a fluent API to configure the stream by chaining methods, as shown in this code snippet:
func main() {
strm := stream.New([]rune(...))
strm.Filter(func(item rune) bool {
return item >= 65 && item < (65+26)
}).Map(func(item rune) string {
return string(item)
}).Into(collectors.Writer(os.Stdout))
...
}
Emitter
A stream starts with an emitter, which can be an in-memory, network, or file resource used to emit streamed data. Package emitters comes with the following emitter implementations:
emitters.Channel
emitters.CSV
emitters.Reader
emitters.Scanner
emitters.Slice
If these do not meet your needs, you can also create custom emitters by implementing interface api.Emitter:
type Emitter interface {
GetOutput() <-chan interface{}
}
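As a sketch of what a custom emitter might look like, the following hypothetical rangeEmitter emits the integers [0, n) onto its output channel. The Emitter interface is restated locally here so the example is self-contained; in a real program you would implement api.Emitter and pass the value to stream.New.

```go
package main

import "fmt"

// Emitter mirrors the api.Emitter interface shown above.
type Emitter interface {
	GetOutput() <-chan interface{}
}

// rangeEmitter is a hypothetical custom emitter that emits
// the integers 0, 1, ..., n-1 onto the stream.
type rangeEmitter struct {
	n int
}

// compile-time check that rangeEmitter satisfies Emitter
var _ Emitter = (*rangeEmitter)(nil)

// GetOutput returns the channel on which items are emitted.
// The goroutine closes the channel when the source is exhausted,
// which signals the end of the stream.
func (e *rangeEmitter) GetOutput() <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for i := 0; i < e.n; i++ {
			out <- i
		}
	}()
	return out
}

func main() {
	// In an Automi program this value would be passed to stream.New;
	// here we just drain the channel to show the emitted items.
	for v := range (&rangeEmitter{n: 3}).GetOutput() {
		fmt.Println(v)
	}
}
```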
Operators
A stream operator applies an operation to each item as it flows through the stream. Automi leverages Go's support for higher-order functions to enable a functional programming idiom, allowing developers to specify custom operators as user-defined functions, as shown below:
stream.Filter(func(row []string) bool {
count, err := strconv.Atoi(row[1])
if err != nil {
count = 0
}
return (count > 0)
})
Automi comes with many operator methods as listed below:
Stream.Filter
Stream.Map
Stream.FlatMap
Stream.Reduce
Stream.Process
Stream.GroupByKey
Stream.GroupByName
Stream.GroupByPos
Stream.Sort
Stream.SortByKey
Stream.SortByName
Stream.SortByPos
Stream.SortWith
Stream.Sum
Stream.SumByKey
Stream.SumByName
Stream.SumByPos
Stream.SumAllKeys
Many of the operators listed above accept user-defined functions, while others apply configuration, transformation, and other functionality to the streamed data.
Collectors
A collector represents either an in-memory, network, or file resource which can accumulate streamed items. Automi can accumulate streamed data in the following types defined in the collectors package:
collectors.CSV
collectors.Func
collectors.Null
collectors.Slice
collectors.Writer
If you need your own custom collector, you can implement interface api.Collector:
type Collector interface {
SetInput(<-chan interface{})
}
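A custom collector might look like the following hypothetical sliceCollector, which accumulates streamed items into a slice. The Collector interface is restated locally so the example is self-contained; in a real program you would implement api.Collector and pass the value to the stream's Into method. The done channel is an assumption of this sketch, used to signal when the input is drained.

```go
package main

import "fmt"

// Collector mirrors the api.Collector interface shown above.
type Collector interface {
	SetInput(<-chan interface{})
}

// sliceCollector is a hypothetical custom collector that
// accumulates streamed items into a slice.
type sliceCollector struct {
	items []interface{}
	done  chan struct{}
}

// compile-time check that sliceCollector satisfies Collector
var _ Collector = (*sliceCollector)(nil)

func newSliceCollector() *sliceCollector {
	return &sliceCollector{done: make(chan struct{})}
}

// SetInput starts a goroutine that drains the stream's output
// channel, appending each item until the channel is closed.
func (c *sliceCollector) SetInput(in <-chan interface{}) {
	go func() {
		defer close(c.done)
		for v := range in {
			c.items = append(c.items, v)
		}
	}()
}

func main() {
	// In an Automi program the stream would wire this up via Into;
	// here we feed the collector directly from a channel.
	in := make(chan interface{}, 2)
	in <- "a"
	in <- "b"
	close(in)

	c := newSliceCollector()
	c.SetInput(in)
	<-c.done // wait until the input channel is drained
	fmt.Println(c.items)
}
```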
More Streaming Examples
Now, let us explore more examples showing how Automi can be used.
Streaming from channels
The following example shows how data can be streamed from a source backed by a Go channel. As illustrated below, function emitterFunc returns a value of type <-chan time.Time that is used as the emitter source for the stream. A single operator is applied to map each streamed time.Time item to a string, which is then collected by a writer collector for output to Stdout.
The following source snippet shows the code representation of the stream above:
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	// emitterFunc returns a chan used as the data source
	emitterFunc := func() <-chan time.Time {
		times := make(chan time.Time)
		go func() {
			times <- time.Unix(100000, 0)
			times <- time.Unix(2*100000, 0)
			times <- time.Unix(4*100000, 0)
			times <- time.Unix(8*100000, 0)
			close(times)
		}()
		return times
	}

	strm := stream.New(emitterFunc())
	strm.Map(func(item time.Time) string {
		return item.String()
	}).Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}
When the stream is open, it simply prints the collected string values to standard output.
Streaming HTTP requests
An Automi stream can be sourced from any value that implements io.Reader and can collect streamed values into any value that implements io.Writer. The following illustrates this with a server that streams byte slices from http.Request.Body, applies a base64 transformation to each slice, then writes the transformed items into the HTTP response.
The following snippet shows the source code of the example HTTP server above using an http.HandleFunc:
package main

import (
	"encoding/base64"
	"net/http"

	"github.com/vladimirvivien/automi/stream"
)

func main() {
	http.HandleFunc(
		"/",
		func(resp http.ResponseWriter, req *http.Request) {
			resp.Header().Add("Content-Type", "text/html")
			resp.WriteHeader(http.StatusOK)

			strm := stream.New(req.Body)
			strm.Process(func(data []byte) string {
				return base64.StdEncoding.EncodeToString(data)
			}).Into(resp)

			if err := <-strm.Open(); err != nil {
				resp.WriteHeader(http.StatusInternalServerError)
			}
		},
	)
	http.ListenAndServe(":4040", nil)
}
When the server receives a request on path /, it streams the content of req.Body (which implements io.ReadCloser), applies a user-defined operation via stream method Process to calculate the base64 encoding of the streamed bytes, then writes the result back into resp.
Streaming using CSVs
The next example illustrates a stream that is sourced from a CSV file, applies two user-defined operators, then the result is collected into another CSV as shown below.
The snippet below shows the source code for this example. It uses type emitters.CSV to create CSV emitter csvEmitter, which serializes each row onto the stream as a []string. The Map method applies an operator that returns the first 6 items in each row, and the Filter method removes any row where the second column (row[1]) is equal to zero.
package main

import (
	"fmt"
	"strconv"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/emitters"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	csvEmitter := emitters.CSV("./stats.txt").
		CommentChar('#').
		DelimChar(',').
		HasHeaders()

	strm := stream.New(csvEmitter)
	// select first 6 cols per row:
	strm.Map(func(row []string) []string {
		return row[:6]
	})
	// filter out rows with col[1] = 0
	strm.Filter(func(row []string) bool {
		count, err := strconv.Atoi(row[1])
		if err != nil {
			count = 0
		}
		return (count > 0)
	})
	strm.Into(collectors.CSV("./out.txt"))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}
When the stream is open, it uses collectors.CSV to collect the streamed items into another CSV file.
Additional examples
There are many more examples available on GitHub:
- Batch — examples showing batched operators (i.e. sort, sum, count, etc)
- Collectors — examples of different collector components.
- Emitters — examples of different emitter components.
- Error handling — shows how to setup error handling with Automi streams.
- gRPC streaming — an example showing how to stream items from gRPC streaming services.
- Logging — examples of logging stream events at runtime
- MD5 — implements the MD5 example from Sameer Ajmani pipeline post
- Network — examples of streaming data to and from networked resources
- Wordcount — a simple word count example
All examples are found here github.com/vladimirvivien/automi/examples
Roadmap
Automi is being released as 0.1.0-alpha.0, an API that can work in many contexts where stream processing is needed. On the road to GA, Automi has the following aspirations:
- Additional methods to control the stream (e.g. run window, run until, etc.)
- Introduction of new operators (e.g. Join, Broadcast, etc.)
- Ability to use streams as emitter or collector sources
- Concurrency control (e.g. specify internal worker pool size)
- Timeouts and deadlines for process operators
- Etc
If you don’t see your favorite feature, open an issue and let me know!
Happy streaming!