Extending the Yahoo streaming benchmark to WSO2 Stream Processor

Kavinga Upul Ekanayaka
Published in ACCELR Blog · Dec 23, 2020

Introduction

Big data refers to huge volumes of data that grow exponentially with time and carry a lot of value in today's information-technology-oriented world. Processing big data is not trivial and cannot be done efficiently with traditional data processing tools because of its large volume and complexity. Hence, new tools for modeling and processing big data have gained a lot of attention in the tech world in the recent past. Many applications in various fields produce big data and therefore demand such tools: financial technology, real estate, gaming, e-commerce and marketing, healthcare, sports analytics, airlines, social media, advertising campaigns, and many other IoT-based industries are examples.

These big data applications broadly fall into two categories based on the nature of the data source and the processing. Some applications store data and process it in batches later, whereas in others the data arrive continuously and need real-time processing with low latency in order to take important decisions; the latter are called streaming data applications. Apache Hadoop and Apache Spark are very popular for batch processing, while Apache Storm, Apache Flink, Apache Spark, Apache Samza, and Apache Beam are among the most popular stream processors, and we find WSO2 Stream Processor to be a good competitor among open source streaming data processors as well.

Motivation

We at ACCELR are mostly interested in accelerating big data applications using CPU-, GPU- and FPGA-based acceleration methods, in order to provide enhanced performance for the popular big data processing platforms by addressing the various bottlenecks they have. Since streaming data processing lies squarely within our fields of interest, we wanted a performance comparison among the aforementioned stream processors in order to select the best-fitting tool for our requirements. In the literature, we found that a team at Yahoo had done a nice benchmark of stream processors such as Apache Storm[1], Apache Flink[2] and Apache Spark[3] that depicts a real-world scenario. We wanted to extend this benchmark to other competitive tools like WSO2 Stream Processor, and also upgrade Storm and Spark to their latest versions so that we get an apples-to-apples comparison among the latest versions.

Yahoo streaming benchmark [4]

Benchmark design block diagram, as shown in the Yahoo developers blog by the authors of the original benchmark design[4]

The Yahoo streaming benchmark simulates an advertisement analytics pipeline. There are 100 advertising campaigns and 10 advertisements per campaign in this pipeline. The benchmark application reads various advertising events in JSON format produced into Kafka (synthetic/test data generation and injection via Kafka). It then identifies the relevant events and filters out the others. Finally, it stores a windowed count (10-second windows) of relevant events per campaign in a Redis[5] database. The flow of operations is as follows.

  • Read an event from Kafka.
  • Deserialize the JSON string.
  • Filter out irrelevant events (based on the event_type field).
  • Take a projection of the relevant fields (ad_id and event_time).
  • Join each event by ad_id with its associated campaign_id using pre-stored information in Redis.
  • Take a windowed count of events per campaign and store each window in Redis along with a timestamp of the time the window was last updated in Redis.
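
As a rough illustration (the field names follow the event schema used in the stream definition later in this post; the values here are made up), a single event on the Kafka topic looks something like the following.

{"user_id": "user-1", "page_id": "page-1", "ad_id": "ad-17", "ad_type": "banner", "event_type": "view", "event_time": "1608716400000", "ip_address": "1.2.3.4"}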

A Clojure[6] script is used to produce the data, where the producer stamps each event with a timestamp marking the start of the event. The benchmark application truncates these start times to 10 seconds (the window length) to get the window start times. When each window is stored in Redis per campaign ID, the application adds another timestamp recording when the window was last updated. At the end, a Clojure script utility reads all windows with their start times and last-updated-at times. It then takes the difference between a window's start time and its last_updated_at time, minus the window duration, as the time it took for the final tuple in a window to go from Kafka through the application to Redis.

window.final_event_latency = (window.last_updated_at - window.timestamp) - window.duration
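
For example, with a 10-second (10,000 ms) window that starts at timestamp 50,000 ms and was last updated at 63,500 ms, the final-event latency would be (63,500 - 50,000) - 10,000 = 3,500 ms.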

We are not going to discuss the Spark and Storm implementations here as they have already been explained in the reference.

Extending the benchmark to WSO2 SP

The good thing about extending the Yahoo streaming benchmark application to WSO2 Stream Processor is that it has great support for Kafka with JSON as an input interface, as well as Redis as a database within its extension collection. WSO2 Stream Processor is powered by the remarkable Siddhi complex event processing engine, and it comes with a separate SQL-like language called SiddhiQL for implementing streaming applications. First, we need to define all the input, output and intermediate streams that we are going to use in the application.

The following input data stream is defined to receive the input ad event data sent by Kafka using the topic 'ad-events'.

@source(type='kafka',
        topic.list='ad-events',
        partition.no.list='0',
        threading.option='single.thread',
        group.id='group',
        bootstrap.servers='localhost:9092',
        @map(type='json'))
define stream KafkaDataStream (user_id string, page_id string, ad_id string, ad_type string, event_type string, event_time string, ip_address string);

Then an intermediate stream is defined to hold the filtered data from the first filter operation.

define stream filteredOnViewStream (ad_id string, event_time long);

Next, we need to define a table for pre-storing ad_id to campaign_id mapping data where ad_id is the key and campaign_id is the value in each entry.

@Store(type='redis', table.name='campaignIdTable', host='localhost', port='6379')
@primaryKey('ad_id')
@index('campaign_id')
define table campaignIdTable (ad_id string, campaign_id string);

Another two intermediate streams are defined to hold the joined data and the joined-key stream (explained below), respectively.

define stream redisJoinedStream (campaign_id string, ad_id string, window_time long);
define stream campaignTimeStream (campaign_window_time string, ad_id string);

One more stream is defined to hold the window-processed data.

define stream redisStoreStream (campaign_window_time string, seen_count long);

Finally, a table is defined to store the output data.

@Store(type='redis', table.name='redisStoreTable', host='localhost', port='6379')
@primaryKey('campaign_window_time')
define table redisStoreTable (campaign_window_time string, seen_count long, last_updated long);

Since WSO2 SP's Redis extension can only read tables that were written to Redis using the same extension, we had to define an additional input stream to receive the pre-stored mapping data via Kafka. For this we defined an additional Kafka topic and sent the mapping data to this particular stream, whereas in Storm and Spark the pre-storing of data is handled by the Clojure script itself.

@source(type='kafka',
        topic.list='ad-to-campaign',
        partition.no.list='0',
        threading.option='single.thread',
        group.id='group',
        bootstrap.servers='localhost:9092',
        @map(type='json'))
define stream adToCampaignData (ad_id string, campaign_id string);
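
A record on this additional topic simply maps an advertisement to its campaign; with made-up identifier values it would look like the following.

{"ad_id": "ad-17", "campaign_id": "campaign-3"}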

For this purpose, we implemented the first query, which acts at pre-processing time to store the ad_id to campaign_id mapping data.

@info(name='query_store')
from adToCampaignData
insert into campaignIdTable;

As the first main query, we implemented the filtering of the raw input data based on event_type, where only the "view" event type is selected.

@info(name='query1')
from KafkaDataStream[event_type == "view"]
select ad_id, convert(event_time, 'long') as event_time
insert into filteredOnViewStream;

The next query is implemented to join the pre-stored campaign_id data with the filtered stream on matching ad_id. It also truncates the event time to the 10-second window in order to obtain the window start time.

@info(name='query2')
from filteredOnViewStream join campaignIdTable on filteredOnViewStream.ad_id == campaignIdTable.ad_id
select campaignIdTable.campaign_id as campaign_id, filteredOnViewStream.ad_id as ad_id, (filteredOnViewStream.event_time/10000)*10000 as window_time
insert into redisJoinedStream;

Since setting multiple (composite) primary keys does not work in WSO2 SP's Redis extension, we had to implement an additional query that concatenates campaign_id and window_time into campaign_window_time in order to mock a composite key.

@info(name='query3')
from redisJoinedStream
select str:concat(campaign_id, ":", convert(window_time, 'string')) as campaign_window_time, ad_id
insert into campaignTimeStream;

Then comes the next main query, which counts the events within 10-second windows using campaign_window_time as the key. In this query we get an output event for each input event, and each output carries the accumulated seen count for a particular campaign_window_time. Therefore, unlike the Storm and Spark implementations, we do not have to worry about separate accumulation logic.

@info(name='query4')
from campaignTimeStream#window.time(10 sec)
select campaign_window_time, count() as seen_count
group by campaign_window_time
insert into redisStoreStream;
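
For instance, if three "view" events for the same campaign_window_time arrive within the window, query4 emits intermediate seen_count values of 1, 2 and 3; since the next query upserts on the same key, the value left in Redis at the end of the window is the final accumulated count.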

The final query updates or inserts into the Redis output table using campaign_window_time as the key, and it adds a timestamp that can be considered the last-updated time of the particular window for a given key. Since the accumulation of seen_count already happened in the previous query, the update operation leaves the final accumulated seen_count for each key in each window.

@info(name='query5')
from redisStoreStream
select campaign_window_time, seen_count, eventTimestamp() as last_updated
update or insert into redisStoreTable
on redisStoreTable.campaign_window_time == campaign_window_time;

Performance comparison at a glance

We used an Amazon AWS c5.4xlarge instance, which has the following configuration.

Processor : Intel Xeon Platinum 8000 series processor (Skylake-SP or Cascade Lake) with a sustained all-core Turbo CPU clock speed of up to 3.6 GHz

vCPUs : 16

Memory : 32 GiB

Since we just wanted a rough performance estimation and comparison among the platforms, we used the basic default configurations from the Yahoo benchmark and did not try to stretch any tool to its maximum capacity. That means a single Kafka node with one partition, a single Redis node, and a single worker node for both Spark and Storm (with 4 cores and 2 ackers for Storm). WSO2 SP also ran on a single node.

The load (events/second) was increased from 1,000 up to 10,000 in steps of 1,000, and then up to 25,000 in steps of 5,000. We used low throughput levels because all tools were running with their basic configurations, and otherwise the processing would start to fall behind the input load by a large margin. Each test was carried out with 5 minutes of streaming data production. Finally, we calculated the 99th percentile latency from the obtained results.

The following tool versions were used.

WSO2 Streaming Integrator : 1.1.0 (the latest version of the WSO2 stream processor)

Apache Kafka : 2.3.0

Redis : 6.0.9

Scala : 2.12.12 (bin version 2.12)

Apache Storm : 2.2.0

Apache Spark : 3.0.1

Time after window closed (Latency) vs Throughput (events/second) comparison

All three platforms perform more or less similarly with their basic configurations, while WSO2 Stream Processor performs noticeably better when the throughput is below 20,000 events per second.

Conclusion

We mainly wanted to extend the existing benchmark to WSO2 Stream Processor and compare latency and performance. It seems to be doing much better at lower throughput loads, while Apache Spark appears to be the most stable across all input loads, with lower latency values. As future work, we need to increase the number of worker nodes on all platforms in a cluster configuration, letting them reach their maximum capacity, in order to better understand how these platforms perform under much higher throughput loads. It should be noted that this is the first time the authors developed an application using WSO2 SiddhiQL, and there may be better ways to improve the implementation.

References

[1]. https://storm.apache.org/
[2]. https://flink.apache.org/
[3]. https://spark.apache.org/
[4]. https://developer.yahoo.com/blogs/135370591481/
[5]. https://redis.io/
[6]. https://clojurescript.org/

Kavinga Upul Ekanayaka

Tech Lead at ACCELR, Consultant Software Engineer at Bigstream, Former Tech Lead at Wave Computing, B.Sc. Engineering (Hons) from University of Moratuwa