In-memory Database Caching for Stream Processing — Siddhi
Integrated systems rely on databases to store data, share information with other systems, and preserve system state across restarts and failures. One significant drawback of traditional databases is that their read and write latencies are higher than those of in-memory data stores, because the databases keep their data on hard disks. Such latencies are generally acceptable for traditional transaction processing. However, in stream processing use cases, where millions of events may need to be read and written per second, the additional latency introduced by the database can reduce the overall throughput. In this blog, we explore how in-memory caching for such databases can improve the performance of a stream processing system, Siddhi.
In the following sections, we will discuss a sample use case for using database caching, the architecture of the Siddhi caching implementation, the configuration of Siddhi cache, simulations for performance evaluation, the results, constraints in using Siddhi database caching, and the conclusion.
1. Example Scenario
WSO2 API Manager uses Siddhi for analytics. The Siddhi queries for APIM Analytics use many different database tables. Here, we will look at a simple Siddhi query that interacts with a table named ApimIPAccessSummary to fetch API access information upon each API request. The Siddhi query for this is as follows:
@info(name = 'Query1')
from RequestStream left outer join ApimIPAccessSummary
on RequestStream.key == ApimIPAccessSummary.applicationConsumerKey
and RequestStream.ip == ApimIPAccessSummary.ip
insert into RequestWithAccessInfoStream;
Here, every time an event arrives on RequestStream, its key and ip attributes are compared against the table attributes applicationConsumerKey and ip respectively. If a matching entry is present for the key and ip, the corresponding event is inserted into RequestWithAccessInfoStream with all the attributes from both the table and the stream. Because this is a left outer join, events without a match are still forwarded, with null values for the table attributes. For the purpose of this blog, the scenario can be thought of as follows: when someone sends a request to an API, we check whether that person has requested that API before, and if so, we send those details along with the request into RequestWithAccessInfoStream.
This scenario will be simulated to determine the performance of database caching. Here, the ApimIPAccessSummary table has four columns: username, applicationConsumerKey, ip, and lastAccessDate. Of these, applicationConsumerKey and ip together form the primary key. Further, in this experiment, the table is loaded with up to 10000 rows and is queried every time an event arrives on RequestStream. Therefore, if the stream has a rate of 1000 events per second, the table is also queried 1000 times per second.
As database access introduces latency, this, in turn, reduces the overall throughput of the system. The following sections discuss how we can improve this by using Siddhi database caching.
2. Database Caching Architecture and Implementation in Siddhi
(If you are only interested in using the Siddhi cache, you can skip this section and refer to the documentation. If you are a Siddhi developer, we encourage you to read this section.)
Siddhi core in the Siddhi repo supports many extensions, such as “siddhi-store-rdbms”, to work with different kinds of databases. As shown in the class diagram above, the database store extensions that extend the AbstractQueryableRecordTable class can use the cache without any modifications.
Since Siddhi has an InMemoryTable, which supports all types of queries while storing data in memory (RAM), it is extended to implement the CacheTable. This CacheTable is an abstract class, and from Siddhi v5.1.0 onwards it supports the following three cache policies.
- FIFO — First-In, First Out
- LRU — Least Recently Used
- LFU — Least Frequently Used
All the policies maintain data according to the maximum cache size set by the user. Suppose the user defines the maximum cache size as 10. Then 10 rows will be added to the cache table, and when the 11th event arrives in the stream, if the FIFO policy is used, the row that was added first will be removed to accommodate the 11th. Similarly, LRU removes the least recently used row and LFU removes the least frequently used row to accommodate the new one.
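The difference between FIFO and LRU eviction in the 10-row example above can be illustrated with a minimal sketch built on Java's LinkedHashMap. This is not Siddhi's CacheTable implementation; the names EvictionDemo, BoundedCache, and survivors are hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class EvictionDemo {

    /** Bounded map that drops its eldest entry once maxSize is exceeded (illustrative only). */
    static final class BoundedCache<K, V> extends LinkedHashMap<K, V> {
        private final int maxSize;

        /** lru = true orders entries by last access (LRU); false keeps insertion order (FIFO). */
        BoundedCache(int maxSize, boolean lru) {
            super(16, 0.75f, lru);
            this.maxSize = maxSize;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxSize;
        }
    }

    /** Caches rows 1..10, reads row 1 once, then adds row 11 and returns the keys left in the cache. */
    static Set<Integer> survivors(boolean lru) {
        BoundedCache<Integer, String> cache = new BoundedCache<>(10, lru);
        for (int i = 1; i <= 10; i++) {
            cache.put(i, "row-" + i);
        }
        cache.get(1);             // counts as a "use" only when lru = true
        cache.put(11, "row-11");  // exceeds maxSize, so one row is evicted
        return cache.keySet();
    }

    public static void main(String[] args) {
        System.out.println("FIFO keeps: " + survivors(false));
        System.out.println("LRU keeps:  " + survivors(true));
    }
}
```

With FIFO, row 1 is evicted even though it was just read, because it was inserted first. With LRU, the read moves row 1 to the "most recent" end, so row 2 is evicted instead.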
Cache Expiry
Database tables are not local to a single SiddhiApp; they can be shared among many SiddhiApps, and even with different processing engines and software systems. In these cases, the database table can be edited by any of the sharing parties. However, the Siddhi cache is local to a single SiddhiApp, and therefore it is important to keep the cache table in sync with the corresponding database table to ensure correct behavior of the system.
Let’s suppose a cache table is declared with LFU policy, and one particular row is being queried again and again (very frequently used). So by the policy, it is least likely to be removed from the cache and will remain there for long. However, the same row can be updated with new values on the database by a different application. Then the cache table entry and store table entry can become out of sync.
Cache expiry is handled by the CacheExpirer, as shown below.
3. Configuring Cache In Siddhi
Consider the following table declaration in Siddhi;
@Store(type="rdbms",
jdbc.url="jdbc:mysql://0.0.0.0:3307/production?useSSL=false",
username="root",
password="root",
jdbc.driver.name="com.mysql.jdbc.Driver",
@Cache(size="1000", cache.policy="LFU")
)
@PrimaryKey("applicationConsumerKey", "ip")
define table ApimIPAccessSummary (username string,
applicationConsumerKey string,
ip string,
lastAccessedDate long
);
A cache is declared by nesting the @cache annotation inside the @store annotation, as shown above. Here the maximum size of the cache is defined as 1000 and the cache policy is set to LFU. If the user does not specify a policy, FIFO is used by default.
The cache behavior depends on the relative sizes of the store table and the cache at runtime. If the cache is big enough to accommodate the entire store table, then the entire store table is loaded into the cache and none of the above cache policies are applied, even if one is defined.
Configuring Cache Expiry
Cache expiry is an optional feature that can be enabled by the user. Consider the following nested @cache annotation:
@store(....
@cache(size="1000",
cache.policy="LFU",
retention.period="5 min",
purge.interval="30 sec"
)
)
Cache expiry is enabled by specifying a retention period in the @cache annotation. Here the retention period is given as 5 min and the purge interval as 30 sec. Therefore, any row that was added to the cache more than 5 minutes ago is considered expired and is removed from the cache.
Cache expiry also depends on the relative sizes of the store table and the cache at runtime. If the cache is big enough to accommodate the entire store table, then every 5 minutes the entire cache table is deleted and reloaded from the store table. If the maximum cache size is smaller than the store table size, then every 30 seconds an asynchronous thread deletes the cache entries that are older than 5 minutes.
If the purge interval is not specified, by default, it will be set equal to the retention period.
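To make the interplay of retention period and purge interval concrete, here is a minimal sketch of the partial-cache case, where each purge pass removes only the rows older than the retention period. It is not Siddhi's CacheExpirer; the class ExpiringCache and its methods are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow. In a real system a scheduled task would call purge once per purge interval.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class ExpiringCache<K, V> {

    /** A cached value together with the time it was added to the cache. */
    private static final class Entry<V> {
        final V value;
        final long addedAtMillis;

        Entry(V value, long addedAtMillis) {
            this.value = value;
            this.addedAtMillis = addedAtMillis;
        }
    }

    private final Map<K, Entry<V>> rows = new LinkedHashMap<>();
    private final long retentionMillis;

    ExpiringCache(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    void put(K key, V value, long nowMillis) {
        rows.put(key, new Entry<>(value, nowMillis));
    }

    V get(K key) {
        Entry<V> entry = rows.get(key);
        return entry == null ? null : entry.value;
    }

    /** Removes every row added more than retentionMillis before nowMillis; returns how many were removed. */
    int purge(long nowMillis) {
        int removed = 0;
        Iterator<Entry<V>> it = rows.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().addedAtMillis > retentionMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        long retention = 5 * 60_000L;                    // retention.period = 5 min
        ExpiringCache<String, String> cache = new ExpiringCache<>(retention);
        cache.put("key1|10.0.0.1", "alice", 0L);         // added at t = 0
        cache.put("key2|10.0.0.2", "bob", 4 * 60_000L);  // added at t = 4 min
        // A purge pass at t = 5 min 30 s finds only the first row past its retention period.
        System.out.println("removed: " + cache.purge(330_000L));
    }
}
```

Note that with a 30-second purge interval an expired row can stay in the cache for up to 30 seconds past its retention period, until the next purge pass runs.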
4. Simulations for Performance Evaluation
The store table ApimIPAccessSummary is created with 10000 entries, and the following tests were run multiple times with different cache sizes to evaluate the performance. All the tests mentioned in this blog were run on a MacBook Pro with a 2.6 GHz Intel Core i7 running macOS Mojave 10.14.3, with MySQL 5.6 as the database.
For each run, the cache size and the number of times to be queried are changed, and the table is queried with and without using the cache.
For simulation-1, the queried parameters (key and ip) are picked randomly. However, in practical scenarios, rows that were queried recently are more likely to be queried again. This is where a cache shines, and it is also the fundamental assumption behind hardware caches. To simulate this condition, in simulation-2 we used a biased random selection instead of a uniformly random one: in each iteration, a previously queried row was chosen with a 50% chance and a new row with the other 50% chance. The sample test code is as follows.
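As a hedged illustration of the 50/50 biased selection described above (this is a sketch, not the authors' test code), the class below picks row indices for a table of a given size. The name BiasedRowPicker, the fixed seed, and the choice to draw the "new" row uniformly from the whole table are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class BiasedRowPicker {
    private final Random random;
    private final int tableRows;
    // Rows that were drawn as "new" so far (assumed bookkeeping for the sketch).
    private final List<Integer> previouslyQueried = new ArrayList<>();

    BiasedRowPicker(int tableRows, long seed) {
        this.tableRows = tableRows;
        this.random = new Random(seed);
    }

    /** With a 50% chance returns a previously queried row, otherwise a uniformly random row. */
    int next() {
        if (!previouslyQueried.isEmpty() && random.nextBoolean()) {
            return previouslyQueried.get(random.nextInt(previouslyQueried.size()));
        }
        int row = random.nextInt(tableRows);
        previouslyQueried.add(row);
        return row;
    }

    public static void main(String[] args) {
        BiasedRowPicker picker = new BiasedRowPicker(10000, 42L);
        for (int i = 0; i < 10; i++) {
            System.out.println("query row " + picker.next());
        }
    }
}
```

Each returned index would then be mapped to the key and ip of the corresponding table row before sending the event.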
5. Results
The tests were carried out for both simulation-1 and simulation-2, and the results obtained are given below.
Note that all the tests were repeated five times and the average time taken is reported here, to minimize the effect of run-to-run differences.
5.1 Simulation-1
The following table shows the time taken for each test in milliseconds. The values in the last column are the average latency in milliseconds over 10000 queries (events).
Chart 1 compares the time taken to query all rows from a store table with and without the cache, when the number of rows in the store is 3 and the cache size is 10. Here, as the total number of rows in the store table is less than the maximum cache size, the entire store table is cached. Therefore, all types of queries run on the cache. The gap in performance is essentially the difference in I/O latency between a store table and an in-memory table in Siddhi, i.e., hard disk access vs. RAM access.
Chart 2 presents the time taken for different cache sizes when querying a random row out of 10000 rows from the store table. As you can see, the performance improves with the cache size. This is because the larger the cache, the greater the chance that a randomly queried row is present in it. In addition, you may note that when the cache size is set to 10000 there is a dramatic improvement in performance. This is because the store table has 10000 rows in total, so when the cache size is set to the same value the whole table is loaded into the cache and all queries run from the cache. Essentially, this is the same scenario as in Chart 1.
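The relationship between cache size and hit probability for uniformly random queries can be made explicit with a simplified model: a cache holding c of the N rows is hit with probability c/N, and the expected latency is a weighted average of the cache and store latencies. The sketch below encodes this model; the class name and the latency figures in main are hypothetical placeholders, not measurements from these tests, and the model ignores the extra cost of inserting a row into the cache after a miss.

```java
final class HitRatioModel {

    /** Probability that a uniformly random row is in a cache holding cacheSize of tableRows rows. */
    static double hitRatio(int cacheSize, int tableRows) {
        if (tableRows <= 0) {
            throw new IllegalArgumentException("tableRows must be positive");
        }
        // A cache at least as large as the table holds every row, so every query is a hit.
        return Math.min(1.0, (double) cacheSize / tableRows);
    }

    /** Expected per-query latency: hits cost cacheMillis, misses cost storeMillis. */
    static double expectedLatencyMillis(double hitRatio, double cacheMillis, double storeMillis) {
        return hitRatio * cacheMillis + (1.0 - hitRatio) * storeMillis;
    }

    public static void main(String[] args) {
        double cacheMillis = 0.01;  // hypothetical in-memory lookup latency
        double storeMillis = 1.0;   // hypothetical database lookup latency
        for (int cacheSize : new int[] {10, 100, 1000, 10000}) {
            double h = hitRatio(cacheSize, 10000);
            System.out.println("cache size " + cacheSize + ": hit ratio " + h
                    + ", expected latency " + expectedLatencyMillis(h, cacheMillis, storeMillis) + " ms");
        }
    }
}
```

Under this model the latency falls linearly with cache size, and it reaches the pure in-memory latency only when the cache covers the whole table.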
5.2 Simulation-2
Chart 3 shows the results for simulation-2, where the querying approach is changed to biased random, with the FIFO and LFU cache policies. In real-world scenarios, database rows that were queried recently are more likely to be queried again than rows that have not been queried. To simulate this, we used the biased number generator explained in Section 4. You may notice that performance improves as the FIFO cache size increases. You may also notice that changing the cache policy from FIFO to LFU makes little difference in performance; this is shown by the almost flat line at the end of Chart 3. This is because we give a flat 50% chance to one of the previously queried rows, which may or may not be in the cache.
To validate the behavior of LRU, we divided the array storing previously queried rows into 3 sections, and set up the partitions in such a way that the most recently queried rows are more likely to be queried again. Please refer to the following code used to generate the row to be queried.
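The three-section idea can be sketched as follows. This is one possible reading and not the authors' listing: the query log is split into oldest, middle, and newest thirds, and the section weights of 60%, 30%, and 10% (newest to oldest) are assumed purely for illustration, as are the names RecencyBiasedPicker and pickFromLog.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class RecencyBiasedPicker {

    /**
     * Picks a row from a non-empty query log ordered oldest to newest.
     * The newest third is chosen with 60% chance, the middle third with 30%,
     * and the oldest third with 10% (assumed weights).
     */
    static int pickFromLog(List<Integer> log, Random rnd) {
        int n = log.size();
        if (n < 3) {
            return log.get(rnd.nextInt(n)); // too short to split into sections
        }
        int third = n / 3;
        double u = rnd.nextDouble();
        int from;
        int to;
        if (u < 0.6) {            // newest section
            from = n - third;
            to = n;
        } else if (u < 0.9) {     // middle section
            from = n - 2 * third;
            to = n - third;
        } else {                  // oldest section (absorbs any remainder)
            from = 0;
            to = n - 2 * third;
        }
        return log.get(from + rnd.nextInt(to - from));
    }

    public static void main(String[] args) {
        Random rnd = new Random(7L);
        List<Integer> log = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            // 50% repeat a logged row with recency bias, 50% query a uniformly random row.
            int row = (!log.isEmpty() && rnd.nextBoolean())
                    ? pickFromLog(log, rnd)
                    : rnd.nextInt(10000);
            log.add(row); // every query is appended, so repeated rows become "recent" again
        }
        System.out.println("last queried row: " + log.get(log.size() - 1));
    }
}
```

Appending every query to the log, including repeats, keeps the log in recency order, which is the access pattern an LRU policy is designed to exploit.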
Since the probability of being queried again decreases over time with this algorithm, the LRU performs better than FIFO as shown below.
To illustrate the use case for LFU caching, the biased random indices were generated in such a way that the indices that were used most have a higher chance of being used again. This matches the practical scenario where the users who access an API most are more likely to access it again. The following code segment is used to generate the biased random indices.
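A simple way to obtain such frequency-biased indices, shown here as an assumed sketch rather than the authors' code, is to keep a log of every index returned so far and pick uniformly from that log: an index that appears k times in the log is then k times as likely to be picked as one that appears once. The class name FrequencyBiasedPicker and the 50% repeat probability are assumptions for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class FrequencyBiasedPicker {
    private final Random random;
    private final int tableRows;
    // Every index returned so far, repeats included.
    private final List<Integer> queryLog = new ArrayList<>();

    FrequencyBiasedPicker(int tableRows, long seed) {
        this.tableRows = tableRows;
        this.random = new Random(seed);
    }

    /** With a 50% chance repeats a logged index (frequency-weighted), otherwise picks a uniform one. */
    int next() {
        int row;
        if (!queryLog.isEmpty() && random.nextBoolean()) {
            // Uniform over the log means proportional to how often each index was used.
            row = queryLog.get(random.nextInt(queryLog.size()));
        } else {
            row = random.nextInt(tableRows);
        }
        queryLog.add(row);
        return row;
    }

    public static void main(String[] args) {
        FrequencyBiasedPicker picker = new FrequencyBiasedPicker(10000, 42L);
        for (int i = 0; i < 10; i++) {
            System.out.println("query row " + picker.next());
        }
    }
}
```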
The test produced the following results when biased random indices are used.
Here, as expected, the LFU cache performs best among all the cache options, since it is the most likely to keep the most frequently used rows in memory.
6. Constraints in Database Caching
Database caching is a difficult problem to solve, and a cache cannot be used for every use case. Database caches are quite different from the hardware caches that computers use to reduce RAM access, because database queries are more diverse and can return multiple results. For example, take a range query such as select * from Table on key > 50, where the system needs to return all the database entries for which the key is greater than 50. This query cannot run on a database cache, as it is hard to determine whether all the data we are looking for is already in the cache. Let’s say the database has 100 entries for which the key is greater than 50 but only 5 of them are available in the cache. If we run the query on the cache, we will send out only 5 entries. For such range queries, we cannot establish a cache hit or a cache miss with certainty.
Therefore, in Siddhi, only queries that use the == comparator (possibly several of them combined with and) are considered for execution on the cache. But even for these queries, it can be difficult to determine whether we need to check the cache or the store table. Consider the query select * from Table on key == 50, and let’s say the database table has 10 entries for which the key is 50, of which only 5 are cached. If the system runs the query on the cache, it will produce wrong results. Therefore, a query must use all the primary keys of the table with the == operator in its on clause to be eligible to run on the cache.
To summarize, a query should satisfy the following conditions to be suitable to run on a cache:
- It uses only the == comparator, or multiple == comparators combined by and.
- It uses all the primary keys of the table to query the data.
It can be verified that Query1 in Section 1 satisfies both of the conditions given above.
However, these conditions can be relaxed when all the rows of the database table are cached. Let’s say the database table has only 100 rows and all 100 rows are available in the cache. Then we can confidently execute range queries, and queries that don’t use the primary keys, from the cache. At runtime, Siddhi dynamically changes how it uses its cache based on the relative sizes of the cache table and the store table. Therefore, users can configure the cache size to suit their use case and optimize performance. Configuring the cache in Siddhi is discussed in detail in Section 3.
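The eligibility rules in this section can be summarized as a small decision function. The sketch below is a simplified model and not Siddhi's internal logic: it assumes the on clause has already been broken into comparisons that are all joined by and, and the names CacheEligibility, Comparison, and canRunOnCache are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CacheEligibility {

    /** One comparison against a table attribute, e.g. ip == RequestStream.ip (hypothetical model). */
    static final class Comparison {
        final String tableAttribute;
        final String operator;

        Comparison(String tableAttribute, String operator) {
            this.tableAttribute = tableAttribute;
            this.operator = operator;
        }
    }

    /**
     * @param andTerms         comparisons of the on clause, all combined by "and"
     * @param primaryKeys      primary key attributes of the table
     * @param wholeTableCached true when every row of the store table is in the cache
     */
    static boolean canRunOnCache(List<Comparison> andTerms, Set<String> primaryKeys, boolean wholeTableCached) {
        if (wholeTableCached) {
            return true; // conditions are relaxed: any query can be answered from the cache
        }
        Set<String> matchedAttributes = new HashSet<>();
        for (Comparison term : andTerms) {
            if (!"==".equals(term.operator)) {
                return false; // range and other comparisons cannot be resolved on a partial cache
            }
            matchedAttributes.add(term.tableAttribute);
        }
        return matchedAttributes.containsAll(primaryKeys); // every primary key must be pinned by ==
    }

    public static void main(String[] args) {
        Set<String> keys = new HashSet<>(Arrays.asList("applicationConsumerKey", "ip"));
        List<Comparison> query1 = Arrays.asList(
                new Comparison("applicationConsumerKey", "=="),
                new Comparison("ip", "=="));
        System.out.println("Query1 eligible: " + canRunOnCache(query1, keys, false));
    }
}
```

For Query1, both primary key attributes are compared with ==, so the check passes even when only part of the table is cached.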
7. Conclusion
Siddhi supports FIFO, LFU, and LRU cache policies to optimize data access from database store tables. It also supports cache expiry, which periodically invalidates the cache and resyncs the data from the store table so that it stays consistent with other systems that update the same database table. Siddhi also uses its cache dynamically: when the whole table is cached, it uses the cache for all types of queries, and when the cache is smaller than the store table, it uses the cache only when the on condition of the query consists of == comparisons, combined with and, that cover all the primary keys of the table. Therefore, to leverage the power of the cache, make sure the above conditions are met. Further, for tables that do not grow, if there is enough memory to hold all the data, it is recommended to set the cache size equal to or slightly greater than the store table size, so that all the data is loaded into memory and optimal performance is achieved.
We believe this improvement will be highly useful for users to greatly improve the performance of their systems. Please share your feedback with the team via Twitter or dev-list.