Streaming Engine: Execution Model for Highly-Scalable, Low-Latency Data Processing

Slava Chernyak
Google Cloud - Community
Feb 15, 2022

Introduction: Why Did I Write This?

Google Cloud Dataflow’s Streaming Engine is being used to run data analytics, ETL, ML, and other types of pipelines, many of which are processing tens of millions of messages and tens of gigabytes per second. In these pipelines data is being processed with exactly-once guarantees and with latencies consistently in the low single-digit seconds. I wanted to write down the kinds of design choices that allowed us to build and scale Streaming Engine, and in doing so to answer the types of questions I encounter frequently.

This post is meant as a deep-dive companion to the discussions in Streaming 101 and 102 as well as Streaming Systems. While those explore the principles of stream processing in some generality, this post delves into some specific details of the execution model that makes fast, scalable, correct streaming possible with Streaming Engine.

I hear you say: “If I am writing my pipeline in Apache Beam, why do the implementation details of Streaming Engine matter?” Besides general curiosity, if you run your Beam pipelines on Streaming Engine, the Law of Leaky Abstractions implies that regardless of the care and effort put into the programming model, there is no perfect abstraction that will hide all of the details from you forever. Thus, understanding the execution model of Streaming Engine will go a long way to filling the gaps between the abstraction and the “ground truth” of how a pipeline is run by Streaming Engine. Armed with this knowledge, you will be better able to understand the tradeoffs in executing your large distributed pipeline, and therefore choose a more scalable and efficient design.

Let’s dive in…

From Pipeline Code to Execution Graph

To ground this discussion in some reality, and to make it easier to follow, let’s start with a concrete example pipeline. I don’t want to describe the architecture in a vacuum, so I will pair each part of it with this example. Let’s write a simple pipeline that reads messages from PubSub and, for each #hashtag encountered in the message body, produces a time-specific analysis of messages containing that tag. We accomplish this as follows:

When a pipeline is submitted for execution to the Dataflow Service, it is first transformed into an execution plan called the “Optimized” pipeline graph. Optimizations such as operator fusion, combiner lifting, flatten unzipping, and more are applied to the original pipeline to produce a new one with equivalent semantics. More information on fusion is available here. The output is a Directed Acyclic Graph (DAG) with nodes, called stages, and edges, representing data shuffles connecting the stages. Each stage performs roughly the following sequence of operations:

  • Read from shuffle input or a data source: This is how the data gets into the stage, either from a previous (upstream) stage, or from some external source of data such as Cloud Pub/Sub, Kafka, GCS, etc.
  • Apply a sequence of user-defined transforms/functions (UDFs): This is the “Business Logic” of the dataflow pipeline at this stage. This may include transformations, aggregations, windowing, etc. In many cases this will also include logic that connects to the data sink, such as BigQuery, Cloud BigTable, etc.
  • Commit the resulting changes to durable state and output shuffle: Sends outputs that are to be shuffled downstream to further stages for processing. This part of the sequence also ensures durability and correctness, as we will examine in detail below.

The edges in this graph represent the shuffle streams connecting the output of one such node to the input of another. For our example pipeline, Dataflow will arrive at the following graph:

Our optimized graph will only contain two stages corresponding to the fusion of operations that do not require data shuffling. The fusion is interrupted by the shuffle implied by the GroupByKey operation in the middle of our pipeline. Note that total (complete) aggregation can only occur after all messages for the same tag are shuffled onto the same key, and the window has closed.
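
To make the two fused stages concrete, here is a minimal plain-Python sketch of what each stage computes. It is not Beam code and not the pipeline's original listing: the function names, the 60-second fixed window, and the lower-casing of tags are all assumptions made for illustration.

```python
import re
from collections import defaultdict

WINDOW_SECONDS = 60  # hypothetical fixed window size, chosen for illustration


def stage1_extract(messages):
    """Fused stage 1: read each message and emit (tag, window_start) pairs."""
    for timestamp, body in messages:
        window_start = timestamp - (timestamp % WINDOW_SECONDS)
        for tag in re.findall(r"#(\w+)", body):
            yield tag.lower(), window_start


def shuffle_by_key(pairs):
    """The GroupByKey shuffle: route every element for a tag to the same key."""
    grouped = defaultdict(list)
    for tag, window_start in pairs:
        grouped[tag].append(window_start)
    return grouped


def stage2_count(grouped):
    """Fused stage 2: per key, count the elements that fall in each window."""
    counts = {}
    for tag, window_starts in grouped.items():
        for window_start in window_starts:
            counts[(tag, window_start)] = counts.get((tag, window_start), 0) + 1
    return counts


# (timestamp in seconds, message body) pairs standing in for PubSub messages.
messages = [
    (5, "I love #apple pie"),
    (20, "#Apple and #banana smoothie"),
    (70, "more #apple"),
]
result = stage2_count(shuffle_by_key(stage1_extract(messages)))
```

Everything before `shuffle_by_key` can run on whichever worker received the message, while the counting can only be completed once all elements for a tag have been brought together under one key.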

Tip: You can inspect the resulting fused stages and the optimized graph for a Dataflow job with the following command:

The documentation on monitoring fusion here provides more details. For example, for the above pipeline, the output will look something like this:

Scalability: Keys and Parallelism

A single Streaming Engine pipeline can process tens of millions of messages per second in parallel. To achieve this while maintaining exactly-once processing, Streaming Engine employs a number of techniques including batching of messages, parallelism of execution, and careful pipelining and sequencing of persisting to durable state without fine-grained barriers. Each message to be processed in Streaming Engine is associated with a specific “key” — all processing of messages for a given fused stage is performed in the context of that key.

What is a key? A key is an identifier that allows us to tie together state across time for related messages. “Related” in this context refers to messages that users want to group (as in GroupByKey) for aggregations. This key is exactly the same as the key in a Key-Value Store. In fact, the state associated with each key is stored in just such a Key-Value store, BigTable.

In the example pipeline above we shuffle by tag into Stage 2 — this allows us to accumulate a count for each tag. Specifically — applying the UDFs in Fused Stage 2 will be performed in the context of these keys. This allows for correct and efficient management of the per-key state that is necessary to compute the deferred count operation, implemented as windowing and triggering.

In many cases, just as in Stage 2 of our example pipeline, keys are explicitly the result of processing the input. However in other cases the keys are implicitly chosen by the implementation, when there is no semantic significance to the key. For example in Fused Stage 1 when reading from PubSub, there is no semantic significance to the value of the key. In fact, the key is not even present as a concept at the user level. However since there is still implicit system state to be tracked (such as state related to exactly-once processing, and downstream shuffles — more on this below), keys are assigned and managed by the implementation.

Message processing is serial per key. Specifically, once the UDFs for a given fused stage are being applied to a batch of messages for a given key, other incoming messages for that key will be buffered until the processing for the current batch completes. This is a design choice that enables high throughput per key: because applying UDFs modifies persistent state, serializing per-key processing allows for efficient caching of state, and for blind writes instead of full transactions or fine-grained barriers. (Why must the application of any transforms result in persistent state mutations? Even if there is no explicit user-specified state, exactly-once processing is implemented through per-key state tracking, so at a minimum, processing of messages results in modifications to the stored exactly-once state. More on this in the sections below.)
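
The buffering behavior described above can be sketched in a few lines of Python. This is a toy model under assumed names (`PerKeyScheduler`, `next_batch`, `complete`), not Streaming Engine's scheduler: it only shows that a key with a batch in flight accumulates new messages, while other keys are free to proceed.

```python
from collections import defaultdict, deque


class PerKeyScheduler:
    """Runs at most one batch per key at a time; other keys proceed independently."""

    def __init__(self):
        self.pending = defaultdict(deque)  # key -> buffered messages
        self.active = set()                # keys with a batch in flight

    def enqueue(self, key, message):
        """Buffer an incoming message for its key."""
        self.pending[key].append(message)

    def next_batch(self, key):
        """Return the buffered batch for `key`, or None if the key is busy or empty."""
        if key in self.active or not self.pending[key]:
            return None
        batch = list(self.pending[key])
        self.pending[key].clear()
        self.active.add(key)
        return batch

    def complete(self, key):
        """Mark the in-flight batch done so buffered messages can be processed."""
        self.active.discard(key)
```

Because only one batch per key is ever in flight, whatever state the batch reads cannot change underneath it, which is what makes cached state and blind writes safe in this model.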

This approach results in a high degree of scalability, with the important caveat that sufficient parallelism must be available. In other words, since processing is serialized onto the available keys, an insufficient number of keys will result in a bottleneck in processing. Stated yet another way, the number of available keys is an upper bound on the number of possible parallel threads. “Hot Keys” are another manifestation of this issue. While a keyspace may be large, the presence of “Hot Keys” means that only a few keys see the lion’s share of traffic, effectively making these keys the bottleneck. Thus, processing parallelism is an important factor when assessing a pipeline’s design, as insufficient parallelism in any stage can lead to poor performance. In a sense, we have re-established a consequence of Amdahl’s Law for streaming: the possible speed-up is bounded by how much of the execution can be made parallel.
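
A back-of-the-envelope model makes the hot-key effect visible. The function below is an illustrative assumption, not a Streaming Engine formula: it treats each key's queued work as strictly serial and computes a lower bound on how quickly a set of threads could drain it.

```python
def min_processing_time(load_per_key, num_threads):
    """Lower bound on the time to drain the work when each key runs serially.

    load_per_key maps key -> units of work queued for that key.
    """
    total = sum(load_per_key.values())
    hottest = max(load_per_key.values())
    # Work cannot finish faster than an even split across threads, nor faster
    # than the hottest key, whose work must be processed serially.
    return max(total / num_threads, hottest)


# Two hypothetical workloads with the same total (1000 units) over 100 keys.
uniform = {f"key{i}": 10 for i in range(100)}
skewed = {f"key{i}": 1 for i in range(99)}
skewed["hot"] = 901
```

With 10 threads the uniform workload is bounded by the even split (100 units), while the skewed one is bounded by the hot key (901 units), and adding more threads does not move that second bound at all.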

Tip: you can see the per-fused-stage parallelism of your job by examining the metric ‘dataflow.googleapis.com/job/processing_parallelism_keys’. More information about the metric is available here and here.

Partitioning and Distributed Execution

The semantically-meaningful keys in fused stage 2 will be the tags encountered in processing the input messages. If the source dictionary is all of English, then there will be up to hundreds of thousands of distinct keys at any one point in time. These distinct keys for each stage of our processing form the basis to perform the partitioning and distribution of work so that we can scale our system.

The first important detail is that the logical keys are first hashed, and the hashes are prepended to the keys on all operations in Streaming Engine. All per-key operations in the back-end use the key as an opaque identifier, and having each key be prepended with the hash solves several problems. First, it allows us to map an unbounded keyspace to a keyspace where all prefixes are part of a bounded range. As we use 64-bit hashing, the set of all possible hashes lies in the interval [0, INT64_MAX), which bounds our prefixes. This lets us reason about the “entire” keyspace when ordered lexicographically, and operate on it meaningfully as we will see below.

Another important property is that spatial locality of the logical keyspace is broken by hash-prefixing. This means that any patterns in load distribution over keys that exhibit locality are “smeared” in the hash space. An example of where this is important: consider that your messages are search queries, and there will be some very common terms with almost-as-common misspellings. These will be lexicographically proximate, but will be distant in the hash space. This will become important as we talk about how we achieve partitioning the work across many workers.
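
As a rough illustration of hash-prefixing, the sketch below builds a fixed-width 64-bit prefix for a logical key. The choice of SHA-256 truncated to 8 bytes, the hex encoding, and the `:` separator are assumptions made for this example; the post does not say which hash function or encoding Streaming Engine uses.

```python
import hashlib


def hash_prefixed_key(logical_key: str) -> str:
    """Prepend a fixed-width 64-bit hash (as hex) to the logical key."""
    digest = hashlib.sha256(logical_key.encode("utf-8")).digest()
    prefix = digest[:8].hex().upper()  # first 64 bits -> 16 hex characters
    return f"{prefix}:{logical_key}"


# A common term and a misspelling sort next to each other as plain strings,
# but their hash prefixes are unrelated.
for term in ["weather", "wether"]:
    print(hash_prefixed_key(term))
```

Since every prefix has the same width and comes from a bounded range, sorting the prefixed keys lexicographically orders them by hash first, which is what lets the whole keyspace be cut into ranges.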

Let’s return to our example pipeline. There are two distinct keyspaces associated with it, one for each fused stage. Without making any assumptions about the kinds of messages that are flowing through and the kinds of keys that will be encountered, we know the full range of values of each keyspace in the hashed domain.

Streaming Engine then partitions each of these keyspaces into a distinct set of key ranges that together fully cover each keyspace. The partitions of each keyspace for each fused stage are independent, and may be different. For example, the keyspaces may be partitioned as follows, where each range will contain any key that lexicographically falls within it. For instance, if our key “Banana” has a hash 5E58716B24, then the full key <5E58716B24,“Banana”> will be contained by the keyspace for stage 2 range [5678, DEF0) because the hash value falls between the start and end prefix of the range.
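
Finding the range that owns a key is a lexicographic search over the range boundaries. The sketch below assumes a small, hypothetical set of boundaries (only [5678, DEF0) comes from the example above) and represents the full key as the hash prefix followed by the logical key.

```python
import bisect

# Hypothetical range boundaries for one stage; each range is [start, next start).
# The empty string sorts before every key, so the ranges cover the whole keyspace.
RANGE_STARTS = ["", "1234", "5678", "DEF0"]


def find_range(hashed_key: str):
    """Return the (start, end) range containing the hash-prefixed key.

    An end of None means the range extends to the end of the keyspace.
    """
    index = bisect.bisect_right(RANGE_STARTS, hashed_key) - 1
    start = RANGE_STARTS[index]
    end = RANGE_STARTS[index + 1] if index + 1 < len(RANGE_STARTS) else None
    return start, end


owner = find_range("5E58716B24" + "Banana")
```

Here `owner` is `("5678", "DEF0")`, matching the example: the comparison never needs to look past the hash prefix, so the logical key stays opaque.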

These ranges are then assigned to Streaming Engine workers available to the pipeline. These ranges represent exclusive ownership of the contained keys to those workers. This ensures that the workers to which they are assigned are exclusively mutating state associated with those keys. This is the necessary condition for high scalability achievable without fine-grained barriers or full transactions as discussed in the previous section.

The exclusivity of these assignments is enforced with coarse-grained barriers (leases) that prevent writes from succeeding unless the worker owns the given range. Let’s take a detailed look at how this is enforced across the re-assignment of a range from one worker to another. A component of Streaming Engine called the “Controller” manages the metadata about the assignments of ranges to workers, and communicates with the persistent storage layer. It is necessary for the persistent storage layer to implement lease semantics which guarantee that only writes with valid leases are allowed to succeed.

In the sequence diagram above, we can see how the Controller actuates the movement of a particular range from Worker A to Worker B, and how delayed writes from Worker A are rejected at the persistent state layer. This ensures that Worker B will not have any read-modify-write races when it processes traffic for the re-assigned range of keys.
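
The lease check at the storage layer can be modeled with a monotonically increasing token per range. This is a toy sketch under assumed names (`LeasedStore`, `grant_lease`, `write`); the real protocol between the Controller, workers, and storage is not described at this level in the post.

```python
class LeasedStore:
    """Toy persistent store that accepts writes only under the current lease."""

    def __init__(self):
        self.lease_holder = {}  # range id -> (worker, lease token)
        self.data = {}

    def grant_lease(self, range_id, worker):
        """Called by the controller; a new lease invalidates the previous one."""
        token = self.lease_holder.get(range_id, (None, 0))[1] + 1
        self.lease_holder[range_id] = (worker, token)
        return token

    def write(self, range_id, token, key, value):
        """Apply the write only if `token` is the current lease for the range."""
        current = self.lease_holder.get(range_id)
        if current is None or current[1] != token:
            return False  # stale or missing lease: reject the write
        self.data[key] = value
        return True
```

Once the range is re-assigned, any write that Worker A still has in flight carries the old token and is rejected, so Worker B never races with it.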

These partitions into key ranges also represent Streaming Engine’s unit of load balancing. Sub-partitioning or re-partitioning the range boundaries, and changing the workers to which each is assigned is the mechanism by which we can shift the load in response to various performance measurements in Streaming Engine. This is an open and ongoing area of research for Streaming Engine and I hope to cover it in a future post.

The Active Work Lifecycle

So far we’ve talked about what the processing graph looks like, what the keys are, and how we can partition and scale our processing. But we have not processed any actual data yet! What actually happens to messages that are flowing through Streaming Engine? Once again, it helps to ground our discussion in concrete reality. Let’s look at the worker that is assigned some range of keys for Stage 2, meaning it is on the receiving end of the shuffle for the GroupByKey operation grouping on tags. Let’s assume our worker has been assigned the range containing the (hashed) key Apple but not Banana. Then we will be receiving messages that look something like this:

<Key: Apple, Timestamp: 10/11/21 1:30:12 UTC, Data:{PubsubMessages}>
<Key: Apple, Timestamp: 10/11/21 1:31:48 UTC, Data:{PubsubMessages}>
<Key: Apple, Timestamp: 10/11/21 1:36:02 UTC, Data:{PubsubMessages}>

Let’s walk through what happens “conceptually,” and then discuss how it is actually implemented in Streaming Engine. At a high-level, the following sequence of events occurs:

  • For each message we determine if it is on-time or late. Watermarks are used to make this determination. To learn more about this, Streaming Systems Chapter 3 is recommended reading.
  • To keep things simple, let’s talk about on-time messages. This means that it is not yet time to emit the total aggregate. We are still “collecting” partial aggregates and waiting for the window to close, before we can materialize the total result. In this case, the partial aggregate is appended to the state of the key.
  • When the window closes (as determined by the watermark) it is time to materialize the total aggregate. We read back the accumulated partial aggregate state, and compute and emit the total aggregate to the next stage of processing.
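
The conceptual steps above can be sketched as a small state machine. The 5-minute fixed window, the class name, and the use of plain second offsets for timestamps are assumptions for illustration, and handling of late messages beyond labeling them is left out.

```python
from collections import defaultdict

WINDOW_SECONDS = 300  # hypothetical 5-minute fixed windows


class WindowedCounter:
    """Accumulates per-(key, window) partial counts and emits totals on close."""

    def __init__(self):
        self.state = defaultdict(int)  # (key, window_start) -> partial count

    def on_message(self, key, timestamp, watermark):
        """Classify the message and, if on-time, add it to its window's state."""
        window_start = timestamp - timestamp % WINDOW_SECONDS
        if window_start + WINDOW_SECONDS <= watermark:
            return "late"  # the window has already closed
        self.state[(key, window_start)] += 1
        return "on-time"

    def on_watermark(self, watermark):
        """Emit and clear every window whose end is at or before the watermark."""
        closed = [kw for kw in self.state if kw[1] + WINDOW_SECONDS <= watermark]
        return {kw: self.state.pop(kw) for kw in sorted(closed)}
```

Feeding it three messages for Apple at offsets 12, 108, and 362 seconds and then advancing the watermark to 300 emits a total of 2 for the first window, while the third message stays buffered in state until its own window closes.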

To go further and talk about the implementation details of the conceptual steps outlined above, we have to “zoom in” to the concept of a worker. When we crack open this black box, there are actually two distinct kinds of workers within the “worker” concept, each doing different kinds of work.

The two workers communicate with each other via the “Windmill API”, which allows a separation of responsibilities. User workers invoke the Windmill API to communicate with windmill workers. The API consists of the following calls:

GetWork: Returns the input from the windmill worker on which the user-defined transforms will run. For example, a batch of incoming messages from the shuffle, or the timers that are ready to fire to trigger deferred aggregation.

GetData: Returns the stored state needed during the execution of a particular user-defined transform. Multiple state types, such as List State, Bag State, or Map State, allow for efficient access into state without the need to always materialize the entire stored state. The Beam State API provides a direct way of interacting with this state. Concepts such as windowed aggregations also implicitly rely on this state and GetData.

CommitWork: Sends the result of applying the user-defined transforms from the user worker to the windmill worker. For example, the elements to be written to the shuffle for a downstream stage, or mutations to state.
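
To show how the three calls fit together, here is a toy user-worker loop running against an in-memory stand-in. `FakeWindmill` and its method signatures are hypothetical and far simpler than the real Windmill API; only the GetWork, GetData, CommitWork ordering is taken from the descriptions above.

```python
class FakeWindmill:
    """In-memory stand-in for a windmill worker; not the real Windmill API."""

    def __init__(self, work_items):
        self.work_items = list(work_items)  # (key, [messages]) batches
        self.state = {}                     # key -> stored count
        self.outputs = []                   # elements "shuffled" downstream

    def get_work(self):
        """Hand out the next batch of input, or None when there is none."""
        return self.work_items.pop(0) if self.work_items else None

    def get_data(self, key):
        """Return the stored state for a key (0 if nothing is stored yet)."""
        return self.state.get(key, 0)

    def commit_work(self, key, new_state, outputs):
        """Persist the state mutation and the downstream outputs together."""
        self.state[key] = new_state
        self.outputs.extend(outputs)


def run_user_worker(windmill):
    """Process work until none is left: GetWork -> GetData -> UDF -> CommitWork."""
    while (work := windmill.get_work()) is not None:
        key, messages = work
        count = windmill.get_data(key) + len(messages)  # the "UDF": count messages
        windmill.commit_work(key, count, [(key, count)])
```

Note that the user worker holds nothing between iterations: everything it needs arrives through `get_work` and `get_data`, and everything it produces leaves through `commit_work`.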

Tip: For the curious, you can read the entire Windmill API definition here.

Notably absent from the Windmill API are concepts such as windows or triggers. These are implemented entirely in the User Worker using Windmill API’s lower-level semantics of state and timers. To help understand how that works, let’s go back to our conceptual description of what should happen in processing of messages for Stage 2, and trace through the execution in more detail with the two workers and the windmill API in mind.

In the above sequence we can understand the division of responsibilities. The Windmill Worker is predominantly responsible for queueing, and for persisting messages, state, and timers to durable state. The User Worker is responsible for applying the user-supplied transformations as well as higher-level primitives such as windowing, and translating the results back to the Windmill API. The Windmill Worker is backed by state, ensuring consistent processing, while the User Worker is stateless. We will dive into the details of exactly how this is achieved in the Correctness section below. But first, let’s address a common pitfall of pipeline design:

The Effect of Long-Running or Stuck Operations

In most cases the time to apply UDFs is on the order of milliseconds. There are cases however when this time can be significantly longer — often when blocking calls to other services are involved. Such long-running operations can be a challenge. One question we sometimes hear is “why can’t we just ignore stuck operations and keep processing everything else?”

“Ignoring” a stuck operation and continuing to process all others is fundamentally not possible in Dataflow without additional work from the pipeline code.

All processing in streaming Dataflow is performed in the context of a key, and message processing is serial per key for the reasons explained in the previous section. Therefore, an operation blocking forever will ultimately block processing for all further messages arriving for that key. To preserve the exactly-once guarantees, Streaming Engine will never drop data, so the pending messages will continue to wait. Initially only the messages assigned to the same key will be affected. Eventually, the buffers will grow, and processing will be suspended altogether once memory budgets and flow control limits at various levels are triggered.

So what can be done about this? The best option always is to figure out why the operation is taking forever and fix this. Often this involves re-thinking pipeline design and avoiding expensive or blocking operations, relying instead on built-in data sinks such as PubsubIO and BigQueryIO which are designed to be performant at scale. Custom sink implementations that scale are of course possible by a careful application of batching, timeouts in the right places, and bounding of retries.
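
As one possible shape for a custom sink that cannot block forever, the sketch below combines batching with bounded retries. The helper names and default values are illustrative assumptions, and the per-attempt timeout is expected to be enforced by `send_batch` itself (for example through a client-side deadline).

```python
import time


def call_with_bounded_retries(operation, max_attempts=3, base_delay=0.01,
                              sleep=time.sleep):
    """Call `operation` up to `max_attempts` times with exponential backoff.

    `operation` should enforce its own timeout and raise on failure, so that
    no single attempt can block forever.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise  # give up and surface the failure instead of hanging
            sleep(base_delay * 2 ** (attempt - 1))


def write_in_batches(elements, batch_size, send_batch):
    """Group elements into batches and send each one with bounded retries."""
    for start in range(0, len(elements), batch_size):
        batch = elements[start:start + batch_size]
        call_with_bounded_retries(lambda: send_batch(batch))
```

Bounding both the duration of each attempt and the number of attempts gives every key a worst-case processing time, so a misbehaving downstream service produces a visible error rather than a silently stalled key.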

Tip: The distribution histogram of time for the execution of the user’s transforms will be available as a metric launching soon. This will help indicate if the delays being experienced are reasonable, and if there are any long-running stragglers that might be throttling throughput.

Correctness: Exactly-Once Processing

For a broader and more thorough discussion of Correctness and Exactly-Once Processing, see Streaming Systems Chapter 5. Here, let’s focus on a few key points so that we can dive deeper into execution-specific details. When speaking of the “correctness” of processing, we can generally be referring to the following concepts:

  • Was each incoming message processed at least once? By this we mean that no messages were lost or dropped. At-least-once semantics do not preclude duplicates.
  • Was each incoming message processed exactly once? This implies that messages were processed at least once, and only once. Any replays of messages should be filtered out.

Streaming Engine always guarantees exactly-once processing with respect to any internal replays, and can guarantee exactly-once processing with external sources and sinks that provide sufficient semantics for this. Let’s focus first on internal replays and fault-tolerance.

What kinds of things can go wrong that could lead to dropped or duplicate messages? The most common types of failures are cases where workers can go away for some reason. This could be due to crashes, hardware failures, network failures, or any of an endless list of reasons that can never be fully anticipated. Processing backed by durable state serves to fulfill the at-least-once guarantee. But guarding against replays is a bit trickier. Let’s zoom in to some details, focusing on what parts of the system can crash, and how Streaming Engine handles this. What happens when any of our User Workers or Windmill Workers crashes mid-execution?

In this first scenario, we look at the case when our User Worker is processing some messages it has received via GetWork, and then crashes. The above sequence shows how we replay the input to the worker (to ensure at-least-once), and how we use leases to guarantee that only one CommitWork response is allowed to land back on our Windmill Worker. Each instantiation of a User Worker has a unique id that is used to invalidate the lease. Because the User Worker is stateless, this is the easier case to handle. Next we are going to take a look at the more complex sequence that we use to handle a crashing Windmill Worker.

Tracing through the above sequence helps us understand how careful ordering of persistence to durable state on the Windmill Worker allows us to be resilient to replays. When a message is shuffled, we check the previously persisted message ids against the incoming message id to determine if the message is a duplicate. If it is not a duplicate, we allow it to process. Once processing is complete, we persist the state mutations resulting from the call to commit, the message id of the message just processed, as well as any intended shuffle outputs. We perform the write to durable state atomically, guaranteeing that all stateful changes on behalf of the message are either committed together or not at all. Once persisting to durable state is complete, we can acknowledge the incoming shuffle back to the upstream worker and allow the downstream shuffle to start. The changes written to durable state are now a guarantee that the message has been processed, and future attempts will be rejected. To see how this plays out, consider the following sequences where the worker crashes:

The first scenario to consider is if our windmill worker crashes sometime after the message is received on the incoming shuffle, but before persisting the results of processing to persistent state. In this case, the incoming shuffle request is never successfully acknowledged, and will be retried to the new instance of the worker. Since no write to durable state was completed, the messages will not be marked as duplicate, and will be allowed to continue to process.

The second scenario to consider is what happens if our windmill worker crashes after the write to durable state has already successfully completed, but before we have sent an acknowledgement upstream, or started the intended shuffles downstream. In that case, when our shuffle from upstream retries, we will use the information stored in durable state to determine that the incoming message is a duplicate and has already been processed. This will allow us to acknowledge the shuffle for it immediately as having already succeeded. What about our intended shuffles downstream? Well, we have those stored in state also, and as part of starting the new worker instance we will start issuing those shuffles to the downstream worker as well.
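
Both crash scenarios can be replayed against a toy model of the commit path. The class below is a sketch under assumed names, with a plain dictionary standing in for durable state and a counter standing in for the UDF; it is not how Windmill stores data, only an illustration of the ordering described above.

```python
class WindmillWorkerSketch:
    """Toy model of dedup plus atomic commit; durable state survives 'crashes'."""

    def __init__(self, durable):
        # `durable` stands in for persistent storage shared across restarts.
        self.durable = durable
        durable.setdefault("seen_ids", set())
        durable.setdefault("count", 0)
        durable.setdefault("pending_outputs", [])

    def on_shuffle(self, message_id, crash_before_commit=False):
        """Process one incoming message; return 'ack' once it is durable."""
        if message_id in self.durable["seen_ids"]:
            return "ack"  # duplicate: already processed, acknowledge right away
        new_count = self.durable["count"] + 1  # apply the "UDF"
        if crash_before_commit:
            raise RuntimeError("simulated crash before the durable write")
        # One write covering the state mutation, the message id, and the
        # intended shuffle outputs, modeling the atomic commit.
        self.durable.update(
            count=new_count,
            seen_ids=self.durable["seen_ids"] | {message_id},
            pending_outputs=self.durable["pending_outputs"] + [("count", new_count)],
        )
        return "ack"
```

A crash before the write leaves durable state untouched, so the retried message is processed normally; a crash after the write but before the acknowledgement means the retry finds its id in `seen_ids` and is acknowledged without being counted again, while `pending_outputs` still holds the downstream shuffle to re-issue.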

There are a few other variations on a theme here in the sequence of when a worker can crash, but all of them result in the same thing — a guarantee that the results of our processing will be written to durable state exactly once at each stage of the pipeline.

Tip: The number of messages filtered out as duplicates is available by examining the metric ‘dataflow.googleapis.com/job/duplicates_filtered_out_count’, as noted here.

Careful readers will note now that this is all well and good, but there’s a devil in the details. What happens if, you may ask, the UDF sitting in between GetWork and CommitWork performs some non-idempotent action with an external system, like telling a bank to wire some funds? Well, in that case you will have potential replays, since Streaming Engine does not inherently guard against this: the money may be wired twice or more. Thus, when interacting with external systems outside of Dataflow from within the pipeline code, careful consideration is required.

Most commonly this comes up when writing the outputs of your pipelines to an external system (often BigQuery, PubSub, and GCS). The good news is that in most cases the writes are idempotent (GCS), or the system in question provides a mechanism for taking advantage of the deduplication metadata such as message ids to extend the exactly-once guarantee past the edge of the Dataflow pipeline (BigQuery, PubSub). This is an important detail for those integrating their pipelines with other data sinks: ensure that the sink can correctly handle idempotent replays, or that it can take advantage of the available metadata to correctly de-duplicate.
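
For a sink you control, the same idea can be sketched as de-duplication on a message id carried with each write. `DedupingSink` is a hypothetical in-memory example, not the mechanism used by BigQuery or PubSub.

```python
class DedupingSink:
    """Toy external sink that ignores writes whose message id it has seen."""

    def __init__(self):
        self.rows = []
        self.seen_ids = set()

    def write(self, message_id, row):
        """Store the row once per message id; return False for a replay."""
        if message_id in self.seen_ids:
            return False  # replayed write: drop it
        self.seen_ids.add(message_id)
        self.rows.append(row)
        return True


sink = DedupingSink()
sink.write("msg-1", {"tag": "apple", "count": 2})
sink.write("msg-1", {"tag": "apple", "count": 2})  # replay after a worker crash
```

After the replay, `sink.rows` still holds a single row, so the pipeline can retry the write as often as it needs to without changing the result.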

Concluding Thoughts:

I have tried to walk a careful balance in this article, giving just enough technical depth about Streaming Engine, without overwhelming you with irrelevant implementation details. My hope in writing all of this is that the next time you go to write or tune a Streaming Dataflow job, you will know how to make it perform and scale on Streaming Engine. If you want me to go into additional depth on any of the topics I touched upon, please let me know in the comments.
