How Ballerina Streaming Windows Work
I assume all of you already know the Ballerina language (if not, why are you reading this article? 😜).
Ballerina is a simple programming language whose syntax and platform address the hard problems of integration. It is a general purpose, concurrent, transactional, statically and strongly typed programming language with both textual and graphical syntaxes. Its specialization is integration — it brings fundamental concepts, ideas and tools of distributed system integration into the language and offers a type safe, concurrent environment to implement such applications. These include distributed transactions, reliable messaging, stream processing, workflows and container management platforms.
(Source: https://ballerina.io)
I want to draw your attention to the words "stream processing" in that description, because stream processing capabilities are not common in everyday general-purpose programming languages. If you are new to this subject, I suggest you read the article Story Behind Ballerina Streams (by Mohanadarshan Vivekanandalingam, a contributor to Ballerina streaming).
As a prerequisite, I recommend you have a basic knowledge of the Ballerina language (https://ballerina.io/learn/).
Beginning
First of all, we'll look at a simple Ballerina streaming query with a length window. Suppose you have a stream named stockStream into which you receive different stock events. If you want to calculate the total quantity of the last 5 stock events at any given time, you can use a length window as follows.
from stockStream window length(5)
select stockStream.name, sum(stockStream.quantity) as totalQty
=> (StockTotal[] total) {
    foreach var t in total {
        StockTotalStream.publish(t);
    }
}
I used the length window here because it is simple and easy to understand. The following image sequence demonstrates how a length window of length 2 works.
You can see that this behaves as a sliding window: at any given time it holds the last windowLength events, and it is updated on each event arrival and expiry. Now we'll look at how we implemented this in Ballerina streaming.
The length window has only one parameter, the window length, which you pass in the streaming query. We desugar (that is, remove the syntactic sugar; for more info, see Ballerina Compiler Design) the window length(5) clause into Ballerina code that initializes a LengthWindow object and passes the incoming events to its process function. The following is the implementation of that LengthWindow object.
When an event reaches the process function, it is inserted into the outputEvents array. The same event is then tagged as an EXPIRED event and inserted into a global linked list. If the size of the linked list equals the user-specified window length, we remove the first event in the linked list and add it to the outputEvents array as well. Because that event is tagged as EXPIRED, it acts as the boundary of the window. Finally, the outputEvents array is passed to the next process.
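To make those steps concrete, here is a minimal sketch in Python rather than Ballerina. It is only a model of the description above, not the actual LengthWindow source: the names Event, LengthWindowSketch and next_process are made up for illustration, a deque stands in for the linked list, and I assume the oldest event is expired before the new one is emitted.

```python
from collections import deque
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Event:
    kind: str   # "CURRENT" or "EXPIRED"
    data: str


class LengthWindowSketch:
    """Python model of the described steps; not the Ballerina source."""

    def __init__(self, length, next_process):
        self.length = length
        self.buffer = deque()            # stands in for the linked list
        self.next_process = next_process

    def process(self, events):
        output = []
        for event in events:
            # A full window means the oldest stored event falls out;
            # it was stored already tagged as EXPIRED.
            if len(self.buffer) == self.length:
                output.append(self.buffer.popleft())
            output.append(event)
            # Keep an EXPIRED-tagged copy so it can be emitted later.
            self.buffer.append(replace(event, kind="EXPIRED"))
        self.next_process(output)


batches = []
window = LengthWindowSketch(2, batches.append)
for name in ["a", "b", "c"]:
    window.process([Event("CURRENT", name)])
```

With a window length of 2, the third call hands two events to the next step: the EXPIRED copy of "a" followed by the CURRENT event "c". A downstream aggregator such as sum can then subtract the expired value and add the new one.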
Yes, that is it. That is simply how the length window works.
But there are a bunch of other built-in windows in Ballerina that you can use.
Let's have a look at how each of these windows works.
1. time window
time(int windowTime)
A sliding time window that holds the events that arrived during the last windowTime period at a given time, and gets updated for each event arrival and expiry.
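The sliding behaviour can be simulated with explicit timestamps. This is a rough Python sketch, not the Ballerina implementation: sliding_time_window is a hypothetical helper, timestamps are assumed to be in milliseconds, and expiry is only checked when a new event arrives, whereas a real window would also expire events on a timer.

```python
from collections import deque


def sliding_time_window(arrivals, window_time):
    """Yield (now, contents) after each arrival; timestamps in ms."""
    held = deque()
    for now, value in arrivals:
        # Expire everything that arrived window_time or more ago
        # (treating the boundary as expired is an assumption).
        while held and held[0][0] <= now - window_time:
            held.popleft()
        held.append((now, value))
        yield now, [v for _, v in held]


arrivals = [(0, "a"), (400, "b"), (900, "c"), (1500, "d")]
snapshots = list(sliding_time_window(arrivals, 1000))
```

When "d" arrives at 1500 ms, both "a" and "b" are already older than the 1000 ms window, so only "c" and "d" remain.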
2. timeBatch window
timeBatch(int windowTime)
A batch (tumbling) time window that holds the events that arrive during each windowTime period, and gets updated once every windowTime.
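In contrast to the sliding version, a tumbling window emits whole batches. Below is a small Python simulation under a few assumptions of mine: time_batches is a hypothetical helper, timestamps are in milliseconds, the first batch opens when the first event arrives, and a period without events produces an empty batch.

```python
def time_batches(arrivals, window_time):
    """Group (timestamp_ms, value) pairs into consecutive tumbling batches."""
    batches = []
    current, batch_end = [], None
    for ts, value in arrivals:
        if batch_end is None:
            batch_end = ts + window_time   # first event opens the first batch
        while ts >= batch_end:             # close every period that has elapsed
            batches.append(current)
            current = []
            batch_end += window_time
        current.append(value)
    if current:
        batches.append(current)            # flush the last, still-open batch
    return batches


result = time_batches([(0, "a"), (300, "b"), (1100, "c"), (3200, "d")], 1000)
```

Each event lands in exactly one batch, which is the main difference from the sliding time window, where an event stays visible until it ages out.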
3. timeLength window
timeLength(int windowTime, int windowLength)
A sliding time window that, at any given time, holds the last windowLength events that arrived during the last windowTime period, and gets updated for every event arrival and expiry.
4. length window
length(int windowLength)
A sliding length window that holds the last windowLength events at a given time, and gets updated for each arrival and expiry.
5. lengthBatch window
lengthBatch(int windowLength)
A batch (tumbling) length window that holds the number of events specified as windowLength. The window is updated each time a batch of windowLength events has arrived.
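A quick way to picture this is a generator that only releases full batches. This is an illustrative Python sketch (length_batches is a made-up name), not the Ballerina code.

```python
def length_batches(events, window_length):
    """Yield a batch each time window_length events have accumulated."""
    batch = []
    for event in events:
        batch.append(event)
        if len(batch) == window_length:
            yield batch
            batch = []      # start collecting the next batch


result = list(length_batches([10, 20, 30, 40, 50, 60, 70], 3))
```

Seven events with a windowLength of 3 give two full batches. The seventh event stays inside the window and is not emitted until two more events arrive.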
6. externalTime window
externalTime(timeStamp, int windowTime)
A sliding time window based on external time. It holds the events that arrived during the last windowTime period, measured from the external timestamp, and gets updated on every monotonically increasing timestamp. Here timeStamp should be an attribute of the record that is used as the constraint type of the relevant input stream. Pass it as <streamName>.<attributeName>.
7. externalTimeBatch window
externalTimeBatch(timeStamp, int windowTime, int? startTime, int? timeout)
A batch (tumbling) time window based on external time. It holds the events that arrived during each windowTime period, and gets updated once every windowTime. Here timeStamp should be an attribute of the record that is used as the constraint type of the relevant input stream. Pass it as <streamName>.<attributeName>. The startTime and timeout parameters are optional. startTime can be used to specify a user-defined time at which the first batch starts. timeout is the time to wait for a new event to arrive before flushing and emitting the output for the events belonging to a specific batch. Usually timeout is greater than windowTime.
8. uniqueLength window
uniqueLength(uniqueAttribute, int windowLength)
A sliding length window that returns unique events within the windowLength based on the given uniqueAttribute. Here uniqueAttribute should be an attribute of the record that is used as the constraint type of the relevant input stream.
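Here is one way to read that description, sketched in Python. It rests on my own assumptions rather than on the Ballerina source: unique_length_window is a hypothetical helper, a newer event replaces an older one with the same uniqueAttribute value, and when the window is full the oldest entry is dropped.

```python
from collections import OrderedDict


def unique_length_window(events, key, window_length):
    """Yield the window contents after each event, one entry per key value."""
    held = OrderedDict()
    for event in events:
        k = event[key]
        if k in held:
            del held[k]                  # newer event replaces the older one
        elif len(held) == window_length:
            held.popitem(last=False)     # window full: drop the oldest entry
        held[k] = event
        yield list(held.values())


stocks = [
    {"symbol": "A", "qty": 1},
    {"symbol": "B", "qty": 2},
    {"symbol": "A", "qty": 3},
    {"symbol": "C", "qty": 4},
]
snapshots = list(unique_length_window(stocks, "symbol", 2))
```

After the third event the window holds B and the newer A. The fourth event pushes out B, the oldest entry, leaving the newer A and C.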
9. delay window
delay(int delayTime)
A delay window holds events for a specific time period (delayTime), which is treated as a delay before they are processed.
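The idea can be modelled with a queue of release times. This Python sketch is only an illustration: DelayWindowSketch and its add and release methods are hypothetical, timestamps are assumed to be in milliseconds, and the caller supplies the clock instead of a timer.

```python
from collections import deque


class DelayWindowSketch:
    """Holds each event until delay_time ms have passed since it arrived."""

    def __init__(self, delay_time):
        self.delay_time = delay_time
        self.pending = deque()           # (release_time_ms, event)

    def add(self, now, event):
        self.pending.append((now + self.delay_time, event))

    def release(self, now):
        """Return the events whose delay has elapsed at time `now`."""
        ready = []
        while self.pending and self.pending[0][0] <= now:
            ready.append(self.pending.popleft()[1])
        return ready


delay = DelayWindowSketch(500)
delay.add(0, "a")
delay.add(200, "b")
early = delay.release(400)    # nothing is due yet
on_time = delay.release(500)  # "a" has waited 500 ms
late = delay.release(1000)    # "b" is due by now
```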
10. sort window
sort(int windowLength, attributeName, string order)
This window holds a batch of events equal in number to windowLength and sorts them by the given attributeName in the given order. Here attributeName should be an attribute of the record that is used as the constraint type of the relevant input stream.
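A possible reading of the sort window, again as a Python sketch and not the Ballerina implementation. The helper sort_window is made up, I assume the order values are "ascending" and "descending", and I assume that when the window overflows, the event that sorts last is the one that gets expired.

```python
def sort_window(events, window_length, attribute, order="ascending"):
    """Yield (sorted window contents, expired event or None) per arrival."""
    held = []
    for event in events:
        held.append(event)
        held.sort(key=lambda e: e[attribute], reverse=(order == "descending"))
        # On overflow, expire the event that sorts last (an assumption).
        expired = held.pop() if len(held) > window_length else None
        yield list(held), expired


prices = [{"price": 50}, {"price": 20}, {"price": 40}, {"price": 10}]
steps = list(sort_window(prices, 2, "price", "ascending"))
```

With a windowLength of 2 and ascending order, the window ends up holding the two lowest prices seen so far, 10 and 20, while 50 and 40 are expired along the way.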
If you want to know how these windows are implemented, you can always look in the Ballerina GitHub repository.
Thank you! 😃