A Stream Processing API for Go

Vladimir Vivien
Automi
Jul 31, 2019 · 8 min read


With stream processing, data is decomposed into sequential elements that are made available over time, allowing for the processing of large (and possibly unbounded) data sets. This post is about project Automi, a Go package that lets developers create programs that process data using a stream processing methodology.

Automi on GitHub: https://github.com/vladimirvivien/automi

At a high level, Automi offers the following features:

  • Go semantics to compose powerful multi-stage data stream processors
  • Support for functional programming style operations
  • Use of native Go types as data sources and sinks (i.e. channels, slices, etc.)
  • Operators to transform and filter streamed data
  • Support for batched operations (Sort, GroupBy, Sum, etc.)

To get started, let us consider an example where a source emits rune values onto a stream. Over time, each rune is processed along the stream. First, a filter removes unwanted runes, then a mapper converts each rune to its string equivalent, and lastly the streamed items are collected. Below, the stream and its operations are depicted as a factory line which shows the multi-stage processing of the stream.

Stream processing illustrated

Next, let us see how Automi can be used to realize the same example to stream and process rune values in code. The following source snippet creates a stream from a slice of runes ([]rune). Then, operations are added to the stream to filter and map the runes as they move downstream.

package main

import (
	"log"
	"os"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	// create new stream with a slice of runes as emitter
	strm := stream.New(
		[]rune("B世!ぽ@opqDQRS#$%^...O6PTnVWXѬYZbcef7ghijCklrAstvw"),
	)
	// filter out non-capitalized alpha chars
	strm.Filter(func(item rune) bool {
		return item >= 65 && item < (65+26)
	})
	// map each rune to its string value
	strm.Map(func(item rune) string {
		return string(item)
	})
	// route string characters to Stdout using a collector
	strm.Into(collectors.Writer(os.Stdout))
	// open the stream
	if err := <-strm.Open(); err != nil {
		log.Fatal(err)
	}
}

Source code listing 1

Let us unpack the previous code to understand what is going on.

  • Stream strm is created from slice []rune. Each item in the slice is streamed out sequentially.
  • Method strm.Filter applies user-defined function func(item rune) bool to every streamed rune. It filters out unwanted runes by returning true when the rune is an uppercase alphabetic character and false when it is not.
  • Method strm.Map receives the filtered rune items from the previous operator and applies user-defined function func(item rune) string to convert each rune value to a string value for downstream consumption.
  • Lastly, method strm.Into specifies a collector which collects the string items. In this example, the strings are collected and written to the Stdout to be printed on the terminal.

Note that up to this point, the stream is merely declared and will not do anything useful until it is opened. For the stream to run, method strm.Open must be invoked to start the stream and apply the attached operations. When this program is executed, it prints the following to standard output:

> go run .
BDQRSUEFGHIJKLMNOPTVWXYZCA

Automi Concepts

Automi uses a pipeline metaphor that is implemented using Go channels as a conduit for data flow (as described by Sameer Ajmani in his blog). As depicted below, Automi uses four main components including the stream itself: a data emitter that emits data onto the stream, stream operators that manipulate each streamed item, and a collector that accumulates streamed items.

Automi components

Stream

A stream represents the conduit within which data elements can flow. Automi uses Go channels internally as a conduit for streamed elements. This means streams support features such as buffering, automatic synchronous backpressure, and concurrency safety.

An Automi stream is created with the New function which can take as parameter an emitter (see below) or a value that can be wrapped into an emitter:

strm := stream.New(<emitter or value>)

The Stream type (returned by New) uses a fluent API to configure the stream by chaining methods on the stream, as shown in this code snippet:

func main() {
	strm := stream.New([]rune(...))
	strm.Filter(func(item rune) bool {
		return item >= 65 && item < (65+26)
	}).Map(func(item rune) string {
		return string(item)
	}).Into(collectors.Writer(os.Stdout))
	...
}

Emitter

A stream starts with an emitter, which can be an in-memory, network, or file resource used to emit streamed data. The emitters package comes with the following emitter implementations:

emitters.Channel
emitters.CSV
emitters.Reader
emitters.Scanner
emitters.Slice

If these do not meet your needs, you can also create custom emitters by implementing interface api.Emitter:

type Emitter interface {
	GetOutput() <-chan interface{}
}

Operators

A stream operator applies an operation to each item as it flows through the stream. Automi leverages Go's support for higher-order functions to support a functional programming idiom, allowing developers to specify custom operators as user-defined functions, as shown below:

stream.Filter(func(row []string) bool {
	count, err := strconv.Atoi(row[1])
	if err != nil {
		count = 0
	}
	return (count > 0)
})

Automi comes with many operator methods as listed below:

Stream.Filter
Stream.Map
Stream.FlatMap
Stream.Reduce
Stream.Process
Stream.GroupByKey
Stream.GroupByName
Stream.GroupByPos
Stream.Sort
Stream.SortByKey
Stream.SortByName
Stream.SortByPos
Stream.SortWith
Stream.Sum
Stream.SumByKey
Stream.SumByName
Stream.SumByPos
Stream.SumAllKeys

Many of the operators listed above accept user-defined functions, while others apply configuration, transformation, and other functionality to the streamed data.

Collectors

A collector represents either an in-memory, network, or file resource which can accumulate streamed items. Automi can accumulate streamed data in the following types defined in the collectors package:

collectors.CSV
collectors.Func
collectors.Null
collectors.Slice
collectors.Writer

If you need your own custom collector, you can implement interface api.Collector:

type Collector interface {
	SetInput(<-chan interface{})
}

More Streaming Examples

Now, let us explore more examples showing how Automi can be used.

Streaming from channels

The following example shows how data can be streamed from a source backed by a Go channel. As illustrated below, function emitterFunc returns a value of type <-chan time.Time that is used as the emitter source for the stream. A single operator is applied to map each streamed time.Time item to a string, which is then collected by a writer collector for output to Stdout.

Streaming from channel-backed source

The following source snippet shows the code representation of the stream above:

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	// emitterFunc returns a channel used as the data source
	emitterFunc := func() <-chan time.Time {
		times := make(chan time.Time)
		go func() {
			times <- time.Unix(100000, 0)
			times <- time.Unix(2*100000, 0)
			times <- time.Unix(4*100000, 0)
			times <- time.Unix(8*100000, 0)
			close(times)
		}()
		return times
	}
	strm := stream.New(emitterFunc())
	strm.Map(func(item time.Time) string {
		return item.String()
	}).Into(collectors.Writer(os.Stdout))
	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

Source code listing 2

When the stream is opened, it simply prints the collected string values to standard output.

Streaming HTTP requests

An Automi stream can be sourced from any value that implements io.Reader and can collect streamed values into any value that implements io.Writer. The following illustrates this by depicting a server that streams byte slices from http.Request.Body, applies a base64 transformation to each slice, then writes the transformed items into the HTTP response.

HTTP server streaming items from request.Body

The following snippet shows the source code representation of the example HTTP server above using an http.HandleFunc:

package main

import (
	"encoding/base64"
	"net/http"

	"github.com/vladimirvivien/automi/stream"
)

func main() {
	http.HandleFunc(
		"/",
		func(resp http.ResponseWriter, req *http.Request) {
			resp.Header().Add("Content-Type", "text/html")
			resp.WriteHeader(http.StatusOK)
			strm := stream.New(req.Body)
			strm.Process(func(data []byte) string {
				return base64.StdEncoding.EncodeToString(data)
			}).Into(resp)
			if err := <-strm.Open(); err != nil {
				resp.WriteHeader(http.StatusInternalServerError)
			}
		},
	)
	http.ListenAndServe(":4040", nil)
}

Source code listing 3

When the server receives a request on path /, it streams the content of req.Body (which implements io.ReadCloser), applies a user-defined operation via stream method Process to calculate the base64 encoding of the streamed bytes, then writes the result back into resp.

Streaming using CSVs

The next example illustrates a stream that is sourced from a CSV file, applies two user-defined operators, then the result is collected into another CSV as shown below.

Streaming from CSV files

The snippet below shows the source code representation for this example. It uses type emitters.CSV to create CSV emitter csvEmitter, which emits each row as a []string onto the stream. The Map method applies an operator that returns the first 6 columns of each row, and the Filter method removes any row where the second column (row[1]) is zero or not a number.

package main

import (
	"fmt"
	"strconv"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/emitters"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	csvEmitter := emitters.CSV("./stats.txt").
		CommentChar('#').
		DelimChar(',').
		HasHeaders()
	stream := stream.New(csvEmitter)

	// select the first 6 cols per row:
	stream.Map(func(row []string) []string {
		return row[:6]
	})

	// filter out rows with col[1] == 0
	stream.Filter(func(row []string) bool {
		count, err := strconv.Atoi(row[1])
		if err != nil {
			count = 0
		}
		return (count > 0)
	})

	stream.Into(collectors.CSV("./out.txt"))
	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

Source code listing 4

When the stream is opened, it uses collectors.CSV to collect the streamed items into another CSV file.

Additional examples

There are many more examples available on GitHub:

  • Batch — examples showing batched operators (i.e. sort, sum, count, etc.)
  • Collectors — examples of different collector components
  • Emitters — examples of different emitter components
  • Error handling — shows how to set up error handling with Automi streams
  • gRPC streaming — an example showing how to stream items from gRPC streaming services
  • Logging — examples of logging stream events at runtime
  • MD5 — implements the MD5 example from Sameer Ajmani's pipeline post
  • Network — examples of streaming data to and from networked resources
  • Wordcount — a simple word count example

All examples are found at github.com/vladimirvivien/automi/examples

Roadmap

Automi is being released as 0.1.0-alpha.0, an API that can work in many contexts where stream processing is needed. On the roadmap to GA, Automi has the following aspirations:

  • Additional methods to control the stream (i.e. run window, run until, etc)
  • Introduction of new operators (i.e. Join, Broadcast, etc.)
  • Ability to use streams as emitter or collector sources
  • Concurrency control (i.e. specify internal worker pool size)
  • Timeouts and deadline for process operators
  • Etc

If you don’t see your favorite feature, open an issue and let me know!

Happy streaming!
