Understanding the LMAX Disruptor


LMAX Disruptor is an open-source Java library written by LMAX Exchange, a financial trading platform company. It is an elegant and, especially, performant solution for inter-thread messaging.

In this post, we will first describe the problems related to sharing memory across threads and to traditional queuing systems. Then, we will try to understand what is so special about the LMAX Disruptor and how to use it.

  • The LMAX Disruptor is faster than Java's ArrayBlockingQueue and LinkedBlockingQueue.
  • Mechanical sympathy (a good understanding of the underlying hardware) should make you a better developer.
  • Sharing memory between threads is prone to problems, so you need to do it carefully.
  • CPU caches are faster than main memory, but a poor understanding of how they work (cache lines, etc.) can ruin your performance.

The Illusion of Shared Memory

Let’s start with a simple use case. We need to increment a counter from 0 to MAX using a loop:
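A minimal sketch of such a loop could look like this (the class and method names are illustrative assumptions, not taken from the original listing):

```java
final class SingleThreadedCounter {
    // Counts from 0 to max with a plain loop on a single thread.
    static long count(long max) {
        long counter = 0;
        for (long i = 0; i < max; i++) {
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println("counter=" + count(1_000_000));
    }
}
```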

Because MAX can be big and we can run this program on a multi-core processor, we might want to split this job across two separate threads for performance reasons. Something like this:
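Here is a hedged sketch of that naive split, assuming each thread performs half of the increments on a shared static field. Only the sharedCounter name comes from the text; everything else is illustrative:

```java
final class RacyCounter {
    // Deliberately unprotected shared state: this is the bug being illustrated.
    static long sharedCounter = 0;

    static long count(long max) {
        sharedCounter = 0;
        Runnable half = () -> {
            for (long i = 0; i < max / 2; i++) {
                sharedCounter++; // read-modify-write, not atomic
            }
        };
        Thread t1 = new Thread(half);
        Thread t2 = new Thread(half);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sharedCounter;
    }

    public static void main(String[] args) {
        System.out.println("counter=" + count(1_000_000));
    }
}
```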

There are two main problems with this implementation though.

First, let’s run it multiple times with MAX equal to 1 million. Obviously, we expect it to print counter=1000000, right?

counter=522388
counter=733903
counter=532331

The execution is not deterministic due to a race condition on the sharedCounter variable.

A race condition arises when a result depends on the sequence or timing of processes/threads. Here, it occurs because sharedCounter is modified simultaneously by the two threads without any protection.


Secondly, there is a performance problem. Even if MAX is set to a value big enough to make the cost of managing the two threads negligible, the execution is still more than 3 times slower than the single-threaded version. What is the reason?

As both threads will be CPU-intensive, the OS will most likely schedule them on two different CPU cores. Moreover, we might believe that both threads running on two different cores can share memory freely. Yet, we have forgotten the concept of CPU caches:

Memory layers

Instead of having to query the DRAM every single time it accesses a memory address, a CPU caches data in its internal caches: local L1 and L2 caches, and a remote, shared L3 cache.

Let’s look at some figures for a Core i7 Xeon 5500:

|    Memory    |        Latency        |
|--------------|-----------------------|
| L1 CACHE hit | ~4 cycles, ~1.2 ns |
| L2 CACHE hit | ~10 cycles, ~3.0 ns |
| L3 CACHE hit | ~40 cycles, ~12.0 ns |
| DRAM | ~60.0 ns |

When a processor wants to access a memory address, it first checks L1. If the address is not there, this produces a cache miss and the processor then checks L2. The same goes for L2 and L3, and the processor can end up going to the DRAM, which is about 50 times slower than L1.

In our example, because sharedCounter is updated by two CPU cores at the same time, the variable bounces between the two local L1 caches, which drastically slows down the execution.


Lastly, there is another concept which is important to understand about CPU caches: they are organized into cache lines.

A cache line is a block of contiguous bytes whose size is a power of 2, typically 64 bytes. The cache is managed like an unchained hash map, with every address in memory being assigned to a given cache line.

Let’s take an example with two variables x and y accessed by two threads scheduled on two different cores. The first thread modifies x and the second modifies y a few nanoseconds later.

If both variables are part of the same cache line (hashing their address gives the same cache line), the second core will see its whole line marked as invalid even though the program was only interested in y and not x. As a result, the second core will fetch a more recent copy of this line somewhere else (in L2, L3 or DRAM).

This problem is known as false sharing and results in a significant performance penalty (the CPU still guarantees that the execution is deterministic, though).
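To make the "same cache line" condition concrete, here is a simplified model, assuming 64-byte lines and made-up addresses (Java does not expose real object addresses, so the values below are purely hypothetical):

```java
final class CacheLines {
    static final int CACHE_LINE_SIZE = 64; // bytes; typical, but an assumption

    // Index of the 64-byte block that contains the given byte address.
    static long lineOf(long address) {
        return address / CACHE_LINE_SIZE;
    }

    // Two addresses can suffer from false sharing only if they fall in the same block.
    static boolean sameLine(long a, long b) {
        return lineOf(a) == lineOf(b);
    }

    public static void main(String[] args) {
        long x = 0x1000; // hypothetical address of x
        long y = 0x1008; // hypothetical address of y, 8 bytes further
        System.out.println("x and y share a line: " + sameLine(x, y));
    }
}
```

With these values, x and y land in the same block, so a write to x by one core invalidates the line that the other core uses for y.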

Concurrent Execution

Concurrent execution of code must first ensure mutual exclusion. This means that multiple threads accessing the same resource require coordination.

Moreover, when a change is made, it has to be marked as visible to other threads. This is called visibility of change.

These are the two main concepts of concurrency.

Mutual Exclusion

Locking is one possible solution to achieve mutual exclusion. Yet locks are very expensive, as they require arbitration when contended. This arbitration is achieved by a context switch to the OS, which suspends the threads waiting on the lock until it is released.

Moreover, during this context switch (but also while releasing control to the OS which may decide to do other tasks), the execution context can lose previously cached data and instructions.

Let’s look at some figures to understand the impact of locking, with a 64-bit counter incremented in a loop running 500 million times:

|        Test case        | Execution time |
|-------------------------|----------------|
| Single thread | 300 ms |
| Single thread with lock | 10 000 ms |
| Two threads with lock | 224 000 ms |
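As a rough idea of what the locked test cases exercise, here is a minimal sketch. It assumes a ReentrantLock, since the benchmark does not say which lock was used, and the class name is illustrative:

```java
import java.util.concurrent.locks.ReentrantLock;

final class LockedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private long counter = 0;

    // Every increment takes and releases the lock, even when uncontended.
    void increment() {
        lock.lock();
        try {
            counter++;
        } finally {
            lock.unlock();
        }
    }

    long get() {
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }

    // Two threads each perform half of the increments.
    static long countWithTwoThreads(long max) {
        LockedCounter c = new LockedCounter();
        Runnable half = () -> {
            for (long i = 0; i < max / 2; i++) {
                c.increment();
            }
        };
        Thread t1 = new Thread(half);
        Thread t2 = new Thread(half);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return c.get();
    }
}
```

Unlike the unprotected version, this one always reaches the expected total, at the cost shown in the table.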

An alternative to locks is CAS (Compare And Swap) operations. A CAS operation is a machine instruction that allows a word to be conditionally set as an atomic operation (all or nothing).

In a nutshell, the expected and new values of a variable are provided as parameters. If the current value of the variable matches the expected one, the variable is updated with the new value. Otherwise, it is up to the thread to retry another CAS operation.

In Java, Atomic* classes like AtomicInteger are based on this operation.
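The retry loop described above can be sketched with AtomicLong.compareAndSet. The counter class is an illustration; in practice AtomicLong.incrementAndGet() already does this for us:

```java
import java.util.concurrent.atomic.AtomicLong;

final class CasCounter {
    private final AtomicLong value = new AtomicLong();

    long increment() {
        while (true) {
            long expected = value.get();
            long updated = expected + 1;
            // Succeeds only if nobody changed the value since we read it.
            if (value.compareAndSet(expected, updated)) {
                return updated;
            }
            // Another thread won the race: read again and retry.
        }
    }

    long get() {
        return value.get();
    }

    // Two threads each perform half of the increments.
    static long countWithTwoThreads(long max) {
        CasCounter c = new CasCounter();
        Runnable half = () -> {
            for (long i = 0; i < max / 2; i++) {
                c.increment();
            }
        };
        Thread t1 = new Thread(half);
        Thread t2 = new Thread(half);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return c.get();
    }
}
```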

Let’s compare the performance results:

|       Test case        | Execution time |
|------------------------|----------------|
| Single thread with CAS | 5 700 ms |
| Two threads with CAS | 30 000 ms |

Last but not least, it’s worth mentioning that if the critical section (the protected part) does not fit in a single atomic operation, it will require the orchestration of multiple CAS operations and will be way more complex and expensive to handle.

Visibility of Change

Visibility of change can be achieved with a memory barrier (also called memory fence).

A memory barrier causes a CPU to enforce an ordering constraint on memory operations issued before and after the barrier instruction.
Source: Wikipedia

Why do we need such mechanisms? Most modern CPUs employ performance optimizations which may result in out-of-order execution.

Let’s go back to our first example:

In this example, it does not matter when the loop counter is updated, as no operation within the loop uses it. The CPU is free to reorder instructions to optimize execution performance.

This reordering is not a problem in case of a single threaded execution but can become unpredictable in the context of concurrent execution.

This is the goal of memory barriers:

  • Ensuring that all instructions on either side of the barrier appear in the correct order when observed from another CPU core.
  • Making the memory visible by propagating data to the underlying caches.

There are different types of memory barriers:

  • Read: gives a consistent view of the world for write operations ordered before the barrier.
  • Write: gives an ordered view to the world of the store operations before the barrier.
  • Full: a composition of the read and the write barriers.

In Java, using a volatile field inserts a write barrier instruction after we write to it, and a read barrier instruction before we read from it. Meanwhile, a final field of a class is made visible using a write barrier once the constructor completes.
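Here is a small sketch of what the volatile guarantee buys us, assuming a plain payload field published through a volatile flag (all names are illustrative):

```java
final class VolatileVisibility {
    private int payload = 0;                // plain field
    private volatile boolean ready = false; // volatile guard

    static int publishAndRead() {
        VolatileVisibility shared = new VolatileVisibility();
        int[] seen = new int[1];

        Thread reader = new Thread(() -> {
            // The volatile read of ready cannot be hoisted out of the loop.
            while (!shared.ready) {
                Thread.onSpinWait();
            }
            // Once ready is seen as true, the earlier write to payload is visible too.
            seen[0] = shared.payload;
        });
        reader.start();

        shared.payload = 42; // plain write, ordered before the volatile write below
        shared.ready = true; // volatile write publishes the payload

        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

Without volatile on ready, the reader could legally spin forever or observe a stale payload.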

It is also possible to access such instructions directly through the Unsafe class.

Let’s modify our multi-threaded implementation by using a read memory barrier before accessing sharedCounter and a write memory barrier after:
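A possible shape for this change is sketched below. It is an assumption on our side to use the static fence methods of VarHandle (Java 9+) rather than Unsafe, and loadLoadFence()/storeStoreFence() are only the closest matches for "read" and "write" barriers:

```java
import java.lang.invoke.VarHandle;

final class FencedCounter {
    static long sharedCounter = 0;

    static long count(long max) {
        sharedCounter = 0;
        Runnable half = () -> {
            for (long i = 0; i < max / 2; i++) {
                VarHandle.loadLoadFence();   // read barrier before the access
                sharedCounter++;             // still a non-atomic read-modify-write
                VarHandle.storeStoreFence(); // write barrier after the access
            }
        };
        Thread t1 = new Thread(half);
        Thread t2 = new Thread(half);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sharedCounter;
    }

    public static void main(String[] args) {
        System.out.println("counter=" + count(1_000_000));
    }
}
```

The fences only constrain ordering and visibility. Two threads can still read the same value and both write back value + 1, so updates can still be lost.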

Let’s run this implementation again with MAX equal to 1 million. Bear in mind, we still expect it to print counter=1000000 at the end of the execution:

-- First multi-threaded implementation without a memory barrier
counter=522388
counter=733903
counter=532331
-- New implementation with memory barriers
counter=999890
counter=999868
counter=999770

As we can see, the memory barriers have an undeniable impact: the results are much closer to the expected value. Yet, the execution is still not deterministic, as a memory barrier is not enough to prevent race conditions on non-atomic operations.

Traditional Queues

One alternative to sharing memory between threads is the message passing paradigm: sharing memory by communicating.

It means we need something in between threads to handle communications. One of the solutions is to use traditional queues like LinkedBlockingQueue or ArrayBlockingQueue in Java.

Yet, it does not solve concurrency problems as even a queue must ensure mutual exclusion and visibility of change.

If we take a look at the put method of ArrayBlockingQueue, we can verify that both aspects are handled:
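A simplified sketch of the same lock-and-condition pattern could look like this. It is a hypothetical SimpleBoundedQueue written for illustration, not the actual JDK source:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class SimpleBoundedQueue<E> {
    private final Object[] items;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private int putIndex, takeIndex, count;

    SimpleBoundedQueue(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();          // mutual exclusion
        try {
            while (count == items.length) {
                notFull.await();           // block while the queue is full
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();             // wake up one waiting consumer
        } finally {
            lock.unlock();                 // unlocking also publishes the change
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();          // block while the queue is empty
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();              // wake up one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Single-threaded round trip, just to show the API in use.
    static int demoSum() {
        SimpleBoundedQueue<Integer> q = new SimpleBoundedQueue<>(4);
        try {
            q.put(1);
            q.put(2);
            q.put(3);
            return q.take() + q.take() + q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```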

The accesses to the queue are locked and once an element has been added a signal is sent to an awaiting thread.

Interestingly, the LMAX team noticed that queues typically tend to be either close to full or close to empty, due to the difference in pace between consumers and producers. This results in a high level of contention and/or expensive cache management.

If the queue is close to full, it will result in contention between the producers, leading to context switches and perhaps losing cached data and instructions.

Moreover, in a traditional queuing system, the producers claim the head of the queue while the consumers claim the tail. If the queue is close to empty, it is very likely that the head, the tail and the queue size all belong to the same cache line, which may lead to the false sharing problem described above.

Disruptor

The creators of the LMAX Disruptor are also famous for having popularized the concept of Mechanical Sympathy in software. In a nutshell, understanding the underlying hardware should make you a better developer when it comes to designing algorithms, data structures, etc. Based on this philosophy, the team has been able to produce this great library:

The Disruptor has significantly less write contention, a lower concurrency overhead and is more cache friendly than comparable approaches, all of which results in greater throughput with less jitter at lower latency.
Source: Disruptor technical paper

Let’s try to analyze the reasons.

First, the Disruptor is based on a ring buffer structure (also called a circular buffer), which is simply a single, fixed-size buffer used as if it were connected end-to-end:

Ring buffer structure

On startup, the memory is allocated according to the provided size (which must be a power of 2) and a factory to initialize events:

Here, we allocated ringBufferSize instances of MyEvent class on startup.

Meanwhile, we provided a factory to create threads for event processors and a wait strategy defining how to handle slow subscribers (those strategies are described here). We’ll discuss the producer type a bit later.

The event instances are reused and live for the duration of the Disruptor instance. This eliminates garbage collection issues: in traditional queues, events may survive longer than they should.

Internally, the ring buffer is backed by an array of objects. There is a good reason for that according to the creators. At the hardware level, an array has a predictable pattern of access. The entries can be pre-loaded, so the processor is not constantly going back to the main memory to load the next item in the ring. This way, we can iterate way more efficiently than with a queue backed by a linked list for example.
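To illustrate the idea (this is not the Disruptor's own code), here is a minimal sketch of an array-backed ring with preallocated entries. The PreallocatedRing name is hypothetical. It also shows why a power-of-2 size is convenient: a sequence maps to a slot with a cheap bit mask instead of a modulo:

```java
import java.util.function.Supplier;

final class PreallocatedRing<E> {
    private final Object[] entries;
    private final int mask;

    PreallocatedRing(int size, Supplier<E> factory) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        entries = new Object[size];
        mask = size - 1;
        // All events are created once, up front, and reused afterwards.
        for (int i = 0; i < size; i++) {
            entries[i] = factory.get();
        }
    }

    // For a power-of-2 size, (sequence & mask) equals (sequence % size).
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & mask)];
    }
}
```

With a ring of 8 entries, sequences 0 and 8 resolve to the same preallocated instance, which is then overwritten in place instead of being reallocated.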

Configuring a consumer can be done like this:

We provided a lambda expression to handleEventsWith(). This lambda has three inputs:

  • The event itself
  • A sequence identifier
  • A boolean for batch management

In the case of multiple consumers, they are all going to receive every single event. If we want to distribute the load, we can implement a sharding strategy based on the sequence identifier. Something like this:
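Here is a library-free sketch of such a sharding strategy. The Handler interface below is a hypothetical stand-in that mirrors the three inputs listed above, and the modulo rule is one possible choice among others:

```java
final class ShardedHandler {
    // Stand-in for the three-argument callback (hypothetical, for illustration).
    interface Handler<E> {
        void onEvent(E event, long sequence, boolean endOfBatch);
    }

    // Wraps a handler so that it only processes the sequences assigned to its ordinal.
    static <E> Handler<E> shard(int ordinal, int numberOfConsumers, Handler<E> delegate) {
        return (event, sequence, endOfBatch) -> {
            if (sequence % numberOfConsumers == ordinal) {
                delegate.onEvent(event, sequence, endOfBatch);
            }
        };
    }
}
```

With three consumers, the one with ordinal 1 would handle sequences 1, 4, 7 and so on, while still seeing (and skipping) all the others.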

Then we can start the Disruptor which returns a RingBuffer instance:

Once the Disruptor instance is started, we can publish an event the following way:

The sequence identifier is the position of the event in the ring buffer structure.

When we created the Disruptor instance, we also passed a producerType variable, which can be either ProducerType.SINGLE or ProducerType.MULTI. This tells the Disruptor whether we will have a single producer or multiple producers.

In the case of a single producer, ringBuffer.next() is completely lock-free. On the other hand, if we have multiple producers, this method relies on a CAS operation to provide the next available sequence identifier in the ring buffer.
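The difference between the two modes can be sketched as follows. This is a simplified illustration with hypothetical class names, not the library's implementation, and it ignores the check that prevents a producer from wrapping over unconsumed events:

```java
import java.util.concurrent.atomic.AtomicLong;

final class SequenceClaims {
    // Single producer: only one thread claims, so a plain field is enough.
    static final class SingleProducer {
        private long next = -1;

        long next() {
            return ++next;
        }
    }

    // Multiple producers: claims race with each other, so a CAS loop is needed.
    static final class MultiProducer {
        private final AtomicLong cursor = new AtomicLong(-1);

        long next() {
            long current;
            long claimed;
            do {
                current = cursor.get();
                claimed = current + 1;
            } while (!cursor.compareAndSet(current, claimed));
            return claimed;
        }
    }
}
```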

The sequence identifier itself is managed by adding left and right padding to make sure it is never in a cache line with anything else:
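A sketch of this padding technique, assuming 64-byte cache lines and 8-byte long fields. The class names are illustrative, and the exact field layout is ultimately up to the JVM:

```java
// Padding declared in a superclass is typically laid out before subclass fields.
class SeqLeftPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;      // 56 bytes before the value
}

class SeqValue extends SeqLeftPadding {
    protected volatile long value;                  // the actual sequence
}

class SeqRightPadding extends SeqValue {
    protected long p9, p10, p11, p12, p13, p14, p15; // 56 bytes after the value
}

final class PaddedSequence extends SeqRightPadding {
    PaddedSequence(long initial) {
        value = initial;
    }

    long get() {
        return value;
    }

    void set(long newValue) {
        value = newValue;
    }
}
```

Splitting the fields across a class hierarchy is a way to discourage the JVM from reordering the padding away from the value, which it would be free to do if all fields lived in a single class.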

Furthermore, publishing an event creates a memory barrier to make sure the caches are up to date with this event. This allows adding an event to the ring buffer structure without any locking, which gives us a huge performance improvement.
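To see how publishing through a barrier can replace a lock, here is a toy single-producer, single-consumer ring. It is a sketch under simplifying assumptions (busy spinning, long values instead of event objects, volatile writes via AtomicLong.set) and not how the library is actually written:

```java
import java.util.concurrent.atomic.AtomicLong;

final class SpscRingDemo {
    private static final int SIZE = 8;        // must be a power of 2
    private static final int MASK = SIZE - 1;

    // Sends the values 1..events through the ring and returns their sum.
    static long run(long events) {
        long[] slots = new long[SIZE];
        AtomicLong published = new AtomicLong(-1); // last sequence visible to the consumer
        AtomicLong consumed = new AtomicLong(-1);  // last sequence the consumer finished

        Thread producer = new Thread(() -> {
            for (long seq = 0; seq < events; seq++) {
                // Do not overwrite a slot the consumer has not read yet.
                while (seq - consumed.get() > SIZE) {
                    Thread.onSpinWait();
                }
                slots[(int) (seq & MASK)] = seq + 1; // plain write of the event data
                published.set(seq);                  // volatile write: publishes the slot
            }
        });
        producer.start();

        long sum = 0;
        for (long seq = 0; seq < events; seq++) {
            // Wait until the producer has published this sequence.
            while (published.get() < seq) {
                Thread.onSpinWait();
            }
            sum += slots[(int) (seq & MASK)];
            consumed.set(seq);                       // frees the slot for reuse
        }

        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```

No lock is ever taken: the producer owns a slot until it publishes the sequence, and the consumer owns it until it marks the sequence as consumed.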

One last point is worth mentioning. We said that the ring buffer is backed by a simple array. This means it is up to the developer to prevent potential false sharing problems between multiple events that would belong to the same cache line.


Let’s conclude this post by sharing performance results produced by the LMAX team when they compared the Disruptor with ArrayBlockingQueue.

Throughput performance testing in ops/sec (P stands for producer and C for consumer):

|                    | ArrayBlockingQueue | Disruptor  |
|--------------------|--------------------|------------|
| Unicast: 1P – 1C   | 5 339 256          | 25 998 336 |
| Pipeline: 1P – 3C  | 2 128 918          | 16 806 157 |
| Sequencer: 3P – 1C | 5 539 531          | 13 403 268 |
| Multicast: 1P – 3C | 1 077 384          | 9 377 871  |
| Diamond: 1P – 3C   | 2 113 941          | 16 143 613 |

Latency performance testing in ns:

|                    | ArrayBlockingQueue | Disruptor |
|--------------------|--------------------|-----------|
| Min Latency        | 145                | 29        |
| Mean Latency       | 32 757             | 52        |
| 99th percentile    | 2 097 152          | 128       |
| 99.99th percentile | 4 194 304          | 8 192     |
| Max Latency        | 5 069 086          | 175 567   |

Any suggestions or improvements related to this post are more than welcome!

Further Reading