Let's write a window in Ballerina

Gimantha Bandara
Ballerina Swan Lake Tech Blog
9 min read · Mar 11, 2019

The objective of this article is to guide you through writing a custom window in Ballerina for stream processing use cases. If you are new to stream processing, you can get a high-level understanding of it by reading this article. To further understand how windows work, you can refer to this article.


In the context of event processing, data is processed as a never-ending stream of events (pieces of data). Sometimes you might want to collect these events in small chunks, process them and produce valuable information or insights.

For example, let's consider a stock market. Stakeholders may be interested in the opening price, closing price, highest price and lowest price of a particular index/symbol within each time frame (e.g. every 1 minute, every 5 minutes, etc.). All the stock market price data comes into your system as a never-ending stream. How would you design a window that collects stock events every 1 minute, every 5 minutes, etc. and releases them at the end of each time slot? And, most importantly, why can't we use a timeBatch window here? A timeBatch window holds events for a given time period and then releases them, but it is not in sync with the clock time. For example, if a timeBatch window with a window time of 2 minutes starts to receive events at 14:44:34, it will release events at 14:46:34. This is not what stock market stakeholders want. With a 2-minute window, they need events released at 14:46:00, then at 14:48:00, and so on. So how do we create a window that caters to this use case? We need a window that releases its collected events based on a cron expression.
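To make the difference concrete, here is a small sketch in Python (not Ballerina) that computes when a timeBatch-style window would release versus when a clock-aligned window would release. The function names are hypothetical, and the clock alignment is measured from midnight, which assumes the window length divides a day evenly.

```python
from datetime import datetime, timedelta


def time_batch_release(first_event: datetime, window: timedelta) -> datetime:
    """A timeBatch-style window releases one window length after its first event."""
    return first_event + window


def clock_aligned_release(now: datetime, window: timedelta) -> datetime:
    """A cron-style window releases at the next multiple of `window` since midnight."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    slots_elapsed = (now - midnight) // window  # whole slots completed so far (int)
    return midnight + (slots_elapsed + 1) * window


first = datetime(2019, 3, 11, 14, 44, 34)
two_minutes = timedelta(minutes=2)
print(time_batch_release(first, two_minutes).time())     # 14:46:34
print(clock_aligned_release(first, two_minutes).time())  # 14:46:00
```

The only change between the two is where the slot boundary comes from: the first event's arrival time versus the wall clock.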

Since the built-in windows do not fulfill our requirement, we are going to write a cron window. The streams module has an abstract object called streams:Window. We need to implement the methods of that abstract object in order to use our cron window in streams queries. The code snippet below shows the methods to be implemented.

Ballerina Abstract Window Object

The process function does the real work in most windows. Events received by the streaming query are fed into the process function as an array of streams:StreamEvent objects. A streams:StreamEvent is just a wrapper for each incoming piece of data. The StreamEvent object has a field called data, which contains the actual data as a record of type anydata.
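As a rough mental model, the relationship between the window contract and the event wrapper can be sketched in Python. This is an analogue only: the field names mirror the ones the article mentions (data, timestamp, event type), but the class layout is an assumption, and only the process method is modelled.

```python
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List


class EventType(Enum):
    CURRENT = "CURRENT"
    EXPIRED = "EXPIRED"
    RESET = "RESET"


@dataclass
class StreamEvent:
    """Wrapper for one incoming piece of data (cf. streams:StreamEvent)."""
    data: Dict[str, Any]
    event_type: EventType = EventType.CURRENT
    timestamp: int = field(default_factory=lambda: int(time.time() * 1000))


class Window(ABC):
    """Rough analogue of the streams:Window abstract object (process only)."""

    @abstractmethod
    def process(self, stream_events: List[StreamEvent]) -> None:
        """Receives the events fed in by the streaming query."""


class PrintWindow(Window):
    """Smallest possible concrete window: prints each event's data."""

    def process(self, stream_events: List[StreamEvent]) -> None:
        for event in stream_events:
            print(event.event_type.value, event.data)


PrintWindow().process([StreamEvent({"symbol": "APPLE", "price": 124.56})])
# prints: CURRENT {'symbol': 'APPLE', 'price': 124.56}
```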

How do we process these streams:StreamEvent objects in this particular window? We have to hold the incoming event objects until the cron expression triggers, and then release them. Every object in Ballerina has a function called __init, which is invoked when we initialize the object with the new operator. We use this function to initialize the window object with the required fields (e.g. a data structure to hold the incoming events, a string to contain the cron expression, etc.). Below is the sample code of the __init method.

If you look carefully at the object's fields, you will see a function pointer, a linked list to hold the events, a scheduler to release the events and a string to hold the cron expression.
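Since the original Ballerina snippet is not reproduced here, the following Python sketch shows the same set of fields in a constructor. It is an illustration under assumptions: collections.deque stands in for streams:LinkedList, the cron expression is Quartz-style with a leading seconds field (assumed to fire every 5 seconds), and the scheduler is omitted.

```python
from collections import deque
from typing import Any, Callable, Deque, List

# Signature shared by every stage's process function (function-pointer analogue).
ProcessFn = Callable[[List[Any]], None]


class CronWindow:
    """Sketch of the cron window's fields and constructor."""

    def __init__(self, next_process: ProcessFn, window_parameters: List[Any]) -> None:
        self.next_process = next_process        # process function of the next stage
        self.event_queue: Deque[Any] = deque()  # buffers events until the cron fires
        self.cron_expression = ""               # set by init_parameters below
        self.init_parameters(window_parameters)
        # The Ballerina version also creates a scheduler from the cron expression
        # here and attaches cronService to it; that part is omitted in this sketch.

    def init_parameters(self, parameters: List[Any]) -> None:
        self.cron_expression = str(parameters[0])


window = CronWindow(print, ["*/5 * * * * ?"])
print(window.cron_expression)  # */5 * * * * ?
```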

The function pointer

The function pointer is a reference to a function. In this specific case, our function pointer can hold a reference to a function whose signature is the same as that of the process function of the streams:Window object. Why do we need a function pointer? It is used to invoke the process method of the next processing object in the pipeline of a streaming query. The next processing object can be a streams:Filter, a streams:Select or a streams:Aggregator object. We will see how this function pointer is used in our cron window.
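The chaining idea can be illustrated with plain Python closures: each stage is built with a reference to the next stage's process function and calls it when it is done. The make_filter and make_select helpers are hypothetical stand-ins for streams:Filter and streams:Select, not their real implementations.

```python
from typing import Callable, Dict, List

Event = Dict[str, float]
Process = Callable[[List[Event]], None]


def make_filter(predicate: Callable[[Event], bool], next_process: Process) -> Process:
    """Returns a process function that forwards only matching events."""
    def process(events: List[Event]) -> None:
        next_process([e for e in events if predicate(e)])
    return process


def make_select(fields: List[str], next_process: Process) -> Process:
    """Returns a process function that keeps only the selected fields."""
    def process(events: List[Event]) -> None:
        next_process([{f: e[f] for f in fields} for e in events])
    return process


collected: List[Event] = []
# Build the pipeline back to front: each stage only knows the next stage's process function.
select_stage = make_select(["price"], collected.extend)
filter_stage = make_filter(lambda e: e["price"] > 130.0, select_stage)

# A window would call filter_stage(...) when it releases a batch.
filter_stage([{"price": 124.56, "volume": 5000.0}, {"price": 143.54, "volume": 3400.0}])
print(collected)  # [{'price': 143.54}]
```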

The scheduler object

The scheduler is a revamp of the task:Timer object that is compliant with the service architecture of Ballerina. You can attach a service with a resource function to a scheduler, and that resource function is invoked according to the parameters (cron expression, number of occurrences, interval, delay, etc.) used to initialize the scheduler. In the code snippet above, the scheduler is initialized and attached to a service called cronService along with the cron window itself.
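The attach-with-attachment pattern can be modelled with a toy Python class. Scheduler, attach and fire are hypothetical names, not the Ballerina task API; triggering is manual here so the behaviour is deterministic, whereas a real scheduler fires according to its cron expression.

```python
from typing import Any, Callable, List, Tuple


class Scheduler:
    """Toy scheduler: each trigger calls every attached service with its attachment."""

    def __init__(self, cron_expression: str) -> None:
        self.cron_expression = cron_expression  # stored only; not parsed in this toy
        self._services: List[Tuple[Callable[[Any], None], Any]] = []

    def attach(self, service: Callable[[Any], None], attachment: Any = None) -> None:
        self._services.append((service, attachment))

    def fire(self) -> None:
        # Stand-in for "the cron expression kicked in".
        for service, attachment in self._services:
            service(attachment)


calls: List[Any] = []


def on_trigger(window: Any) -> None:
    # In the article, this is where the window's releaseEvents() would be called.
    calls.append(window)


scheduler = Scheduler("*/5 * * * * ?")
scheduler.attach(on_trigger, attachment="cron-window")
scheduler.fire()
scheduler.fire()
print(calls)  # ['cron-window', 'cron-window']
```

Passing the window as the attachment is what lets a service defined outside the window object reach back into it.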

The streams:LinkedList is a collection in the streams module. You can reuse it to hold any kind of object. You can have a look at it here.

The initParameters(any[] parameters) function

In the __init method of the cron window, there is a parameter that takes an array of values of type any. This array represents the window parameters we provide in a streaming query. For example, when we use externalTimeBatch(inputstream.timestamp, 1000) in a query, the timestamp attribute of the input stream is converted to a string, and it is put together with the value 1000 into a two-element array that is passed to the __init method. The initParameters function processes this array and extracts the values needed to initialize the window.
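A Python sketch of what parameter extraction for the cron window might look like, assuming the window takes exactly one string parameter (the cron expression). The validation rules are illustrative choices, not taken from the article.

```python
from typing import Any, List


def init_parameters(parameters: List[Any]) -> str:
    """Extracts and validates the cron expression from the window parameters."""
    if len(parameters) != 1:
        raise ValueError(f"cron window expects 1 parameter, got {len(parameters)}")
    cron_expression = parameters[0]
    if not isinstance(cron_expression, str) or not cron_expression.strip():
        raise ValueError("cron window parameter must be a non-empty cron expression string")
    return cron_expression


print(init_parameters(["*/5 * * * * ?"]))  # */5 * * * * ?
```

Failing fast here means a misconfigured query is rejected when the window is created rather than when the first trigger fires.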

The process function

All the incoming events will be processed in this function. In this particular window, the incoming events will be stored in the eventQueue object.

In this window, the process method does nothing other than filling the eventQueue object, but other windows contain more complex logic. You can have a look at them here.
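In Python terms, the cron window's process step reduces to buffering. The sketch below assumes the incoming array may contain empty (nil) slots and skips them; that assumption is labelled in the code as well.

```python
from collections import deque
from typing import Any, Deque, List, Optional


class CollectingWindow:
    """Sketch of the cron window's process step: it only buffers events."""

    def __init__(self) -> None:
        self.event_queue: Deque[Any] = deque()

    def process(self, stream_events: List[Optional[Any]]) -> None:
        for event in stream_events:
            # Assumption: the incoming array may contain nil entries, which are skipped.
            if event is not None:
                self.event_queue.append(event)


window = CollectingWindow()
window.process([{"price": 124.56}, None, {"price": 143.54}])
print(len(window.event_queue))  # 2
```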

Emit the collected events

Now the events are being collected into the eventQueue, and we need to release them periodically according to the given cron expression. That's where the cronService, which is attached in the __init function, comes into play. Note the following line in the __init method:

_ = self.appointment.attach(cronService, attachment = self);

We attach the cronService and also pass the cron window object itself to it. Whatever we pass as the attachment is available as an argument in the resource function of the attached service.

A service cannot be defined inside an object, so you have to write it outside of the object. onTrigger is the resource function that is invoked when the cron expression fires. Inside it, we invoke the releaseEvents() function of the cron window. Let's have a look at this function.

Look at the 4th line in the code snippet above. I have used a special function called createResetStreamEvent from the streams module. It creates a copy of a given streams:StreamEvent object and changes its type to streams:RESET. In addition to the data field, a StreamEvent has two other fields, namely timestamp and eventType. You might wonder what this type (streams:RESET) is. There are three types of events used internally in Ballerina streams: RESET, CURRENT and EXPIRED.
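The copy-and-retype behaviour described for createResetStreamEvent can be sketched in Python with dataclasses.replace. This is an analogue of the described behaviour, not the streams module's code; the deep copy of data is a choice made here so the reset event shares nothing mutable with the original.

```python
import copy
from dataclasses import dataclass, replace
from enum import Enum
from typing import Any, Dict


class EventType(Enum):
    CURRENT = "CURRENT"
    EXPIRED = "EXPIRED"
    RESET = "RESET"


@dataclass
class StreamEvent:
    data: Dict[str, Any]
    timestamp: int
    event_type: EventType = EventType.CURRENT


def create_reset_stream_event(event: StreamEvent) -> StreamEvent:
    """Returns a copy of `event` whose type is RESET; the original is left untouched."""
    return replace(event, data=copy.deepcopy(event.data), event_type=EventType.RESET)


event = StreamEvent({"symbol": "APPLE", "price": 124.56}, timestamp=1552300000000)
reset = create_reset_stream_event(event)
print(reset.event_type.name, event.event_type.name)  # RESET CURRENT
```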

Whenever a piece of data (in the form of an anydata record) is received by the inputStream, it is wrapped in a StreamEvent and its eventType is set to streams:CURRENT.

A RESET event is a StreamEvent object whose event type is streams:RESET. It is used to reset the current state of the streams constructs. For example, let's consider the TimeBatch window. TimeBatch is a window that collects events for a specific period of time and then releases all of them. Let's assume we calculate the average of a field called temperature over the collected events. At the end of each time slot, the Average aggregator has to clear its state (i.e. the sum of the aggregated field values and the count of events received so far) so that it calculates a fresh average for the next time slot. Basically, a RESET event is used to reset the state in batch (tumbling) windows.
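A minimal Python sketch of an average aggregator that clears its sum and count on a RESET event, which is what lets each batch start from scratch. The class and its process signature are hypothetical; only CURRENT and RESET are handled, as in a batch window.

```python
from enum import Enum


class EventType(Enum):
    CURRENT = "CURRENT"
    RESET = "RESET"


class Average:
    """Average aggregator whose state (sum, count) is cleared by a RESET event."""

    def __init__(self) -> None:
        self.total = 0.0
        self.count = 0

    def process(self, value: float, event_type: EventType) -> float:
        if event_type is EventType.RESET:
            self.total, self.count = 0.0, 0  # forget the previous batch
        elif event_type is EventType.CURRENT:
            self.total += value
            self.count += 1
        return self.total / self.count if self.count else 0.0


avg = Average()
first_batch = [avg.process(t, EventType.CURRENT) for t in (20.0, 30.0)]
avg.process(0.0, EventType.RESET)  # start of the second time slot
second_batch = [avg.process(t, EventType.CURRENT) for t in (40.0,)]
print(first_batch, second_batch)  # [20.0, 25.0] [40.0]
```

Without the RESET, the second batch would report 30.0 (the average of all three readings) instead of 40.0.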

A streams:EXPIRED event is similar to a streams:RESET event, except that it is used in sliding windows. In a TimeBatch window, events are collected and all of them are released at once at the end of each time slot. In a Time window, every event has its own release time, and events leave the window in the order they were received. We always have to keep in mind that we are processing a never-ending stream of data. Whenever an event enters or leaves the window, whatever calculation we have been doing has to change accordingly. If we calculate the average of the temperature field of the input stream in a time window of 2 seconds, then whenever the window receives a new event, we have to add its temperature value to the sum, increment the count and recalculate the average. Whenever an event leaves the window, we have to subtract its temperature value from the sum, decrement the count and recalculate the average. By looking at the event type (CURRENT or EXPIRED), aggregators decide whether an event is entering or leaving the sliding window. You can have a look at how we have implemented aggregators using these three types of events here.
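The sliding-window case can be sketched in Python as a generator that emits EXPIRED events for readings that have aged out and a CURRENT event for each new reading, plus an aggregator that adds or subtracts based on the event type. The expiry rule (an event expires once it is at least window_ms old) is an assumption of this sketch.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, List, Tuple


def time_window(events: Iterable[Tuple[int, float]], window_ms: int) -> Iterator[Tuple[str, float]]:
    """Sliding time window: for each arriving (timestamp, value) it first emits
    EXPIRED for events at least `window_ms` old, then CURRENT for the new one."""
    inside: Deque[Tuple[int, float]] = deque()
    for timestamp, value in events:
        while inside and inside[0][0] <= timestamp - window_ms:
            yield "EXPIRED", inside.popleft()[1]
        inside.append((timestamp, value))
        yield "CURRENT", value


class Average:
    def __init__(self) -> None:
        self.total, self.count = 0.0, 0

    def process(self, event_type: str, value: float) -> float:
        if event_type == "CURRENT":    # event entering the window
            self.total += value
            self.count += 1
        elif event_type == "EXPIRED":  # event leaving the window
            self.total -= value
            self.count -= 1
        return self.total / self.count if self.count else 0.0


readings = [(0, 20.0), (1000, 30.0), (2500, 40.0), (3100, 10.0)]  # (ms, temperature)
avg = Average()
averages: List[float] = []
for event_type, value in time_window(readings, 2000):
    result = avg.process(event_type, value)
    if event_type == "CURRENT":
        averages.append(result)
print(averages)  # [20.0, 25.0, 35.0, 25.0]
```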

Let's get back to our cron window. We already understand that our window is not a sliding window; it releases events in batches. So every time we release events, we have to prepend a streams:RESET event to the output events array to make sure that the state of the previous batch is cleared in the pipeline of the streaming query.

In line 11, I clear the eventQueue to make sure that the next time releaseEvents is called, the older events (from the previous batch) are not resent or duplicated.

In line 15, I invoke the process function of the next stage in the streaming pipeline. Every window has to invoke the next process function; otherwise, the output events will not be available for further processing (calculating aggregations, filtering, selecting, joining, etc.). Now we have gone through the cron window, and finally the window would look like below.
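The three release steps (prepend a RESET, clear the queue, call the next process function) can be sketched in Python. Two details are assumptions of this sketch rather than facts from the article: the RESET event is a copy of the first queued event, and an empty slot forwards nothing.

```python
from collections import deque
from dataclasses import dataclass, replace
from typing import Any, Callable, Deque, Dict, List


@dataclass
class StreamEvent:
    data: Dict[str, Any]
    timestamp: int
    event_type: str = "CURRENT"


class CronWindowRelease:
    def __init__(self, next_process: Callable[[List[StreamEvent]], None]) -> None:
        self.next_process = next_process
        self.event_queue: Deque[StreamEvent] = deque()

    def release_events(self) -> None:
        if not self.event_queue:
            return  # assumption: nothing is forwarded for an empty slot
        # RESET first, so downstream aggregators drop the previous batch's state.
        # Assumption: the RESET is a copy of the first queued event.
        output = [replace(self.event_queue[0], event_type="RESET")]
        output.extend(self.event_queue)
        # Clear the queue so the next trigger does not resend this batch.
        self.event_queue.clear()
        # Hand the batch to the next stage; without this call nothing reaches select.
        self.next_process(output)


batches: List[List[StreamEvent]] = []
window = CronWindowRelease(batches.append)
window.event_queue.extend([StreamEvent({"price": 124.56}, 1), StreamEvent({"price": 143.54}, 2)])
window.release_events()
window.release_events()  # nothing new arrived, so nothing is forwarded
print([e.event_type for e in batches[0]], len(batches))  # ['RESET', 'CURRENT', 'CURRENT'] 1
```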

Note that I haven't gone through the getCandidateEvents function. It is used in join queries only. When we join a stream with another stream, what we actually join are the events in the current eventQueue. If you look carefully at the function, you will see two arguments that are maps of type anydata; these two maps are the actual data of the two events being joined. Look at lines 75 and 76: we check whether the data pair satisfies the condition given in the join clause, and if so, we add the pair to an array and return it. I am not going deep into how this function works; almost all the windows contain the same logic in this function.
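The join-candidate logic described above can be sketched in Python: pair the triggering event's data with each event held in the window and keep the pairs for which the join condition, a function of two data maps, returns true. The function name and parameter layout are hypothetical.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

Data = Dict[str, Any]
Condition = Callable[[Data, Data], bool]


def get_candidate_events(
    origin: Data,
    window_events: Iterable[Data],
    condition: Optional[Condition],
) -> List[Tuple[Data, Data]]:
    """Pairs `origin` with every held event that satisfies the join condition.
    A missing condition is treated as "join with everything"."""
    pairs: List[Tuple[Data, Data]] = []
    for candidate in window_events:
        if condition is None or condition(origin, candidate):
            pairs.append((origin, candidate))
    return pairs


held = [{"symbol": "APPLE", "price": 124.56}, {"symbol": "IBM", "price": 99.0}]
origin = {"symbol": "APPLE", "volume": 10}
same_symbol = lambda a, b: a["symbol"] == b["symbol"]
print(len(get_candidate_events(origin, held, same_symbol)))  # 1
```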

Now we have written our window, but in order to use it in a streaming query, we have to create a function which returns an instance of cron window.

Let's put our cron window into a separate module called foo. Now let's test our window :-). This demo is based on the first example (stock market prices) discussed in this article. Our program receives price events and generates the opening price, closing price, max price and min price for each time slot.

I have created some demo data to represent the incoming stock events. Basically, there are three events (lines 25, 26 and 27). In line 29, we deploy the streams query, which listens on stockStream for incoming events. I have used the cron window to release events every 5th second in clock time, then calculated the max price and min price, and taken the prices of the first and last events in the select clause. Those values are then fed to the stockChartStream in line 58 as a StockChartData object. The stockChartStream is subscribed to the function called printStockChartData in line 31, so whenever the stockChartStream receives an event, that event is passed to printStockChartData and printed to standard output. The output is pasted below.
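What the select clause computes for one released batch can be mirrored in Python: first and last prices give open and close, and min and max run over the batch. This sketch assumes a single symbol per batch, and the field names copy the ones in the printed output; the batch below uses the first two events from that output.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class StockChartData:
    symbol: str
    openPrice: float
    minPrice: float
    maxPrice: float
    closePrice: float


def to_chart_data(batch: List[Dict[str, Any]]) -> StockChartData:
    """Mimics the select clause: first/last give open/close, min/max over the batch."""
    prices = [float(e["price"]) for e in batch]
    return StockChartData(
        symbol=str(batch[0]["symbol"]),
        openPrice=prices[0],
        minPrice=min(prices),
        maxPrice=max(prices),
        closePrice=prices[-1],
    )


batch = [
    {"symbol": "APPLE", "price": 124.56, "volume": 5000.0},
    {"symbol": "APPLE", "price": 143.54, "volume": 3400.0},
]
print(to_chart_data(batch))
# StockChartData(symbol='APPLE', openPrice=124.56, minPrice=124.56, maxPrice=143.54, closePrice=143.54)
```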

Releasing events now..
Adding to output: {"stockStream.symbol":"APPLE", "stockStream.price":124.56, "stockStream.volume":5000.0}
Adding to output: {"stockStream.symbol":"APPLE", "stockStream.price":143.54, "stockStream.volume":3400.0}
output: {symbol:"APPLE", openPrice:124.56, minPrice:124.56, maxPrice:143.54, closePrice:143.54}
Releasing events now..
Adding to output: {"stockStream.symbol":"APPLE", "stockStream.price":156.23, "stockStream.volume":6500.0}
Adding to output: {"stockStream.symbol":"APPLE", "stockStream.price":143.54, "stockStream.volume":3400.0}
Adding to output: {"stockStream.symbol":"APPLE", "stockStream.price":124.56, "stockStream.volume":5000.0}
output: {symbol:"APPLE", openPrice:156.23, minPrice:124.56, maxPrice:156.23, closePrice:124.56}
Releasing events now..
Releasing events now..
Adding to output: {"stockStream.symbol":"APPLE", "stockStream.price":143.54, "stockStream.volume":3400.0}
Adding to output: {"stockStream.symbol":"APPLE", "stockStream.price":156.23, "stockStream.volume":6500.0}
Adding to output: {"stockStream.symbol":"APPLE", "stockStream.price":156.23, "stockStream.volume":6500.0}
output: {symbol:"APPLE", openPrice:143.54, minPrice:143.54, maxPrice:156.23, closePrice:156.23}

Further improvements

In this demo, the events are created in the same BVM where the streaming query is running, so the timestamps of the events are in sync with the clock time. What if the events are generated in one BVM and our program runs in another? How do we handle that situation? You might have to implement something similar to the externalTimeBatch window.
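One possible direction, sketched in Python under stated assumptions: use the timestamp carried by each event to find its clock-aligned slot, and release a slot when the first event of a later slot arrives. The class is hypothetical, and late (out-of-order) events are simply added to the open batch, which a real implementation would need to handle more carefully.

```python
from typing import Callable, List, Optional


class EventTimeCronBatch:
    """Releases a clock-aligned batch when an event from a later slot arrives,
    using event timestamps instead of the local clock."""

    def __init__(self, period_ms: int, next_process: Callable[[int, List[float]], None]) -> None:
        self.period_ms = period_ms
        self.next_process = next_process
        self.current_slot: Optional[int] = None
        self.batch: List[float] = []

    def process(self, timestamp: int, price: float) -> None:
        slot = timestamp - timestamp % self.period_ms  # start of this event's slot
        if self.current_slot is None:
            self.current_slot = slot
        elif slot > self.current_slot:
            self.next_process(self.current_slot, self.batch)
            self.batch = []
            self.current_slot = slot
        # Late events (slot < current_slot) are simply added to the open batch here.
        self.batch.append(price)


released = []
window = EventTimeCronBatch(5000, lambda slot, prices: released.append((slot, prices)))
for ts, price in [(1000, 1.0), (4999, 2.0), (5000, 3.0), (12000, 4.0)]:
    window.process(ts, price)
print(released)  # [(0, [1.0, 2.0]), (5000, [3.0])]
```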

I have not used any locking mechanism to read and write events from/to the eventQueue. At higher TPS, there can be inconsistencies if we don't lock the eventQueue when it is read and written at the same time.
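A Python sketch of one way to add that locking: both the buffering path and the release path take the same lock, so an event is never appended while a batch is being copied and cleared. The downstream call is made outside the lock, which is a design choice of this sketch to keep slow consumers from blocking producers.

```python
import threading
from collections import deque
from typing import Any, Callable, Deque, List


class LockedCronWindow:
    """Guards the event queue so process() and release_events() never interleave on it."""

    def __init__(self, next_process: Callable[[List[Any]], None]) -> None:
        self.next_process = next_process
        self.event_queue: Deque[Any] = deque()
        self.lock = threading.Lock()

    def process(self, events: List[Any]) -> None:
        with self.lock:
            self.event_queue.extend(events)

    def release_events(self) -> None:
        with self.lock:
            batch = list(self.event_queue)  # take a snapshot and clear atomically
            self.event_queue.clear()
        if batch:
            self.next_process(batch)  # called outside the lock


received: List[Any] = []
window = LockedCronWindow(received.extend)
producers = [
    threading.Thread(target=lambda: [window.process([i]) for i in range(1000)])
    for _ in range(4)
]
for t in producers:
    t.start()
for _ in range(50):
    window.release_events()  # releases race with the producers
for t in producers:
    t.join()
window.release_events()
print(len(received))  # 4000
```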

There are many aspects we can improve, but I will not address them in this article.

You might be wondering what the functions in the select clause are. We have max, min, foo:firstVal and foo:lastVal. max and min find the maximum and minimum values respectively; they are built-in aggregators in Ballerina streams. Ballerina streams do not have a way to find the first and last events in the window, so I have written two custom aggregators in the same module to get the field values of the first and last events. In the next article, let's discuss writing custom aggregators like foo:firstVal and foo:lastVal.
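Until then, the core idea behind first-value and last-value aggregators can be sketched in Python. These hypothetical classes handle only CURRENT and RESET events, which is enough for a batch window such as the cron window; they are not the author's foo:firstVal and foo:lastVal implementations.

```python
from typing import Optional


class FirstVal:
    """Remembers the first value seen since the last RESET."""

    def __init__(self) -> None:
        self.value: Optional[float] = None

    def process(self, value: float, event_type: str) -> Optional[float]:
        if event_type == "RESET":
            self.value = None
        elif event_type == "CURRENT" and self.value is None:
            self.value = value
        return self.value


class LastVal:
    """Remembers the most recent value seen since the last RESET."""

    def __init__(self) -> None:
        self.value: Optional[float] = None

    def process(self, value: float, event_type: str) -> Optional[float]:
        if event_type == "RESET":
            self.value = None
        elif event_type == "CURRENT":
            self.value = value
        return self.value


first, last = FirstVal(), LastVal()
for event_type, price in [("RESET", 0.0), ("CURRENT", 124.56), ("CURRENT", 143.54)]:
    opening, closing = first.process(price, event_type), last.process(price, event_type)
print(opening, closing)  # 124.56 143.54
```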
