Deserializing Millions Of Messages Per Second Per Core

Step-by-step journey to the peak of performance

Sergei Shubin
Agoda Engineering & Design
11 min readJul 2, 2020

--

Have you ever wondered how fast you can deserialize binary messages? In the Agoda Data Platform we definitely have, given that we maintain the infrastructure to support over 300 micro-service clusters scattered across thousands virtual machines.

One of the platforms we have is an in-house time-series storage, which focuses on long term storage with tiered granularity and seamless cross-data center integration. The challenge — process 10 million measurements every single second, and today I want to share our path of high-performance deserialization.

Basics

What are we dealing with? Our measurement format will be defined as follows:

Example of measurement

Our goal is to deserialize this message as fast as possible. It’s important to remember that measurements are to be matched with each other by metric/tags/logtime, aggregated, stored at long term storage and queried, so this will affect how we process them.

These are the tools we are using:

  • The Scala language, due to our deep knowledge of the Java Virtual Machine (JVM) and rich vocabulary of the language itself.
  • Apache Avro, as our serialization system.
  • Java Microbenchmark Harness (JMH), as means to collect performance metrics, namely deserializations/sec.
  • Async-profiler, to get insights on what takes time and what our next step should be.

Benchmarks are run in 4 threads, we will need that later. Benchmark code is available at GitHub repository.

The baseline

We need to start somewhere, so we will start with the most straightforward solution: the Avro library. To check performance of the library, however, we need to setup JMH with data to be deserialized, benchmarks to be measured and glue to connect everything.

JMH comes with a notion of State and Benchmark classes, which provides fine-grained control over state lifecycle and dependencies. To begin with, we need to have an Avro payload to deserialize, so we will use this example throughout our entire optimization process:

We use one single measurement to have as little deviation in deserializations count as possible, since we want to compare implementations only.

We also need a state for the deserializer:

and the benchmark itself (which must be written in Java due to JMH limitations):

Running the benchmark we get the following result:

Pretty good! The library can deserialize around 1M measurements every second per core (given that our benchmark is set to run on 4 threads). The question is…

Can we do better?

Let’s have a peek into what is happening under the hood, for which we simply call the deserialize method in a loop and run async-profiler to get a flamegraph:

Flamegraph of library Avro deserialization

One thing instantly catches an eye, there are plenty of Generic* class calls. While not giving us much insight on what they are and reasons why they are called, they do give us a hint — it seems that the library might have some overhead because it is built in a generic way to handle all possible cases the Avro format provides (like schema evolution, etc.). The good thing about our use case is that we have full control over the data model, and we know exactly what features we need. Turns out the Avro binary protocol is not complicated and primitives deserializer is easy to implement:

The code is simplified for purposes of this article and doesn’t do all data consistency checks

Gluing everything together, adding a bit of polish and voila, the deserializer:

Are there any improvements over the base version?

While it does not increase performance enough to justify an in-house solution, it allows us to apply more specific and aggressive optimizations (as we are not bound to library data types anymore).

No more strings

Let’s take a look at another flamegraph to see what our current bottleneck is:

Flamegraph of manual Avro deserialize

That’s… curious. Why does string initialization take so much time, and can we optimize it?

Let’s count created strings per measurement:

  • Metric name — one per measurement.
  • Tag keys — varies on data, and in the benchmark it’s set to 5.
  • Tag values — same amount as tag keys, so 5 more.

This totals 11 strings parsed per measurement! Can we get rid of it?

What we can do is to use the nature of our data: it’s time series, and that means there are many measurements with different logtime and value but the same metric name and tags. What if we keep already deserialized strings, and map raw UTF-8 bytes to them? Then we can extract a raw string, do a lookup if we have deserialized it already, and reuse existing String object if so.

However, there is a catch: as we are working with time-series, we need to group measurements together by metric and tags and process data down the line. Therefore, interning strings (i.e. keeping Map[String, String] as tags and String as metric) will harm deserialization performance because:

  • We need to do comparisons for metric and tags, and we need to be extra careful and compare strings by reference, not by equals() (as reference equality is much faster than comparing string bytes).
  • It’s not optimized for serialization (e.g. long-term storage or disaster recovery), as same string will be serialized multiple times.

What we can do is to map string bytes into numerical IDs and numerical IDs back to String objects. It will allow us to use optimized java.util.Arrays methods for comparison / calculating hash codes and it provides convenient representation for serialization.

Important note — we can’t use any bidirectional map implementation. The reason is that we need to:

  1. Map string UTF-8 bytes to numeric id (Array[Byte] → Int) on deserialization stage.
  2. Map numeric id to string object (Int → String) to do filtering on query stage (which includes tag patterns like server → sgp*, which means that all tag values of server tag starting with sgp must be matched).

Neither BiMap[Array[Byte], Int] nor BiMap[String, Int] suffice both use cases.

Our interface looks like this:

As we have a full control over method signatures now, there is no need to chip subarray for each string from the Avro message to do interning. While JVMs are extremely efficient at allocating and cleaning short-lived objects, it is still an unwanted performance overhead of touching memory/GC. The first implementation won’t use this opportunity for the sake of simplicity, but we will take the full advantage of it later.

What would be a simple yet efficient implementation? Int to String mapping could be done by using an array list:

And the easiest way to do direct mapping is Map[Array[Byte], Int], where keys are the bytes of UTF-8 strings. There is a catch however: equals() and hashCode() are not defined for Array, so a Utf8 wrapper is introduced:

Having this interner in place, we can replace the deserializer’s readString() to intern bytes instead of creating String objects:

Let’s see how this implementation performs:

Now this is something to consider, almost 2.5x speedup! While this is impressive, dropping the standard library in favor of in-house solutions should not be the default choice. Yes, it’s fast, but it comes at a cost of limited features (we need to write a deserializer for each schema, we don’t support schema evolutions, etc.) and we need to support the code ourselves. It is justified in our case though, as we spend ≈50% of time deserializing in our pipeline, with all the optimizations.

Refining the data model

Another peek at a flame graph, now using interning:

Flamegraph of interning Avro deserialize

Most of the time is spent in calculating hash code. We cannot avoid it, as we need it in order to do hash map lookup, but these calls are significantly cheaper than parsing UTF-8 on every String creation.

What stands out in the flamegraph is that 21% time is spent in HashMap.put. The reason for that is HashMap modeled tags, and the question is, can it be done better? Well, it depends on the use case. It is possible to replace the map with an array of tag key/value pairs, but we need to keep in mind that:

  • The model is sufficient for our needs down the pipeline (otherwise we might get 20% performance boost in deserialize and then spend 3x the time in later processing).
  • Tags must be sorted, otherwise the same tags with different ordering will lead to different arrays, meaning they will be treated as different series.

Luckily, this model fits our use case, so we create a new measurement model class and change the readTags() method:

Regarding the sorter, insertion sort is picked for this example. Why an O(n²) algorithm? The reason is simple: our tags map is small (in fact it is limited to 128 for our system), which means that we care about real run time on the input rather than asymptomatic complexity. Sorting algorithms of this class (like Shellsort) outperform O(n*log(n)) versions on small inputs (and often back library sort functions).

Let’s measure the performance of this version:

Indeed, removing HashMap overhead improves the performance.

Embracing concurrency

There is one more thing before we reach our performance nirvana. String interners are local per thread and ids are generated independently, which results in different IDs for the same strings across threads. Why it is an issue? Let’s look at a brief schematic of the application architecture:

Several threads within JVM write to the same database

We process measurements in separate threads to take advantage of multi-core processors, but at one point we need to transfer data to long term storage. With our approach thus far, same strings from different threads will have different IDs, which requires either expensive synchronization of string IDs at the write stage, or a storage query engine which can address inconsistency. Can we share a string interner across threads?

The simplest solution would be to synchronize it with synchronized {} blocks, but Amdahl’s Law hits us hard:

Here S(latency) is theoretical speedup we can achieve, p is the portion of synchronized time, and s is speedup of the parallel portion of computation. With more than 50% of time in interning we cannot speed up the whole pipeline by more than 1/(0.5+ϵ)≈2 times. That’s sad given our workload and the fact that the whole model allows us to process data independently.

What we want is a form of lock-free data structure, so that we have as little contention as possible. Can we use existing libraries for that? We could find a performant bidirectional map implementation for our task, but we get the same issue of Array[Byte]IntString mapping. Even worse, anything lock-based will jeopardize performance by imposing overhead on every access. We are talking tens if not hundreds of millions of intern calls per second given that every measurement triggers several string interns.

What we can do is to use specifics of our data:

  • “Get existing string id” case heavily dominates “insert new string” case (as series don’t change that often).
  • String map size can be fixed because:
    a) Series are mostly constant over time, meaning mostly constant amount of strings.
    b) Mapping itself does not take this much space, so we can allocate space for far more strings than we need for just a few megabytes.
  • There are no deletes, updates, or resizes — the only operations are “insert if empty or return” and “get”.
  • Requirements can be loosened so that interning is successful in almost all cases (it is acceptable to miss a measurement or two per hour).

Given that, we implemented a concurrent string interner by using an open addressing hash map with trimmed functionality and added concurrency support. The trick is that a value of hash map key is never changed once set, so we need to synchronize only “insert if empty” and “get” operations, which is done via two-stage value set with compare-and-swap (CAS). There is also a protection in case we have a dodgy race condition or data corruption somewhere — a hard limit over cycle count in “get” loop. It’s better to lose measurements than to be stuck in deserializing forever, and it doesn’t happen very often (in fact we have yet to see this happen in our production environment).

This is how interning looks (full implementation is available in repository):

Let’s check how well it performs with 4 concurrent threads doing deserialization with one shared string interner:

It’s not only thread safe, it’s even more performant due to more memory and cache-friendly usage patterns (which is a whole other topic for another time).

Conclusion

Final result — almost 4x speedup of Avro library while having cross-thread data synchronization! How did we do it?

Measure -> Hypothesize -> Implement -> Measure

At every step we measure performance and collect performance profiles to understand what our current bottleneck is, make a hypothesis of why it happens and what the solution is, implement the optimization and measure again. It’s so tempting to just optimize something, yet it rarely yields any positive results.

The data model

The data model is driven by intrinsic properties of our use case — timeseries strings don’t change over time (so we intern and have no resizes for concurrent intern map), we don’t need map of tag keys to tag values, the sorting algorithm is perfect for small tag count, etc. The data model was optimized to remove all unnecessary operations as much as possible while covering all our use cases.

In the end, performance optimization is a never ending journey, and no performance peak is final. It is important to understand the requirements and when it’s time to focus on other parts of the application.

If you want to play with the code feel free to check out GitHub repository.

Acknowledgement

Kudos to Ches Martin ♞ / Akshesh Doshi for text review and Dennis Hunziker for code review.

--

--