How Ballerina Streaming Windows Work

Lahiru Jayasekera
Jan 10, 2019

I assume all of you are aware of the Ballerina language (if not, why are you looking at this article? 😜).

Ballerina is a simple programming language whose syntax and platform address the hard problems of integration. It is a general purpose, concurrent, transactional, statically and strongly typed programming language with both textual and graphical syntaxes. Its specialization is integration — it brings fundamental concepts, ideas and tools of distributed system integration into the language and offers a type safe, concurrent environment to implement such applications. These include distributed transactions, reliable messaging, stream processing, workflows and container management platforms.

(Source: https://ballerina.io)

Notice the words "stream processing" in that description. I call them out because stream processing capabilities are not common in everyday general-purpose programming languages. If you are new to this subject, I suggest you have a look at the article Story Behind Ballerina Streams (by Mohanadarshan Vivekanandalingam, a contributor to Ballerina streaming).

As a prerequisite, I recommend having a basic knowledge of the Ballerina language (https://ballerina.io/learn/).

Beginning

First of all, we'll have a look at a simple Ballerina streaming query with a length window. Suppose you have a stream named stockStream that receives different stock events. If you want to calculate the total quantity of the last 5 stock events placed, you can use a length window as follows.

from stockStream window length(5)
select stockStream.name, sum(stockStream.quantity) as totalQty
=> (StockTotal[] total) {
    foreach var t in total {
        StockTotalStream.publish(t);
    }
}

I used the length window here as it is simple and easy to understand. The following image sequence demonstrates how a length window of length 2 works.

Figure: how a length window works

You can see that this approach is a sliding window. That means it holds the last windowLength events at any given time, and gets updated for each arrival and expiry. Now we'll have a look at how we implemented this in Ballerina streaming.

The length window has only one parameter, which is the window length. You pass this parameter in the streaming query. What we do is desugar (that is, remove the syntactic sugar; for more info, see Ballerina Compiler Design) that window length(5) into Ballerina code, which initializes a LengthWindow object and passes the incoming events to its process function. The following is the implementation of that LengthWindow object.

What happens here is that when an event comes to the process function, it is inserted into the outputEvents array. A copy of the same event is tagged as an EXPIRED event and inserted into a global linked list. When the linked list has already reached the user-specified window length, we remove the first (oldest) event from the linked list and add it to the outputEvents array as well. Since this event is tagged as EXPIRED, it acts as the boundary of the window. Finally, the outputEvents array is passed to the next processor.
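
To make the steps above concrete, here is a small Python sketch of the same idea. It is not the Ballerina source; the class and field names are my own, a deque stands in for the linked list, and I assume the "window is full" check runs before the new event is stored so that the window holds exactly windowLength events.

```python
from collections import deque
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class StreamEvent:
    event_type: str  # "CURRENT" or "EXPIRED"
    data: dict


class LengthWindowSketch:
    """Illustrative model of a sliding length window (hypothetical names)."""

    def __init__(self, window_length, next_processor):
        self.window_length = window_length
        self.next_processor = next_processor  # callable that receives a list of events
        self.expired_queue = deque()  # stands in for the linked list

    def process(self, events):
        output_events = []
        for event in events:
            # Assumption: check for a full window before storing the new event.
            if len(self.expired_queue) == self.window_length:
                output_events.append(self.expired_queue.popleft())
            output_events.append(event)
            # Store an EXPIRED-tagged copy to emit once it falls out of the window.
            self.expired_queue.append(replace(event, event_type="EXPIRED"))
        self.next_processor(output_events)


def run_demo():
    emitted = []
    window = LengthWindowSketch(2, emitted.extend)
    for qty in (10, 20, 30):
        window.process([StreamEvent("CURRENT", {"quantity": qty})])
    return [(e.event_type, e.data["quantity"]) for e in emitted]
```

With a window length of 2, the third event pushes the first one out, so the next processor sees an EXPIRED copy of the first event together with the new CURRENT event. An aggregator such as sum can then subtract expired values and add current ones.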

Yes, that is it. That is simply how the length window works.

But there are a bunch of other inbuilt windows in Ballerina that you can use.

Let's have a look at how each of these windows works.

1. time window

time(int windowTime)

A sliding time window that holds events that arrived during the last windowTime period at a given time, and gets updated for each event arrival and expiry.
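
A rough Python model of this behaviour is below. It is only a sketch: the names are made up, arrival times are passed in explicitly instead of being read from a clock, and expiry is checked lazily on each arrival, whereas a real engine would typically use a timer. Whether the boundary is inclusive is also an assumption.

```python
from collections import deque


class TimeWindowSketch:
    """Sliding time window model; times and window_time share one unit."""

    def __init__(self, window_time):
        self.window_time = window_time
        self.events = deque()  # (arrival_time, payload), oldest first

    def add(self, arrival_time, payload):
        # Expire everything that arrived window_time or more before this arrival.
        expired = []
        while self.events and self.events[0][0] <= arrival_time - self.window_time:
            expired.append(self.events.popleft())
        self.events.append((arrival_time, payload))
        return expired

    def contents(self):
        return [payload for _, payload in self.events]
```

For example, with window_time 10, events arriving at times 0, 5 and 12 leave only the last two in the window, because the first one is already older than the window period when the third arrives.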

2. timeBatch window

timeBatch(int windowTime)

A batch (tumbling) time window that holds events that arrive during windowTime periods, and gets updated for each windowTime.
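
The tumbling behaviour can be sketched in Python as follows. This is an illustration under my own assumptions: the names are hypothetical, the first batch starts at time 0 unless told otherwise, and batches are closed when a later event arrives rather than by a timer.

```python
class TimeBatchWindowSketch:
    """Tumbling time window model; times and window_time share one unit."""

    def __init__(self, window_time, start_time=0):
        self.window_time = window_time
        self.batch_end = start_time + window_time
        self.current_batch = []

    def add(self, arrival_time, payload):
        flushed = []
        # Close every batch period that ended before this event arrived.
        while arrival_time >= self.batch_end:
            if self.current_batch:
                flushed.append(self.current_batch)
                self.current_batch = []
            self.batch_end += self.window_time
        self.current_batch.append(payload)
        return flushed
```

Unlike the sliding time window, nothing is emitted per event here. Events pile up and are handed over together once their windowTime period is over.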

3. timeLength window

timeLength(int windowTime, int windowLength)

A sliding time window that, at a given time, holds the last windowLength events that arrived during the last windowTime period, and gets updated for every event arrival and expiry.
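
Here is a small Python sketch of how the two limits combine. The function name is hypothetical, and as before the time-based expiry is only checked on arrival, which is a simplification.

```python
from collections import deque


def time_length_snapshots(events, window_time, window_length):
    """Return the window contents after each (arrival_time, payload) event."""
    window = deque()
    snapshots = []
    for arrival_time, payload in events:
        # Time limit: drop events older than the window period.
        while window and window[0][0] <= arrival_time - window_time:
            window.popleft()
        # Length limit: make room if the window is already full.
        if len(window) == window_length:
            window.popleft()
        window.append((arrival_time, payload))
        snapshots.append([p for _, p in window])
    return snapshots
```

An event leaves the window as soon as either limit is hit, whichever comes first.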

4. length window

length(int windowLength)

A sliding length window that holds the last windowLength events at a given time, and gets updated for each arrival and expiry.

5. lengthBatch window

lengthBatch(int windowLength)

A batch (tumbling) length window that holds a number of events specified as the windowLength. The window is updated each time a batch of events that equals the number specified as the windowLength arrives.
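
As a minimal Python sketch (with a made-up function name), the batching looks like this:

```python
def length_batches(events, window_length):
    """Group events into consecutive full batches of window_length."""
    batch, batches = [], []
    for event in events:
        batch.append(event)
        if len(batch) == window_length:
            batches.append(batch)  # a full batch is released at once
            batch = []
    return batches  # an incomplete trailing batch is still waiting
```

With window_length 2 and five events, two batches are released and the fifth event waits for one more arrival.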

6. externalTime window

externalTime(timeStamp, int windowTime)

A sliding time window based on external time. It holds events that arrived during the last windowTime period from the external timestamp, and gets updated on every monotonically increasing timestamp. Here the timeStamp should be an attribute of the record which is used as the constraint type of the relevant input stream. As the timeStamp parameter you should pass <streamName>.<attributeName>.
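
The difference from the time window is that the clock is a field of the event itself. A Python sketch of that idea, with hypothetical names and records modelled as dicts, could look like this. The inclusive boundary is my assumption.

```python
from collections import deque


def external_time_snapshots(records, timestamp_attr, window_time):
    """Return the window contents after each record, using the record's own time."""
    window = deque()
    snapshots = []
    for record in records:
        now = record[timestamp_attr]  # external time, taken from the event
        while window and window[0][timestamp_attr] <= now - window_time:
            window.popleft()
        window.append(record)
        snapshots.append(list(window))
    return snapshots
```

Because time only moves when an event carries a later timestamp, the window is updated on arrivals only, which matches the "monotonically increasing timestamp" wording above.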

7. externalTimeBatch window

externalTimeBatch(timeStamp, int windowTime, int? startTime, int? timeout)

A batch (tumbling) time window based on external time that holds events that arrived during windowTime periods, and gets updated for every windowTime. Here the timeStamp should be an attribute of the record which is used as the constraint type of the relevant input stream. As the timeStamp parameter you should pass <streamName>.<attributeName>. The startTime and timeout parameters are optional. startTime can be used to specify a user-defined time to start the first batch. timeout is the time to wait for the arrival of a new event before flushing and giving output for the events belonging to a specific batch. Usually timeout is greater than windowTime.
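
A simplified Python sketch of the batching and the startTime parameter follows. The names are hypothetical, timeout is left out because it needs a real timer, and I assume the first batch starts at the first event's timestamp when no startTime is given.

```python
def external_time_batches(records, timestamp_attr, window_time, start_time=None):
    """Return (closed_batches, open_batch) using each record's own timestamp."""
    batches, current = [], []
    batch_end = None
    for record in records:
        ts = record[timestamp_attr]
        if batch_end is None:
            # Assumption: without start_time, align batches to the first event.
            first_start = ts if start_time is None else start_time
            batch_end = first_start + window_time
        # Close every batch period that ended before this timestamp.
        while ts >= batch_end:
            if current:
                batches.append(current)
                current = []
            batch_end += window_time
        current.append(record)
    return batches, current
```

Changing start_time shifts the batch boundaries, so the same events can end up grouped differently.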

8. uniqueLength window

uniqueLength(uniqueAttribute, int windowLength)

A sliding length window that returns unique events within the windowLength based on the given uniqueAttribute. Here the uniqueAttribute should be an attribute of the record which is used as the constraint type of relevant input stream.
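
One way to picture this in Python is an ordered map keyed by the unique attribute. This is only my reading of the description: the names are hypothetical, and I assume a newer event with the same key replaces the older one and moves to the end of the window.

```python
from collections import OrderedDict


def unique_length_snapshots(records, unique_attr, window_length):
    """Return the window contents after each record, one event per unique key."""
    window = OrderedDict()
    snapshots = []
    for record in records:
        key = record[unique_attr]
        if key in window:
            del window[key]  # the older event with the same key is replaced
        elif len(window) == window_length:
            window.popitem(last=False)  # window full: drop the oldest event
        window[key] = record
        snapshots.append(list(window.values()))
    return snapshots
```

So for stock events keyed by name, the window never holds two events for the same stock.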

9. delay window

delay(int delayTime)

A delay window holds events for a specific time period (delayTime), which is regarded as a delay period before processing them.
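
In Python the idea can be sketched like this. The names are hypothetical, and instead of a timer the caller asks which events are ready at a given time.

```python
from collections import deque


class DelayWindowSketch:
    """Delay window model; times and delay_time share one unit."""

    def __init__(self, delay_time):
        self.delay_time = delay_time
        self.held = deque()  # (release_time, payload), oldest first

    def add(self, arrival_time, payload):
        self.held.append((arrival_time + self.delay_time, payload))

    def release(self, now):
        # Hand over every event whose delay period has passed.
        ready = []
        while self.held and self.held[0][0] <= now:
            ready.append(self.held.popleft()[1])
        return ready
```

Every event comes out exactly once, just delayTime later than it went in.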

10. sort window

sort(int windowLength, attributeName, string order)

This window holds a batch of events equal in number to the specified windowLength and sorts them by the given attributeName in the given order. Here the attributeName should be an attribute of the record which is used as the constraint type of the relevant input stream.
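
A Python sketch of this window is below. Treat it as a guess at the behaviour: the function name and the "ascending"/"descending" order strings are assumptions, and so is the rule that the event which sorts last is the one dropped when the window overflows.

```python
def sort_window_snapshots(records, window_length, attribute_name, order="ascending"):
    """Return the sorted attribute values held in the window after each record."""
    window = []
    snapshots = []
    for record in records:
        window.append(record)
        window.sort(key=lambda r: r[attribute_name], reverse=(order == "descending"))
        if len(window) > window_length:
            window.pop()  # assumption: the event that sorts last is dropped
        snapshots.append([r[attribute_name] for r in window])
    return snapshots
```

With window_length 2 and ascending order on price, the window ends up keeping the two lowest prices seen so far.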

If you need to know how these windows are implemented, you can always look in the Ballerina GitHub repository.

Please use the following links to access more learning materials,

Thank you! 😃


Lahiru Jayasekera

Undergraduate at University of Moratuwa | Software Engineering intern at WSO2