DEBS Grand Challenge 2016: Continuous analytics on graph data streams using WSO2 complex event processor
The ACM Distributed and Event-based Systems (DEBS) Grand Challenge is a yearly competition in which participants develop an event-based solution to a real-world, high-volume streaming problem.
This year’s challenge focuses on analysing the properties of a time-evolving social network. The data for this year’s event has been generated using the Linked Data Benchmark Council (LDBC) social network data generator.
The ranking of the solutions is carried out by measuring their performance using two performance metrics: (1) throughput and (2) average latency.
WSO2 has been submitting solutions to the grand challenge since 2013, and our previous solutions have been ranked among the top submissions. This year, too, we submitted a solution using WSO2 CEP/Siddhi. Based on its performance, this year’s solution has also been selected as one of the top solutions, and we were invited to submit a full paper to the DEBS 2016 conference. This paper was presented at the DEBS conference on 22 June 2016.
In this blog I’ll present some details of our solution which is based on WSO2 CEP.
This year’s challenge involves analysing a dynamic (evolving) social-network graph. Specifically, the 2016 Grand Challenge targets the following problems: (1) identification of the posts that currently trigger the most activity in the social network, and (2) identification of large communities that are currently involved in a topic. A brief description of the two queries is given below (more details about the two queries can be found here).
Query 1: Find top 3 posts
- Streams: posts and comments
- The goal of query 1 is to compute the top-3 scoring active posts producing an updated result every time they change.
- The total score of an active post P = sum of its own score + score of all its related comments.
Query 2: Find largest k communities
- Streams: Friendships, Comments and Likes
- The goal is to find the k comments, created not more than d seconds ago, that have the largest range. The range of a comment is the size of the largest connected component in the graph formed by the persons who have liked that comment and know each other.
The architecture of the solution
The following figures illustrate the architecture of the two queries. Note that in our implementation, we execute the two queries in parallel (the aim is to utilize all the machine cores).
Let me now provide some details about the architecture of the two queries. Each data stream of each query is read by its own dedicated data-loader thread (T1, T2, T3, T4, T5). The data (i.e. tuples) read by a data loader is placed in a blocking queue. The blocking queue gives the data loader a place to store events without having to wait for the event-ordering threads to consume them. The loader threads also keep the event-ordering threads busy by providing a backlog of events to consume. (Note: the maximum capacity of a blocking queue can be specified when the queue is constructed. This parameter can be used to prevent the queue from growing excessively when the production rate is higher than the consumption rate.)
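The loader/queue hand-off described above can be sketched as follows. This is a minimal illustration and not the code of our solution: the class name LoaderQueueSketch, the END_OF_STREAM sentinel and the use of bare time-stamps as events are assumptions made for the example. It only shows how a bounded java.util.concurrent.ArrayBlockingQueue makes the loader block once the queue is full, which keeps the backlog from growing without limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: a data loader hands events to a consumer through a bounded queue. */
final class LoaderQueueSketch {
    /** Sentinel marking the end of the stream (an assumption of this sketch). */
    static final long END_OF_STREAM = -1L;

    /** Producer: puts each time-stamp on the queue, blocking while the queue is full. */
    static Thread startLoader(long[] timestamps, BlockingQueue<Long> queue) {
        Thread loader = new Thread(() -> {
            try {
                for (long ts : timestamps) {
                    queue.put(ts);            // blocks once the capacity is reached
                }
                queue.put(END_OF_STREAM);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "data-loader");
        loader.start();
        return loader;
    }

    /** Consumer: takes events until the sentinel arrives. */
    static List<Long> drain(BlockingQueue<Long> queue) throws InterruptedException {
        List<Long> received = new ArrayList<>();
        while (true) {
            long ts = queue.take();           // blocks while the queue is empty
            if (ts == END_OF_STREAM) {
                return received;
            }
            received.add(ts);
        }
    }

    /** Runs one loader and one consumer over a queue bounded to the given capacity. */
    static List<Long> run(long[] timestamps, int capacity) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(capacity);
        Thread loader = startLoader(timestamps, queue);
        try {
            List<Long> received = drain(queue);
            loader.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
    }
}
```

The capacity passed to the queue constructor is the tuning knob mentioned above: a small value limits memory use, while a larger value gives the consumer a deeper backlog to work from.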
There are two dedicated event-ordering threads, one for the query 1 streams (T6) and one for the query 2 streams (T7). These threads fetch the events from the queues and order them based on their event time-stamps. Note that the events within each queue are already ordered by time-stamp (the time-stamp here refers to logical time), so the main function of an event-ordering thread is to produce a single time-ordered stream from its input queues and place it in the ring buffer. For example, in the case of query 1, it orders the comment and post streams and places these events in the ring buffer. The event-processing threads (Q1 and Q2) fetch the events from the ring buffer and process them according to the query logic. Note that the event-ordering threads act as producers while the processing threads act as consumers.
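As a rough illustration of the ordering step, the sketch below merges several queues, each already sorted by time-stamp, into one sorted sequence by repeatedly taking the earliest head. The names StreamMergeSketch and Event are hypothetical, a plain list stands in for the ring buffer, and ties are resolved in favour of the first queue listed; none of this is taken from the actual implementation.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: merge time-ordered queues into one time-ordered sequence. */
final class StreamMergeSketch {
    /** Minimal event: a logical time-stamp plus the name of the stream it came from. */
    static final class Event {
        final long timestamp;
        final String stream;

        Event(long timestamp, String stream) {
            this.timestamp = timestamp;
            this.stream = stream;
        }
    }

    /**
     * Repeatedly removes the event with the smallest time-stamp among the queue heads.
     * Each input queue must already be sorted by time-stamp. The queues are emptied.
     */
    static List<Event> merge(List<Deque<Event>> queues) {
        List<Event> ordered = new ArrayList<>();   // stands in for the ring buffer
        while (true) {
            Deque<Event> earliest = null;
            for (Deque<Event> q : queues) {
                if (!q.isEmpty() && (earliest == null
                        || q.peekFirst().timestamp < earliest.peekFirst().timestamp)) {
                    earliest = q;
                }
            }
            if (earliest == null) {
                return ordered;                     // every queue is empty
            }
            ordered.add(earliest.pollFirst());
        }
    }
}
```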
Query 1: Data Structure
As illustrated below, the query 1 data structure consists of three maps (post-map, post-score map and comment-post map) and ten time windows.
- Post-map (hashMap): mapping between post_id and post
- Post-score map (bounded sorted multi-map of size 3): mapping between post score and post_id
- Comment-post map (hashMap): mapping between comment_id and post_id
- 10 time windows: These time windows allow us to model the expiration of posts and comments (note: the initial score of a post/comment is 10. This score is decremented by 1 every 24 hours). The first time window stores all the posts and comments whose ages are less than or equal to 24 hours. Similarly, the second window contains posts and comments whose ages are greater than 24 hours and less than or equal to 48 hours, and so on.
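The relationship between age, time window and score described above can be written down directly. The following is a small sketch under the window boundaries stated in this post (first window: age up to and including 24 hours); the class and method names are made up for the example, and the total score follows the definition given for query 1 (a post's own score plus the scores of its comments).

```java
/** Hypothetical sketch of the score decay implied by the ten 24-hour time windows. */
final class ScoreDecaySketch {
    static final long WINDOW_MS = 24L * 60 * 60 * 1000;
    static final int INITIAL_SCORE = 10;

    /**
     * Zero-based index of the time window holding an item of the given age.
     * Age <= 24 h gives 0, 24 h < age <= 48 h gives 1, and so on.
     * An index of 10 means the item has left the last window (score 0).
     */
    static int windowIndex(long ageMs) {
        if (ageMs <= 0) {
            return 0;
        }
        long index = (ageMs - 1) / WINDOW_MS;
        return (int) Math.min(index, INITIAL_SCORE);
    }

    /** Score of a single post or comment at time nowMs. */
    static int score(long createdMs, long nowMs) {
        return INITIAL_SCORE - windowIndex(nowMs - createdMs);
    }

    /** Total score of a post: its own score plus the scores of all its comments. */
    static int totalScore(long postCreatedMs, long[] commentCreatedMs, long nowMs) {
        int total = score(postCreatedMs, nowMs);
        for (long created : commentCreatedMs) {
            total += score(created, nowMs);
        }
        return total;
    }
}
```

Because an item's score only changes when it crosses a window boundary, moving it from one window to the next is exactly the moment at which the score of its post has to be updated.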
Query 1: Algorithm
When a new post arrives, it is registered in the post and post-score maps (note: the initial score of a post is equal to 10). Then the time windows are processed. Each time window contains many time-window objects, where each object stores information regarding a comment or post (with a specific score). When a post arrives, the time-stamp of the new post (event) is evaluated against the time-stamps of the time-window objects. If the age of a time-window object is greater than the upper limit of its time window (i.e. 24 hours, 48 hours, etc.), the object is transferred to the next time window, and so on. The movement of a time-window object from one time window to another indicates a change in a particular post score. When we detect such a change, we update the post-score map. After processing the time windows, the new post is given a score of ten and is placed in the first time window. The post-score map now contains the top three scores and their corresponding posts. The final top three posts are obtained by processing this map.
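To make the idea of a bounded sorted multi-map concrete, here is one possible shape for it, built on java.util.TreeMap. It is only a sketch: the class name, the choice to bound the number of distinct scores, and the tie-break by ascending post id are assumptions of this example and are not taken from our implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch of a bounded, sorted multi-map from score to post ids. */
final class BoundedScoreMapSketch {
    private final int maxScores;
    // Highest score first; each score maps to the set of posts that currently have it.
    private final TreeMap<Integer, TreeSet<Long>> byScore =
            new TreeMap<>(Comparator.reverseOrder());

    BoundedScoreMapSketch(int maxScores) {
        this.maxScores = maxScores;
    }

    /** Adds a (score, post) pair and drops the lowest score if the bound is exceeded. */
    void put(int score, long postId) {
        byScore.computeIfAbsent(score, s -> new TreeSet<>()).add(postId);
        if (byScore.size() > maxScores) {
            byScore.pollLastEntry();   // last entry is the lowest score (reverse order)
        }
    }

    /** Removes a (score, post) pair, for example before re-inserting an updated score. */
    void remove(int score, long postId) {
        TreeSet<Long> ids = byScore.get(score);
        if (ids != null && ids.remove(postId) && ids.isEmpty()) {
            byScore.remove(score);
        }
    }

    /** Returns up to n post ids, best score first (ties by ascending post id). */
    List<Long> top(int n) {
        List<Long> result = new ArrayList<>();
        for (TreeSet<Long> ids : byScore.values()) {
            for (Long id : ids) {
                if (result.size() == n) {
                    return result;
                }
                result.add(id);
            }
        }
        return result;
    }
}
```

Keeping the map bounded means that an update touches only a handful of entries, which is in line with the goal of computing the top three posts without scanning all active posts.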
When a new comment arrives, if the comment is for a post then it is registered in the comment-post map. If the comment is for a comment then post id of the comment is obtained from the comment-post map by doing a look up on the comment id. Then the new comment is registered in the comment-post map. As it was done for a post, the time windows are processed and the new comment is then placed in the first time window.
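The comment-post lookup described above can be illustrated with a plain HashMap. In this sketch the class name and the convention that exactly one of postId and parentCommentId is non-null are assumptions made for the example; the real event format may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the comment-post map: comment_id to the id of its post. */
final class CommentPostMapSketch {
    private final Map<Long, Long> commentToPost = new HashMap<>();

    /**
     * Registers a comment and returns the id of the post it ultimately belongs to.
     * Exactly one of postId (comment on a post) or parentCommentId (reply to a
     * comment) is expected to be non-null. Returns null if the parent is unknown.
     */
    Long register(long commentId, Long postId, Long parentCommentId) {
        Long rootPost = (postId != null) ? postId : commentToPost.get(parentCommentId);
        if (rootPost != null) {
            commentToPost.put(commentId, rootPost);
        }
        return rootPost;
    }
}
```

Since every comment is stored with the id of its post rather than the id of its parent, a reply at any depth is resolved with a single lookup.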
The main objective of the algorithm is to compute the top three posts by (only) processing a small percentage of active posts. Note that at any given point in time there are many active posts in the system.
Query 1: Software Architecture
The following figure shows the UML diagram of query 1. The UML diagrams in this blog have been generated using ObjectAid UML Explorer (an Eclipse plugin).
org.wso2.siddhi.debs2016.extensions.Query.Run is the main entry point to the application and this is where the two queries start.
As pointed out, our solution is based on WSO2 CEP and we have implemented it as a Siddhi extension. This specific extension is based on org.wso2.siddhi.debs2016.extensions.StreamFunctionProcessor. org.wso2.siddhi.debs2016.extensions.RankerQuery1 extends this class.
The org.wso2.siddhi.debs2016.comment.PostStore contains the post-map and post-score map, and this class has methods which allow the client code to access post information and register new posts.
The final top three posts are obtained by processing a hash-map which is based on a comparator (org.wso2.siddhi.debs2016.post.PostComparator).
org.wso2.siddhi.debs2016.DataLoaderThread1 is responsible for reading the event streams from the text file.
org.wso2.siddhi.debs2016.OrderedEventSenderThreadQ1 is responsible for ordering the events.
Query 2: Data structure
The main components are
- Comment Map (hashMap): mapping between the comment_id and the comment-like-graph (the comment-like-graph is an adjacency list of the users who have liked the comment. For each vertex i, it stores an array of the vertices adjacent to it)
- Largest connected component map (sorted MultiMap): mapping between the size of the largest connected component and the comment
- Friendship List: graph of friendships represented using an adjacency list
- Time window of length d
Query 2: Algorithm
When a new friendship event arrives, it is registered in the friendship list. This new friendship can have an effect on the largest k communities. For example, a new friendship may cause two disconnected components in a comment-like-graph (refer to the previous section for the definition) to merge into a single component. When this happens, we update the largest connected component map.
When a comment event arrives, it is added to the time window as well as to the comment map.
When a “like” event arrives, it is registered in the related comment’s comment-like-graph object. The relevant entry in the largest connected component map is then updated with the new size of the largest connected component.
After processing each event, a check is done on the largest connected component map to see if the order of the top k entries has changed and, if so, an output is generated.
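For readers who want to see the range computation spelled out, the sketch below computes the size of the largest connected component among the persons who liked a comment, using only friendship edges between likers. It uses a straightforward breadth-first search that recomputes the answer from scratch, which is simpler than (and not necessarily the same as) the algorithm used in our solution; the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: range of a comment = largest connected group of likers. */
final class CommentRangeSketch {
    // Friendship graph as an adjacency list: person id to the ids of their friends.
    private final Map<Long, Set<Long>> friends = new HashMap<>();

    /** Registers an undirected friendship between two persons. */
    void addFriendship(long a, long b) {
        friends.computeIfAbsent(a, k -> new HashSet<>()).add(b);
        friends.computeIfAbsent(b, k -> new HashSet<>()).add(a);
    }

    /** Size of the largest connected component in the friendship graph restricted to likers. */
    int range(Set<Long> likers) {
        Set<Long> visited = new HashSet<>();
        int largest = 0;
        for (Long start : likers) {
            if (!visited.add(start)) {
                continue;                       // already counted in an earlier component
            }
            int size = 0;
            Deque<Long> frontier = new ArrayDeque<>();
            frontier.add(start);
            while (!frontier.isEmpty()) {
                Long person = frontier.poll();
                size++;
                for (Long friend : friends.getOrDefault(person, Collections.emptySet())) {
                    // Only follow edges to persons who also liked the comment.
                    if (likers.contains(friend) && visited.add(friend)) {
                        frontier.add(friend);
                    }
                }
            }
            largest = Math.max(largest, size);
        }
        return largest;
    }
}
```

In this form, both a new like (a new vertex in the liker set) and a new friendship (a new edge) are handled by calling range again for the affected comments and updating their entries in the largest connected component map.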
Query 2: Software architecture
The following figure shows the UML diagram of query 2.
Similar to query 1, query 2 is also based on org.wso2.siddhi.debs2016.extensions.StreamFunctionProcessor. org.wso2.siddhi.debs2016.extensions.RankerQuery2 extends the StreamFunctionProcessor.
The org.wso2.siddhi.debs2016.comment.CommentStore contains the comment map (the mapping between comments and comment-like-graphs) and the time window. The class has methods which provide access to comment information (get top-k comments) and methods which allow you to register a new comment, register a new like, register a new friendship and update the comment time window.
org.wso2.siddhi.debs2016.DataLoaderThread2 is responsible for reading the event streams from the text file.
org.wso2.siddhi.debs2016.OrderedEventSenderThreadQ2 is responsible for ordering the events.
The solution was evaluated using a four-core, 8 GB virtual machine running Ubuntu Server 15.10. The two performance metrics used for evaluating the system are (1) the throughput and (2) the mean latency. Two data sets have been used to evaluate the performance. The following tables show the performance results. Both queries perform well on both data sets. The performance of query 2, however, is better than that of query 1. The best throughput (330,000 events/sec) was observed in query 2 when using the large data set, while the best average latency (0.38 ms) was observed in query 2 when using the small data set.
In this blog, I presented the details of our DEBS 2016 grand challenge solution, which is based on WSO2 CEP. We have used numerous techniques to optimize the performance of the queries. Some of these techniques include executing the two queries in parallel, using specific data structures such as bounded sorted hash-maps, using primitives where possible, implementing the queries in a pipelined fashion, using time windows (which allow us to compute the final result by processing a small fraction of the data in the system), tuning the maximum capacity of maps, controlling the queue lengths to achieve better performance, using an efficient algorithm to compute the largest connected components, and so on.
Our solution was able to handle massive amounts of streamed events (up to 330,000 events per second) and the latency of the solution was very low (best-case average latency = 0.3 ms).