Siddhi 5.1 reduces Database Query Latency up to 3 times

Niveathika Rajendran
siddhi-io
Published in
7 min readJul 29, 2019

Stream processing use cases have a need to process millions of events per second. However, not all data can be stored in memory or attached as stream attributes. Hence, the need for getting additional data from databases introduces to the use case of executing traditional database queries in the stream processing pipeline.

Siddhi allows working with stored data through its feature called Tables.

define table Purchase (purchaseId string, dateOfPurchase long, 
customerId string, country string,
totalAmount float);

Here, the tables can be in-memory or backed with traditional databases. Siddhi gives support to connect with traditional DBs such as MySQL, Postgres, MSSQL and Oracle, as well as NoSQL DBs such as MongoDB. However, the drawback of table query is the latency introduced to the processing pipeline especially when the tables are backed by traditional databases. Even though this cannot be avoided, from Siddhi Core 5.1.1 this has been tweaked to perform at the lowest possible latency along with efficient memory management.

This blog talks about the architecture behind the Siddhi table join, the changes introduced, and the performance evaluation.

Sample Usecase

Before delving into the architecture, let’s see the sample use case for the explanation. Consider that we have the following DB table in the system which records customer transaction, and we have a use case to query for the top 10 customers in terms of their total purchases originated from different countries for the month of July.

Purchase Table

This can be represented in Siddhi as follows,

@PrimaryKey(“purchaseId”)
define table Purchase (purchaseId string, dateOfPurchase long,
customerId string, country string,
totalAmount float);

By default, this will be an empty in-memory table. Here, with the addition of “@store” annotation, this table can be configured to mirror a traditional DB. For instance, the following definition mirrors a MySQL DB.

@Store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/stocks",
username="root", password="root",
jdbc.driver.name="com.mysql.jdbc.Driver")
@PrimaryKey("purchaseId")
define table Purchase (purchaseId string, dateOfPurchase long,
customerId string, country string,
totalAmount float);

As we have a use case to query for the top 10 customers in terms of their total purchases originated from different countries for the month of July, we use TriggerStream that is configured with an HTTP source via “@source” annotation to trigger the processing.

@source(type='http', receiver.url='http://localhost:5005/trigger',         
@map(type = 'json'))
define stream TriggerStream (startTimeStamp long, endTimestamp long,
country string);

The query used to get top 10 customers will be as follows,

from TriggerStream as T join Purchase as P
on P.country == T.country and
P.dateOfPurchase > T.startTimestamp and
P.dateOfPurchase < T.endTimestamp
select P.customerId as customer, sum(P.totalAmount) as totalAmount
group by P.customerId
order by totalAmount desc
limit 10
insert into OutputStream;

Here, the on condition on line 2 represents the filters, i.e country in the table should match the country value sent through HTTP request and dateOfPurchase must be between startTimestamp and endTimestamp sent through each HTTP request. The clauses select, group by, order by and limit are similar to DB SQL queries.

The final Siddhi app will be as follows,

Table Join Architecture

The internal architecture for Table joins queries as seen below,

Here, Table Processor is the interface used to run the find query against the table. All Siddhi tables have to be implemented using AbstractRecordTable which has the function find() through which the on condition (where clause) can be passed to underlying DB.

The DB SQL Query for the above use case will be as follows, this is compiled once during the runtime and reused upon each event arrival.

Then, Query Selector processes the queried events by performing group by, having, limit and offset as well as final stream attributes transformations in-memory.

Recently, a new table interface, AbstractQueryableRecordTable was introduced to Siddhi. This interface gives support to pass the selectors to the DB for processing. In essence, using its query() function we can pass the work of Query Selector to the DB itself and skip the Query Selector processing such as performing group by, having, limit, offset and attributes transformations in-memory. This optimized processing has been achieved through the following architecture.

The corresponding DB SQL Query will be,

With the new implementation, the Table Processor takes a decision to use the optimized flow over the normal flow when the following two conditions are satisfied,

  1. The underlying table(store) implements the interface AbstractQueryableRecordTable.
  2. The table(store) supports all the select functions used in the query.

To understand the function support, let’s say in the query we have an ifThenElse function to rename the customer id to Unknown if it is equal to 0. The Siddhi select query having such ifThenElse function will be as follows,

select 
ifThenElse(P.customerId=='0', 'Unknown', P.customerId) as customer,
sum(P.totalAmount) as totalAmount

Here, when the selection is compiled (via compileSelection() method), the underlying implementation of the table(store) can throw an exception to indicate that the specific function cannot be executed natively in the DB violating the 2nd condition to use the optimization.

In such situations, the Table Processor will revert to the original flow to maintain backward compatibility. However, when both the conditions are satisfied, Siddhi passes the Query Selector’s work on to the physical database resulting in not loading a lot of data to the memory and reduces memory consumption. In the meantime performing group by and other operations on DB level enables the system to use the DB’s query optimizations and indexes and thus reduces the overall latency.

Performance Evaluation

The performance was evaluated by running the above discussed Siddhi query on a table with 100,000 records with a varying number of the unique keys combinations present in the records. I.e the Purchase table was tested with 100000 records, among which it has 10000 unique customer entries, then with 25000 unique customer entries and so on. The tests for a specific number of unique combinations was repeated 10 times to get average to reduce anomalies.

The tests were carried out in, 4 CPU, 16 Gb machine against MySQL 5.7.26

The latency measurement can be seen below,

When group by is 1, the latency reduced from 227ms to 90 ms which is 2.5 times faster than before. The latency for 100,000 combinations reduced from 380 ms to 128 ms, giving a 66% latency improvement i.e time taken has reduced by almost 3 times. As seen above the latency for queries when group-bys increase was much steeper before the optimization, and after Siddhi Core 5.1.1 the latency hovered around 90–130 ms giving a 60–66 % improvement (2.5 to 3 times faster performance compared to previous versions). Further, the latency was reduced from the get-go and it is far more prominent when the DB contains a large number of unique customers i.e when group by has a large number of unique combinations.

Things to note

The optimization will produce an advantage when it can cut down a large amount of data that is being retrieved to perform the group by operation on the memory. Even though this behavior is executed by default, we still have to be careful when writing queries to ensure that the query can be passed to the underlying DB. For instance, for the query where we need to select top K users while renaming customer Ids having 0 to Unknown. Instead of writing the query as bellow,

@info(name = ‘get_top_10_users’) 
from TriggerStream as T join Purchase as P
on P.country == T.country and P.dateOfPurchase > T.startTimestamp
and P.dateOfPurchase < T.endTimestamp
select
ifThenElse(P.customerId==’0', ‘Unknown’, P.customerId) as customer,
sum(P.totalAmount) as totalAmount
group by P.customerId
order by totalAmount desc
limit 10
insert into OutputStream;

Where the optimization may fail because Siddhi’s ifThenElse function in select clause may not be supported by RDBMS to be processed by the DB level, and thus the system may revert to its standard behavior.

However, by rewriting the query to run the selection after results are taken from the DB as below, we will be able to get the advantage of the optimization.

@info(name = ‘get_top_10_users’) 
from TriggerStream as T join Purchase as P
on P.country == T.country and
P.dateOfPurchase > T.startTimestamp and
P.dateOfPurchase < T.endTimestamp
select P.customerId as customer, sum(P.totalAmount) as totalAmount
group by P.customerId
order by totalAmount desc
limit 10
insert into TempStream;
@(info=’CustomerId_Validation’)
From TempStream
Select
ifThenElse(P.customerId==’0', ‘Unknown’, P.customerId) as customer,
totalAmount
Insert into OutputStream;

Conclusion

Siddhi Core 5.1.1 improves database query performance up to 3 times along with efficient query runtime memory usage. To utilize this performance gain underlying Siddhi store must be implemented with AbstractQueryableRecordTable and support all selection functions used in the query. If not, the Siddhi join query will revert to its standard(previous) behavior.

With this improvement, Siddhi stream processing will have lower latency and improved the overall throughput when having DB interactions.

--

--