How we built a highly scalable distributed state machine
Say hello to “big workflow” — part 2
In “Zeebe.io — a highly scalable distributed workflow engine” I described Zeebe as a high-performance, highly scalable and resilient event-sourced workflow engine. I explained that this lets you leverage workflow automation in many more use cases, including low-latency, high-throughput scenarios. I revealed that Zeebe plays in the same league as e.g. Apache Kafka. And I teased the key ingredients: a truly distributed system without any central component, designed according to top-notch distributed computing concepts and in line with the reactive manifesto, applying techniques from high-performance computing.
In this article I want to go deeper. I will go over important concepts from distributed computing and explain the decisions we made along the way. This should give you a proper idea of how we are entering a new era of scalability for workflow automation, which my co-founder calls “big workflow”.
But I want to give kudos first. Especially to Daniel Meyer, our VP of engineering. He came up with most of the ideas. And the Zeebe team made it reality. So my contribution today is solely to steal a lot of content from internal and external presentations or papers and make an article out of it.
Zeebe leverages event sourcing. That means that all changes to the state are captured as events and stored as records in a so-called event log.
Events are immutable and therefore this event log is append-only. Nothing will ever be changed once it is written; it is like a journal in accounting. Append-only logs are relatively easy to handle, because:
- You do not have updates, so you cannot have conflicting updates (VALIDATE).
- There are known strategies to replicate append-only logs.
- Appending is very efficient, as sequential writes are much faster than random ones. If you write large numbers of events by appending chunks of serialized bytes, your hard disk can perform at its best.
The current state of a workflow can be derived from these events. This is known as a projection. A projection in Zeebe is saved internally as a snapshot for performance reasons (basically to allow for a quicker startup time), but it can always be reconstructed from the event log.
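To make the idea of a projection more tangible, here is a minimal sketch in Java. It is not Zeebe's real code: the class `EventLogSketch`, the `Event` type and the event names are invented for illustration, and the log lives in memory instead of on disk.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an append-only event log; not Zeebe's real classes.
final class EventLogSketch {

    /** An immutable event: the workflow instance it belongs to and what happened. */
    static final class Event {
        final long instanceId;
        final String type;

        Event(long instanceId, String type) {
            this.instanceId = instanceId;
            this.type = type;
        }
    }

    private final List<Event> log = new ArrayList<>();

    /** Appending is the only write operation; existing entries are never changed. */
    long append(Event event) {
        log.add(event);
        return log.size() - 1; // position of the new entry
    }

    /** Projection: replay all events to derive the latest state per workflow instance. */
    Map<Long, String> project() {
        Map<Long, String> state = new HashMap<>();
        for (Event e : log) {
            state.put(e.instanceId, e.type);
        }
        return state;
    }

    public static void main(String[] args) {
        EventLogSketch log = new EventLogSketch();
        log.append(new Event(1, "CREATED"));
        log.append(new Event(2, "CREATED"));
        log.append(new Event(1, "COMPLETED"));
        System.out.println(log.project());
    }
}
```

A snapshot would simply be the result of `project()` stored away, so that a restart does not have to replay the whole log.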
As the log grows over time we currently implement log compaction. This can for example remove events for all ended workflow instances. Especially if you have transferred all audit data to some separate read model (as explained in part one of this article) this is a safe thing to do.
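As a rough illustration of what such a compaction could look like, the following sketch drops all events of workflow instances that have reached an end state. The class `CompactionSketch`, the `Event` type and the `COMPLETED` event name are assumptions made for this example, not Zeebe's implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of log compaction; not Zeebe's real implementation.
final class CompactionSketch {

    static final class Event {
        final long instanceId;
        final String type;

        Event(long instanceId, String type) {
            this.instanceId = instanceId;
            this.type = type;
        }
    }

    /** Returns a new log without the events of instances that have ended. */
    static List<Event> compact(List<Event> log) {
        // First pass: find all instances that reached the (assumed) end state.
        Set<Long> ended = new HashSet<>();
        for (Event e : log) {
            if ("COMPLETED".equals(e.type)) {
                ended.add(e.instanceId);
            }
        }
        // Second pass: keep only events of instances that are still running.
        List<Event> compacted = new ArrayList<>();
        for (Event e : log) {
            if (!ended.contains(e.instanceId)) {
                compacted.add(e);
            }
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<Event> log = new ArrayList<>();
        log.add(new Event(1, "CREATED"));
        log.add(new Event(2, "CREATED"));
        log.add(new Event(1, "COMPLETED"));
        System.out.println(compact(log).size());
    }
}
```

Note that the original log is not modified; compaction produces a new, shorter log, which fits the append-only idea.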
Zeebe writes the log to disk. Currently this is the only supported option. But as writing an append-only log does not need any guarantees from the storage, this could easily be made pluggable in the future, and other storage options might follow. Cassandra is discussed regularly, for example. But so far we focus on the file system, and it might even be the best choice for most use cases, as it is the fastest option and we will further optimize this default option.
The single writer principle
When you have multiple clients accessing one workflow instance at the same time, you need some kind of conflict detection and resolution. When you use an RDBMS this is often implemented via optimistic locking or some database magic. With Zeebe we solve this by using the Single Writer Principle. As Martin Thompson wrote:
Contended access to mutable state requires mutual exclusion or conditional update protection. Either of these protection mechanisms cause queues to form as contended updates are applied. To avoid this contention and associated queueing effects all state should be owned by a single writer for mutation purposes, thus following the Single Writer Principle.
So independent of the number of threads on our machine or the overall size of the Zeebe cluster, there is always exactly one thread that writes to a certain log. This is great: the ordering is clear, no locking is needed and no deadlock can occur. You do not waste time managing contention; you can do real work all the time.
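The principle can be sketched in a few lines of Java. This is an illustration only, not Zeebe's implementation: the class `SingleWriterSketch` is made up, and a single-threaded executor from the JDK stands in for Zeebe's own machinery. Many client threads may submit entries, but only one thread ever touches the log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the Single Writer Principle; not Zeebe's real code.
final class SingleWriterSketch {

    // Only the writer thread ever reads or modifies this list, so no lock is needed.
    private final List<String> log = new ArrayList<>();
    private final ExecutorService writer = Executors.newSingleThreadExecutor();

    /** Any thread may call this; the append itself runs on the single writer thread. */
    CompletableFuture<Integer> append(String entry) {
        return CompletableFuture.supplyAsync(() -> {
            log.add(entry);
            return log.size() - 1; // position of the new entry
        }, writer);
    }

    /** Reads are routed through the writer thread as well, keeping the state confined to it. */
    int size() {
        return CompletableFuture.supplyAsync(log::size, writer).join();
    }

    void close() {
        writer.shutdown(); // already submitted work is still executed
    }

    public static void main(String[] args) throws InterruptedException {
        SingleWriterSketch sketch = new SingleWriterSketch();
        Thread[] clients = new Thread[4];
        for (int i = 0; i < clients.length; i++) {
            final int id = i;
            clients[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    sketch.append("client-" + id + "-entry-" + j);
                }
            });
            clients[i].start();
        }
        for (Thread t : clients) {
            t.join();
        }
        System.out.println(sketch.size());
        sketch.close();
    }
}
```

The callers get a future back instead of blocking, and the order of entries is simply the order in which the writer thread picks them up.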
In order to understand this a bit better let’s have a look at what happens if a client wants to complete a task within a workflow:
SOME API USAGE CODE: `client.createCompleteTaskCommand().send() …`
The execution works like this:
Note that a client does not have to block until a response arrives.
TODO: Hint on why responding before commiting event is ok. MONITOR https://github.com/zeebe-io/zeebe/issues/795#issuecomment-384225955
There is also a non-blocking API available:
SOME API USAGE CODE
If you wonder whether that means that Zeebe only leverages one thread to do the workflow logic, you are right so far! I will talk about scaling Zeebe later on. But first let’s have a look at fault tolerance and resilience. This is achieved by replication in a Zeebe cluster.
To provide fault-tolerance you run multiple Zeebe brokers which form a peer-to-peer cluster. We designed this in a way that it does not need any central component or coordinator, hence there is no single point of failure.
To form a cluster you need to configure at least one other broker as a known contact point in your broker. During startup, a broker talks to this other broker and fetches the current cluster topology. Afterwards the Gossip protocol is used to keep the cluster view up to date and in sync. Brokers also save the latest known cluster topology locally so they remember it after a restart.
Replication using the Raft Consensus Algorithm
Now multiple servers have to agree on the event log. Distributed consensus describes how multiple nodes in a cluster can agree on a value. Zeebe uses the Raft Consensus Algorithm to replicate the event log (and therefore also the current state) between brokers. There is an awesome visual explanation of the Raft Consensus Algorithm available online, so I will not go into all the details here.
The basic idea is that there is a single leader and a set of followers. When the Zeebe brokers start up they elect a leader on their own, as described by Raft. As the cluster constantly gossips, the brokers recognize if a leader has gone down or disconnected and try to elect a new leader.
Only the leader is allowed write access to the data. The data written by the leader is replicated to all followers. This is done synchronously. Only after successful replication are the events (or commands) processed within the Zeebe broker. If you are familiar with the CAP theorem, this means that we decided for consistency and not for availability, so Zeebe is a CP system. (I apologize to Martin Kleppmann who wrote Please stop calling databases CP or AP, but I think it helps to understand the main goals of Zeebe.)
We tolerate partitioning of the network because every distributed system has to: you have no influence on whether partitions happen, see http://blog.cloudera.com/blog/2010/04/cap-confusion-problems-with-partition-tolerance/ and https://aphyr.com/posts/293-jepsen-kafka. We decided for consistency instead of availability, as consistency is one of the promises for use cases of workflow automation.
An important configuration option (not yet implemented by the way, but following soon) is the replication group size. In order to elect a leader or to successfully replicate data you need a so-called quorum, which means a certain number of acknowledgements from other Raft members. Because we want to guarantee consistency, Zeebe requires a quorum ≥ (replication group size / 2) + 1, with the division rounded down. Let’s make a simple example:
- Zeebe nodes: 5
- Replication group size: 5
- Quorum: 3
So we can still work if there are three nodes reachable. In case of a partition like sketched above only one network segment can reach quorum and continue to work — the other two nodes will not be able to do anything. So if you are a client in the network segment with these two nodes, you cannot work anymore, thus availability is not guaranteed. A CP system.
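The quorum arithmetic from the example can be written down as a small helper. This is only a sketch of the formula given above; the class and method names are invented, and I assume that the reachable members are counted including the node itself.

```java
// Sketch of the quorum formula from the text; names are hypothetical.
final class QuorumSketch {

    /** Quorum = (replication group size / 2) + 1, using integer division. */
    static int quorum(int replicationGroupSize) {
        return replicationGroupSize / 2 + 1;
    }

    /** Number of members that may be unreachable while the group can still reach quorum. */
    static int tolerableFailures(int replicationGroupSize) {
        return replicationGroupSize - quorum(replicationGroupSize);
    }

    /** A network segment can continue to work only if it still contains a quorum. */
    static boolean canMakeProgress(int reachableMembers, int replicationGroupSize) {
        return reachableMembers >= quorum(replicationGroupSize);
    }

    public static void main(String[] args) {
        int size = 5;
        System.out.println("quorum: " + quorum(size));
        System.out.println("segment with 3 nodes works: " + canMakeProgress(3, size));
        System.out.println("segment with 2 nodes works: " + canMakeProgress(2, size));
    }
}
```

Because two disjoint segments can never both contain more than half of the members, at most one of them can make progress.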
This also avoids the so-called split-brain phenomenon, as you cannot end up with two network segments doing conflicting work in parallel. A good in-depth discussion can be found in the forum.
When log entries are written by the leader they are first replicated to the followers before they will be executed. Replication is currently always done synchronously.
That means every log entry that gets processed is guaranteed to be correctly replicated. And the replication guarantees that no committed log entry is ever lost. Larger replication group sizes allow a higher failure tolerance but increase the latency, as a broker has to talk to more replication nodes.
Replication is also the strategy to overcome challenges around writing to disk in virtualized and containerized environments. You have no control over when data is really physically written to disk in these environments. Even if you call fsync and it tells you that the data is safe, it might not be. But we prefer to have the data in the memory of a couple of servers than on the disk of one of them.
While replication adds latency to the processing of a command within Zeebe, it does not affect throughput much (VALIDATE!!). The stream processors within Zeebe are not blocked by waiting for the answer of a follower. So Zeebe can continue processing at a fast pace, but the client waiting for its response might need to wait a bit longer.
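One common way to decide which entries count as replicated in a Raft-style system is to track, per member, the highest log position it has acknowledged and to take the highest position that a quorum has reached. The following sketch shows that calculation. It is a generic illustration, not Zeebe's actual code; the class name and the idea of passing the positions as an array (including the leader's own position) are assumptions.

```java
import java.util.Arrays;

// Hypothetical sketch: deriving the commit position from acknowledgements.
final class CommitPositionSketch {

    /**
     * Returns the highest log position that at least a quorum of members has
     * acknowledged. The array holds one acknowledged position per member of the
     * replication group, including the leader itself.
     */
    static long commitPosition(long[] ackedPositions) {
        long[] sorted = ackedPositions.clone();
        Arrays.sort(sorted); // ascending
        int quorum = sorted.length / 2 + 1;
        // The quorum-th highest value is acknowledged by at least quorum members.
        return sorted[sorted.length - quorum];
    }

    public static void main(String[] args) {
        long[] acked = {10, 7, 9, 3, 10}; // leader plus four followers
        System.out.println(commitPosition(acked));
    }
}
```

Entries up to this position can be processed safely; entries beyond it still have to wait for more acknowledgements.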
The client view
If you want Zeebe to do something you use the client library to send a command to the broker, e.g. to start a new workflow instance or to complete a job. This command has to be sent to the leader. Therefore every client
- has to know the Zeebe cluster topology including leader/follower information and
- has to be able to talk to every broker.
A client can initially connect to any Zeebe broker and gets back the current view of the cluster, including leader and follower information. From then on the client regularly polls the current topology from the broker. This allows the client to talk to the leader. In case the client contacts the wrong broker (because of an outdated cluster view), the broker responds with an error and the client refreshes its cluster view before trying again. (VALIDATE)
While this might sound complicated, it is hidden from Zeebe users in a client library. So in Java it would simply look like this:
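What such a library does under the hood can be sketched roughly as follows. Everything here is invented for illustration: the `Cluster` interface, its methods and the class `LeaderRoutingSketch` are not part of the Zeebe client API, and the sketch only covers the refresh-on-error path, not the regular polling.

```java
// Hypothetical sketch of leader routing in a client; not the Zeebe client API.
final class LeaderRoutingSketch {

    /** Invented abstraction of the cluster as seen by the client. */
    interface Cluster {
        /** Fetches the topology and returns the current leader. */
        String currentLeader();

        /** Sends a command to a broker; returns false if that broker is not the leader. */
        boolean send(String broker, String command);
    }

    private final Cluster cluster;
    private String knownLeader; // cached cluster view, may become outdated

    LeaderRoutingSketch(Cluster cluster) {
        this.cluster = cluster;
        this.knownLeader = cluster.currentLeader();
    }

    /** Sends to the cached leader; on rejection refreshes the view and retries. */
    boolean sendCommand(String command, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (cluster.send(knownLeader, command)) {
                return true;
            }
            knownLeader = cluster.currentLeader(); // the cached view was outdated
        }
        return false;
    }

    public static void main(String[] args) {
        String[] leader = {"broker-0"};
        Cluster fake = new Cluster() {
            @Override
            public String currentLeader() {
                return leader[0];
            }

            @Override
            public boolean send(String broker, String command) {
                return broker.equals(leader[0]);
            }
        };
        LeaderRoutingSketch client = new LeaderRoutingSketch(fake);
        leader[0] = "broker-1"; // leadership changes behind the client's back
        System.out.println(client.sendCommand("complete-job", 3));
    }
}
```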
CODE (add as soon as stabilized)
There is one interesting open issue in this area, which is about so-called advertised endpoints. Currently Zeebe identifies a broker node by its socket address (host:port). These addresses are also handed over to the client. But in containerized environments like Kubernetes the client needs external IPs or names (see open issue https://github.com/zeebe-io/zeebe/issues/298).
Scale out by partitioning
So far we talked about having exactly one thread processing all work. If you want to leverage more than one thread you have to create so-called partitions. Every partition represents a separate physical append-only log.
Every partition has its own single writer, which means you can use partitions to scale. The partitions can be assigned to
- different threads on a single machine or
- different broker nodes.
Every partition forms its own Raft group, hence every partition has its own leader. If you run a Zeebe cluster, one node can be the leader for one partition and a follower for others. This might be a very efficient way to run your cluster.
All events related to one workflow instance must go onto the same partition, otherwise we would violate the single writer principle and also make it impossible to recreate the current state in a broker node locally.
One big challenge is how to decide which workflow instance goes onto which partition. Currently this is a simple round robin mechanism. So when you start a workflow instance the client will decide by round-robin in which partition to put it (VALIDATE!!). More sophisticated or also pluggable strategies will follow later.
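A round-robin assignment is simple enough to sketch in a few lines. The class `RoundRobinSketch` is hypothetical and only illustrates the mechanism; it says nothing about where exactly Zeebe makes this decision.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin partition assignment.
final class RoundRobinSketch {

    private final int partitionCount;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    /** Returns the partition for the next new workflow instance. */
    int nextPartition() {
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), partitionCount);
    }

    public static void main(String[] args) {
        RoundRobinSketch assigner = new RoundRobinSketch(3);
        for (int i = 0; i < 5; i++) {
            System.out.println("instance " + i + " goes to partition " + assigner.nextPartition());
        }
    }
}
```

The decision is made once, when the instance is started. After that, all events of the instance stay on the chosen partition.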
Users often ask for multi-data-center replication. Currently there is no special support for it. A Zeebe cluster can technically span multiple data centers, but you have to prepare for the added latency. If you set up your cluster in a way that quorum can only be reached by nodes from both data centers, you have already taken care of epic disasters, at the cost of latency.
Topics — how to organize your stuff
In Zeebe you have to organize everything into topics. The topic configures the number of partitions and some other settings. In contrast to partitions, the client itself decides which workflow goes into which topic. If you do not specify a topic, a default topic named “default-topic” is used (or shall be used soon, see https://forum.zeebe.io/t/why-not-automatically-create-default-topic/122/10).
Using different topics might be useful for several reasons:
- Quality of service: You might have workflows that are more important or more time-critical than others. Putting them on their own topic makes sure they get dedicated resources and don’t have to share them too much with all the other stuff.
- Data separation: Data for different topics never get written into the same log file. Read access to the log only shows you the data of one topic. Workflows can never interfere. Splitting the data even onto different clusters later on is still possible. All of this is helpful to implement e.g. tenants or isolate between different departments, service boundaries or the like.
Compare topics to database schemas.
The following picture shows all concepts as overview. Note that this says nothing about the physical cluster layout. So the whole picture might fit on 3 Zeebe nodes (as the replication group size in the picture is 3).
Why not leverage Kafka or Zookeeper?
A lot of people ask why we write all of the above ourselves and do not simply leverage a cluster manager like Apache Zookeeper (or even a fully fledged event bus like Apache Kafka). Here are the main reasons for these decisions:
- Ease of use and ease of getting started. We want to avoid third-party dependencies that need to be installed and operated before Zeebe can be used. And especially Apache Zookeeper or Apache Kafka are not easy to operate. We strive for a very simple getting-started experience (run a docker image or unzip the distro and run one script) without limiting the adoption of Zeebe to trivial use cases (a constant struggle by the way, as a proper high-volume, low-latency cluster needs to be set up a bit differently than a developer’s box on first contact).
- Efficiency. Having the cluster management in the core broker allows us to optimize it for our concrete use case, which is workflow automation. A couple of features would be less performant if built around an existing generic cluster manager (e.g. correlating messages to the right node running the respective workflow instance).
- Support and control. With our long experience as an open source vendor we learned that it is really hard to support third-party dependencies at this core level. Of course we could start hiring core Zookeeper contributors, but it would still be hard as there are multiple parties at the table, so the direction of these projects is not under our own control. With Zeebe we invest in having control over the full stack, allowing us to drive full speed in the direction we envision.
Design for performance
Apart from scalability, Zeebe is also built from the ground up for high performance on a single node. Every line of code is checked to comply with certain rules, most importantly to be garbage-free in the data path. Zeebe is written in Java. Java has so-called garbage collection, which cannot be turned off. The garbage collector regularly kicks in and checks for objects that it can remove from memory. During garbage collection your system is paused, and the duration depends on the number of objects checked or removed. This pause can add noticeable latency to your processing, especially if you process millions of messages per second. So Zeebe is programmed in a way that reduces garbage. Code that is executed for every message (and that means very often under high load) is not allowed to create garbage. Therefore Zeebe uses its own data structures, which can be reused and are managed by Zeebe itself.
Another strategy is to use ring buffers and to take advantage of batching wherever possible. This also makes it possible to use multiple threads without violating the single writer principle described above. So whenever you send an event to Zeebe, the receiver adds the data to a buffer. From there another thread takes over and processes the data. Another buffer is used for bytes that need to be written to disk.
This approach makes it possible to batch operations. Zeebe can write a pile of events to disk in one go, or send a couple of events to a follower in one network roundtrip. This reduces overall latencies. (VALIDATE)
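One typical way to avoid garbage in such a hot path is to reuse a single mutable "view" object that is pointed at the bytes of each message, instead of allocating a new object per message. The sketch below illustrates that general technique with plain JDK classes. The record layout (an 8-byte key followed by a 4-byte type) and all names are assumptions for this example, not Zeebe's actual data structures.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of object reuse in the data path; not Zeebe's real classes.
final class ReusableRecordSketch {

    /** Assumed fixed record layout: 8-byte key followed by a 4-byte type. */
    static final int RECORD_LENGTH = Long.BYTES + Integer.BYTES;

    /** A mutable view that is re-pointed at each record instead of being re-created. */
    static final class RecordView {
        private ByteBuffer buffer;
        private int offset;

        RecordView wrap(ByteBuffer buffer, int offset) {
            this.buffer = buffer;
            this.offset = offset;
            return this;
        }

        long key() {
            return buffer.getLong(offset);
        }

        int type() {
            return buffer.getInt(offset + Long.BYTES);
        }
    }

    /** Iterates over all records with one view instance, allocating nothing per record. */
    static long sumKeys(ByteBuffer records, int count, RecordView view) {
        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += view.wrap(records, i * RECORD_LENGTH).key();
        }
        return sum;
    }

    public static void main(String[] args) {
        ByteBuffer records = ByteBuffer.allocate(3 * RECORD_LENGTH);
        for (int i = 0; i < 3; i++) {
            records.putLong(i * RECORD_LENGTH, (i + 1) * 10L);
            records.putInt(i * RECORD_LENGTH + Long.BYTES, i);
        }
        System.out.println(sumKeys(records, 3, new RecordView()));
    }
}
```

The buffer and the view are allocated once up front; processing a million records creates no additional objects for the garbage collector to look at.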
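The effect of batching can be illustrated with a small sketch. Note the substitutions: a bounded `ArrayBlockingQueue` from the JDK stands in for Zeebe's ring buffer, an in-memory list stands in for the disk, and all names are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of batched writes; not Zeebe's real dispatcher.
final class BatchingSketch {

    private final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1024);
    private final List<String> disk = new ArrayList<>(); // stand-in for a file
    private int writeCalls = 0;

    /** Called by the receiving side; returns false if the buffer is full. */
    boolean offer(String event) {
        return buffer.offer(event);
    }

    /** Called by the writing thread: takes what is buffered and writes it in one go. */
    int flush(int maxBatchSize) {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch, maxBatchSize);
        if (!batch.isEmpty()) {
            disk.addAll(batch); // one "write" for the whole batch
            writeCalls++;
        }
        return batch.size();
    }

    int writeCalls() {
        return writeCalls;
    }

    public static void main(String[] args) {
        BatchingSketch sketch = new BatchingSketch();
        for (int i = 0; i < 5; i++) {
            sketch.offer("event-" + i);
        }
        int written = sketch.flush(100);
        System.out.println(written + " events in " + sketch.writeCalls() + " write call(s)");
    }
}
```

The more events arrive between two flushes, the more work is shared by a single write or a single network roundtrip.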
Remote communication is also done very efficiently using a binary protocol. Zeebe leverages Simple Binary Encoding (SBE) and Message Pack. In a nutshell every communication is a message that consists of
- a custom frame containing
- the record as SBE message containing
- payload encoded via Message Pack.
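The nesting of frame, record and payload can be illustrated with a deliberately simplified sketch. The byte layout used here (a length prefix, a key, a payload length and the raw payload bytes) is made up for this example; it is neither Zeebe's real frame format nor actual SBE or Message Pack encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical framing sketch with an invented layout; not Zeebe's wire format.
final class FramingSketch {

    /** Wraps a payload into a record and the record into a length-prefixed frame. */
    static ByteBuffer encode(long key, byte[] payload) {
        int recordLength = Long.BYTES + Integer.BYTES + payload.length;
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + recordLength);
        buf.putInt(recordLength);   // frame: length of the record that follows
        buf.putLong(key);           // record header: key
        buf.putInt(payload.length); // record header: payload length
        buf.put(payload);           // payload: opaque bytes for the outer layers
        buf.flip();
        return buf;
    }

    /** Unwraps the frame and the record and returns the payload bytes. */
    static byte[] decodePayload(ByteBuffer frame) {
        ByteBuffer buf = frame.duplicate(); // leave the caller's position untouched
        int recordLength = buf.getInt();
        if (recordLength != buf.remaining()) {
            throw new IllegalArgumentException("frame length does not match its content");
        }
        buf.getLong(); // skip the key
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] payload = "{\"orderId\":42}".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = encode(7L, payload);
        System.out.println(new String(decodePayload(frame), StandardCharsets.UTF_8));
    }
}
```

The outer layers never need to understand the payload; they only need to know its length.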
If you are interested in more details best dive into the code. You find the framing in zb-dispatcher (DataFrameDescriptor.java), transport headers in zb-transport (TransportHeaderDescriptor.java), SBE in zb-protocol and Message Pack in zb-msgpack-json-path.
With all these basics in place you are free to dive into the code if you like. Note that this is not at all necessary to get started!
The main components are:
- zb-util: Building blocks that are used in most other modules, e.g. actor scheduler, buffer allocations, hash map, metrics.
- zb-protocol: SBE definition of the message protocol.
- zb-bpmn-model: Java API for BPMN, e.g. used for parsing.
- zb-msgpack-json-path: Message Pack implementation with extensions to evaluate JsonPath expressions on Message Pack objects.
- zb-dispatcher: Implementation of message passing between threads.
- zb-service-container: Implementation to manage dependencies between different services.
- zb-logstreams: Implementation of the append only log stored on file system.
- zb-transport: Abstraction over network transports.
- zb-gossip: Implementation of gossip protocol for cluster membership and failure detection.
- zb-raft: Implementation of Raft consensus protocol for replication.
- zeebe: Contains the Zeebe broker itself as well as the Java client.
- zbc-go: The GoLang client.
- zbctl: The command line interface.
- zeebe-simple-monitor: A hacky Spring Boot application that provides a very simple monitoring interface to see what is going on in your broker.
Zeebe is a completely new class of workflow/orchestration engine in the emerging market of big workflow. What sets Zeebe apart from all other orchestration/workflow engines is its performance and the fact that it is designed as a truly scalable system without any central component or the need for a database.
Zeebe does not follow the traditional idea of the transactional workflow engine where state is stored in a shared database and updated as it moves from one step in the workflow to the next. Instead, Zeebe works as an event-sourced system on top of replicated, append-only logs. So Zeebe has a lot in common with systems like Apache Kafka. Zeebe is fully reactive and uses an efficient pub/sub protocol.
Contrary to other microservice orchestration engines on the market, Zeebe puts a strong focus on visual workflows as we believe that visual workflows are key for providing visibility into asynchronous interactions, at design time, runtime and during operations.
With this article I gave an introduction to Zeebe not only from the user perspective, but also dove deeper into the relevant concepts.