Axon Tracking Event Processor Optimization

Fatih Altuntaş
Published in Trendyol Tech · Jun 17, 2022

Written by Fatih Altuntaş & Ömer Buğra Selvi

Axon is an open-source CQRS and event sourcing framework. As the Wallet team at Trendyol, we have been using it in production for two years, with over ten million aggregates and over one billion events.

In event sourcing, events are kept in an event store. Axon supports various event store implementations; we use a PostgreSQL database as the event store with the JpaEventStorageEngine implementation.

Since we use the CQRS architecture, we need to pass events from the command side to the query side. To do this, we have to process events, and Axon provides event processor components to deal with the technical aspects of this requirement. There are two types of event processors: Subscribing and Tracking.

To read our events from the event store, we use the Tracking Event Processor, which uses a token to keep track of the events that have already been processed. A token represents the position of an event in the event store.

We will use TEP and Tracking Event Processor interchangeably in the rest of this story.

With segmentation, TEP can use multiple threads to process an event stream. The number of segments specifies how many threads can run in parallel. For instance, if your segment count is 6, six threads consume your event stream concurrently.
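As an illustration, such a processor can be registered through Axon's Java configuration API. This is a sketch assuming Axon Framework 4.x; the processor name "demopublisher" is taken from the log lines later in this story, and the segment count matches the example above.

```java
import org.axonframework.config.Configuration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;

// Sketch: register a TEP named "demopublisher" that reads from the event
// store with up to 6 parallel threads over 6 segments.
public class ProcessorConfig {

    public void configure(EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessor(
                "demopublisher",
                Configuration::eventStore,
                conf -> TrackingEventProcessorConfiguration
                        .forParallelProcessing(6)      // up to 6 worker threads
                        .andInitialSegmentsCount(6));  // split the token into 6 segments
    }
}
```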

After reading events via TEP, we publish them to Kafka to build the query-side projection. Keeping the write and read models in sync is a very important concern for us; any latency here may cause inconsistency and customer dissatisfaction.

Symptoms

We encountered a problem while load testing our system. In some cases, our write model started to publish events very slowly, and sometimes it stopped altogether. We also saw query timeout logs during the load test.

Firstly, we suspected TEP, so we started investigating it. To reproduce the problem, we created a local test environment, generated around 10 million events, and enabled Hibernate statistics to monitor our database queries. Once the environment was set up, we created various tracking tokens with different indexes and gaps to compare their performance.

Problem and Investigation

After monitoring TEP’s query statistics, we realized that some queries return results very slowly, as shown below:

[demopublisher]-0] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex > :token OR e.globalIndex IN (:gaps_0, :gaps_1, :gaps_2, :gaps_3, :gaps_4) ORDER BY e.globalIndex ASC, time: 29422ms, rows: 100
[demopublisher]-2] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex > :token OR e.globalIndex IN (:gaps_0, :gaps_1, :gaps_2, :gaps_3, :gaps_4) ORDER BY e.globalIndex ASC, time: 29439ms, rows: 100
[demopublisher]-3] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex > :token OR e.globalIndex IN (:gaps_0, :gaps_1, :gaps_2, :gaps_3, :gaps_4) ORDER BY e.globalIndex ASC, time: 29439ms, rows: 100
[demopublisher]-1] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex > :token OR e.globalIndex IN (:gaps_0, :gaps_1, :gaps_2, :gaps_3, :gaps_4) ORDER BY e.globalIndex ASC, time: 29440ms, rows: 100

We created about 10 million events in the local database; as you can see, the queries take around 30 seconds to run, which is unacceptable.

Our tracking token implementation is GapAwareTrackingToken. Unlike the simpler GlobalSequenceTrackingToken, it holds a gap list to avoid missing any events. For more detail, you can read the Javadoc.
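To make the gap idea concrete, here is a minimal standalone illustration (not Axon's actual class): the token's index is the highest global index seen so far, and any lower indices that were skipped over are recorded as gaps until the matching events show up.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Minimal illustration of gap-aware token semantics (not Axon's real class).
// "index" is the highest global index observed; "gaps" are lower indices
// that have not been observed yet.
public class GapAwareTokenDemo {

    final long index;
    final SortedSet<Long> gaps;

    GapAwareTokenDemo(long index, SortedSet<Long> gaps) {
        this.index = index;
        this.gaps = gaps;
    }

    // Advance the token to a newly read global index. Indices we jumped over
    // become gaps; if the new index fills an existing gap, it is removed.
    GapAwareTokenDemo advanceTo(long newIndex) {
        SortedSet<Long> newGaps = new TreeSet<>(gaps);
        if (newIndex > index) {
            for (long i = index + 1; i < newIndex; i++) {
                newGaps.add(i);          // sequence numbers we skipped over
            }
        } else {
            newGaps.remove(newIndex);    // a previously missing event arrived
        }
        return new GapAwareTokenDemo(Math.max(index, newIndex), newGaps);
    }
}
```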

We configured TEP for testing with the following settings:

  • the segment count is four
  • the current token index is 10101235
  • there are five gaps
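For reference, a token equivalent to this test setup can be built with Axon's factory method. The index and gap count below match our configuration; the five gap values themselves are illustrative, since the real gaps depend on the data.

```java
import java.util.List;
import org.axonframework.eventhandling.GapAwareTrackingToken;

// Sketch: a token at index 10101235 with five (illustrative) gap indices.
public class TokenSetup {

    static GapAwareTrackingToken testToken() {
        return GapAwareTrackingToken.newInstance(
                10101235L,
                List.of(10101210L, 10101215L, 10101220L, 10101225L, 10101230L));
    }
}
```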

Then we decided to dive deep into Axon’s JpaEventStorageEngine class.

Two different DB queries are run in its fetchTrackedEvents method. The second query is executed when there is at least one gap in the token; otherwise, the first one is used.
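In simplified JPQL (matching the HQL visible in the statistics logs above, with the selected columns elided just as in the logs), the two queries look like this:

```java
// Simplified JPQL for the two queries run by fetchTrackedEvents.
public class TrackedEventQueries {

    // Query 1: used when the token has no gaps - a plain range condition
    // on globalIndex, which the primary-key index handles well.
    static final String WITHOUT_GAPS =
            "SELECT e FROM DomainEventEntry e "
          + "WHERE e.globalIndex > :token "
          + "ORDER BY e.globalIndex ASC";

    // Query 2: used when the token has at least one gap. The OR between the
    // range condition and the IN-list is what turns out to be slow.
    static final String WITH_GAPS =
            "SELECT e FROM DomainEventEntry e "
          + "WHERE e.globalIndex > :token OR e.globalIndex IN (:gaps) "
          + "ORDER BY e.globalIndex ASC";
}
```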

We obtained the execution plans of both queries by running PostgreSQL’s EXPLAIN ANALYZE against them.
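A sketch of how such a plan can be captured from code, assuming PostgreSQL and plain JDBC; the token index matches our test setup, and the five gap values are illustrative. The BUFFERS option exposes the buffer usage discussed below, and the I/O timings require PostgreSQL's track_io_timing setting to be enabled.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch: print the execution plan of the gap-aware query via JDBC.
public class ExplainPlan {

    static void printPlan(Connection connection) throws SQLException {
        String sql = "EXPLAIN (ANALYZE, BUFFERS) "
                + "SELECT * FROM domain_event_entry "
                + "WHERE global_index > 10101235 "
                + "   OR global_index IN (10101210, 10101215, 10101220, 10101225, 10101230) "
                + "ORDER BY global_index ASC";
        try (Statement st = connection.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                System.out.println(rs.getString(1)); // one plan line per row
            }
        }
    }
}
```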

The first query looks fine according to its execution plan. The second query, however, is problematic. Its execution plan showed the following:

  • The execution time is unacceptably high for us; it should be around 10ms.
  • Not only is the execution time long, but memory and disk usage are also very high, as the buffers figures show: the query touches around ten million buffers.
  • According to the I/O timings, it takes about 21 seconds to fetch the necessary data from disk.
  • The number of “Rows Removed by Filter” is also far too large, meaning the database iterates over that many rows and then discards them with the filter. In other words, the query scans all rows instead of using the primary-key index on the global_index column.

Solution

We focused on the second query, which is the slowest one. When we removed the ORDER BY clause, we gained some improvement; the results were better than before, but still not acceptable. Besides, we cannot remove that clause anyway, because we need to process events in order.

As a final solution, we decided to separate the problematic query into the following two simple queries:
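In simplified JPQL, the split looks like this: each query now has a single, index-friendly condition on globalIndex, so both can use the primary-key index.

```java
// Simplified JPQL for the split of the slow gap-aware query.
public class SplitQueries {

    // Query A: fetch only the gap rows, with a plain IN-list.
    static final String GAPS_ONLY =
            "SELECT e FROM DomainEventEntry e "
          + "WHERE e.globalIndex IN (:gaps) "
          + "ORDER BY e.globalIndex ASC";

    // Query B: fetch the rows after the token index, with a plain range scan.
    static final String AFTER_TOKEN =
            "SELECT e FROM DomainEventEntry e "
          + "WHERE e.globalIndex > :token "
          + "ORDER BY e.globalIndex ASC";
}
```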

Although running two separate queries seems costlier, it is far cheaper than running the original slow one.

Note: All metrics were taken on our local machine. Results may vary from machine to machine and depend on many factors, such as DB cache and configuration, but the relative performance gain should be similar.

Implementation

After comparing the results, we decided to separate the problematic query. We extended JpaEventStorageEngine and overrode the fetchTrackedEvents method as follows:
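The following is a sketch of the approach rather than our exact production code: the class and helper names are ours, and the event-entry mapping and token bookkeeping that the original method performs are only stubbed out here (see the repo for the full version).

```java
import java.util.ArrayList;
import java.util.List;
import javax.persistence.EntityManager;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.eventhandling.GapAwareTrackingToken;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.jpa.DomainEventEntry;
import org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine;

// Sketch: run the gap query and the range query separately instead of the
// combined OR query, then map the results as the original method does.
public class CustomJpaEventStorageEngine extends JpaEventStorageEngine {

    // Duplicated dependency: the parent keeps its EntityManagerProvider private.
    private final EntityManagerProvider entityManagerProvider;

    public CustomJpaEventStorageEngine(Builder builder, EntityManagerProvider emp) {
        super(builder);
        this.entityManagerProvider = emp;
    }

    @Override
    protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken,
                                                                     int batchSize) {
        GapAwareTrackingToken token = (GapAwareTrackingToken) lastToken;
        EntityManager em = entityManagerProvider.getEntityManager();
        List<DomainEventEntry> entries = new ArrayList<>();

        if (token != null && !token.getGaps().isEmpty()) {
            // Query A: fetch the gap rows with a plain IN-list.
            entries.addAll(em.createQuery(
                            "SELECT e FROM DomainEventEntry e "
                          + "WHERE e.globalIndex IN (:gaps) ORDER BY e.globalIndex ASC",
                            DomainEventEntry.class)
                    .setParameter("gaps", token.getGaps())
                    .getResultList());
        }
        // Query B: fetch the rows after the token index with a plain range scan.
        entries.addAll(em.createQuery(
                        "SELECT e FROM DomainEventEntry e "
                      + "WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC",
                        DomainEventEntry.class)
                .setParameter("token", token == null ? -1L : token.getIndex())
                .setMaxResults(batchSize)
                .getResultList());

        return toTrackedEventData(entries, token);
    }

    private List<? extends TrackedEventData<?>> toTrackedEventData(List<DomainEventEntry> entries,
                                                                   GapAwareTrackingToken token) {
        // Hypothetical placeholder for the mapping and token bookkeeping we had
        // to duplicate from the original private implementation.
        throw new UnsupportedOperationException("see the full version in the repo");
    }
}
```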

And finally, to replace the existing bean, we defined our custom bean:
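A sketch of the bean definition, assuming a Spring Boot setup with Axon's auto-configuration: exposing our engine as the EventStorageEngine bean makes it replace the default JpaEventStorageEngine. CustomJpaEventStorageEngine stands for our subclass, and the injected dependencies shown are the typical ones, not necessarily our exact list.

```java
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine;
import org.axonframework.serialization.Serializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch: expose the custom engine so it replaces the auto-configured one.
@Configuration
public class EventStoreConfig {

    @Bean
    public EventStorageEngine eventStorageEngine(Serializer eventSerializer,
                                                 EntityManagerProvider entityManagerProvider,
                                                 TransactionManager transactionManager) {
        return new CustomJpaEventStorageEngine(
                JpaEventStorageEngine.builder()
                        .eventSerializer(eventSerializer)
                        .snapshotSerializer(eventSerializer)
                        .entityManagerProvider(entityManagerProvider)
                        .transactionManager(transactionManager),
                entityManagerProvider);
    }
}
```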

When we deployed and tested our solution, we obtained the following statistics:

[demopublisher]-2] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex IN (:gaps_0, :gaps_1, :gaps_2, :gaps_3, :gaps_4) ORDER BY e.globalIndex ASC, time: 16ms, rows: 5
[demopublisher]-2] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC, time: 10ms, rows: 95
[demopublisher]-3] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex IN (:gaps_0, :gaps_1, :gaps_2, :gaps_3, :gaps_4) ORDER BY e.globalIndex ASC, time: 10ms, rows: 5
[demopublisher]-3] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC, time: 10ms, rows: 95
[demopublisher]-1] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex IN (:gaps_0, :gaps_1, :gaps_2, :gaps_3, :gaps_4) ORDER BY e.globalIndex ASC, time: 16ms, rows: 5
[demopublisher]-1] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC, time: 10ms, rows: 95
[demopublisher]-0] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex IN (:gaps_0, :gaps_1, :gaps_2, :gaps_3, :gaps_4) ORDER BY e.globalIndex ASC, time: 4ms, rows: 5
[demopublisher]-0] HHH000117: HQL: SELECT ... FROM DomainEventEntry e WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC, time: 9ms, rows: 95

Conclusion

Overriding the fetchTrackedEvents method was not easy. We had to duplicate a few private fields and methods from the original implementation; we think the class is not implemented with extensibility in mind. In addition, JpaEventStorageEngine’s builder fields aren’t accessible, so we had to duplicate some of its parameters as well.

As you can see in the implementation, we only changed the queries in that method; the remaining parts are the same. If JpaEventStorageEngine had provided an abstract method responsible for fetching events from the entity manager, we could have overridden it easily. We plan to open a pull request to Axon to make this easier to extend.

Update: We opened a pull request to Axon, and it has been merged. It will be released with Axon 4.6.0.

If you would like to check out the code, the repo is here.
