Kafka to Spark Structured Streaming, with Exactly-Once Semantics

Dennis de Weerdt
Published in DataPebbles
Nov 2, 2020

Apache Spark Structured Streaming is part of the Spark Dataset API. It improves on the DStream-based Spark Streaming, which used the older RDD-based API. It aims to provide low-latency, fault-tolerant, exactly-once processing. That’s quite an ambitious goal, but with a few caveats, they’ve managed it. Apache Kafka makes the same claims, so you might expect that using the two frameworks together would preserve them. Unfortunately, that’s where those caveats come in. So, let’s take a look at what exactly Structured Streaming is, how to use it, and what kind of fault tolerance semantics to expect when using it alongside Kafka.

Structured Streaming Walkthrough

As an example, we’ll be using data on mobile phone activity in Milan in November 2013, retrieved from Kaggle. We won’t be doing anything particularly fancy with it, however. We’ll just take the data on mobile internet usage to compute the total hourly usage over the week the data covers. The data is static in this case, of course, but the exact same code would work for live data as well, so it serves as a good illustration. Our demo application will read incoming data from Kafka, process it, and then write the results to files.

The first step is to set up the data source in Spark. This looks just like it would if we were creating a standard DataFrame, except that we use the “readStream” method rather than “read”. There are more differences under the hood, however. Most importantly, a different set of data sources is available for streaming. Notably, JDBC input (or output, for that matter) is not supported. Another difference which is very relevant to our example is that a Kafka source is provided out of the box, perhaps unsurprisingly. It’s easy to set up for basic usage, but also provides plenty of customisation options. It’s important to observe that input from Kafka comes with a fixed schema. It contains some Kafka metadata, a key/value pair (both as raw bytes, so they usually need to be cast or parsed), and, if requested, any user-defined headers included in the messages.
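The article does not show its own code, so here is a rough sketch of what the source setup might look like in PySpark. The helper names, broker address, and topic name are placeholders invented for illustration, and calling `read_kafka_stream` assumes an existing SparkSession with the Kafka connector package (`spark-sql-kafka-0-10`) available.

```python
def kafka_source_options(bootstrap_servers, topic):
    """Options for Spark's built-in Kafka source (the argument values are placeholders)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Start from the oldest retained offsets, which suits replaying a static data set.
        "startingOffsets": "earliest",
    }


def read_kafka_stream(spark, bootstrap_servers, topic):
    """Return a streaming DataFrame backed by a Kafka topic.

    `spark` is an existing SparkSession (assumed, not created here).
    """
    reader = spark.readStream.format("kafka")
    for name, value in kafka_source_options(bootstrap_servers, topic).items():
        reader = reader.option(name, value)
    raw = reader.load()
    # The key and value columns arrive as binary, so cast them before parsing.
    return raw.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value",
        "timestamp",
    )
```

The cast in the last step is where the fixed schema shows: whatever format the messages use, parsing it into proper columns is left to the application.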

After having created the streaming DataFrame, the data processing part works just like it normally would. There are a few operations which are not supported, but most are. To get aggregation to work, however, there is an extra step which needs to be taken. This has everything to do with the fact that the events Kafka delivers may not be in chronological order. For example, if we were gathering data from multiple sources, one of them may be on a slower network connection than the others. That would mean events which occurred at the same time might arrive some time apart. (Of course, there are any number of reasons part of the data may be delayed; this is just one example.) This issue can be mitigated by using watermarking. It’s a pretty simple concept: you indicate which column contains the “event time” and a threshold for, essentially, how long you’re willing to wait for late data to arrive. Then, when performing an aggregation, Spark keeps the intermediate state for a time window around until the watermark has moved past it. Any data for that window which arrives afterwards is discarded. It’s not really exactly-once anymore at that point, but you do have to draw a line somewhere.
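To make the watermark rule concrete, here is a small pure-Python model of it. It does not call Spark at all (in Spark the same idea is expressed with `withWatermark` on the streaming DataFrame followed by a grouped aggregation), and it simplifies in two ways that are assumptions of this sketch: the watermark advances after every event rather than between micro-batches, and lateness is judged per event rather than per window. The sample events are made up.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def hourly_totals_with_watermark(events, delay):
    """Sum usage per hour, dropping events that arrive behind the watermark.

    `events` is an iterable of (event_time, usage) pairs in ARRIVAL order.
    The watermark is the latest event time seen so far minus `delay`.
    """
    totals = defaultdict(float)
    dropped = []
    max_event_time = None
    for event_time, usage in events:
        if max_event_time is not None and event_time < max_event_time - delay:
            # Too late: the watermark has already passed this event's time.
            dropped.append((event_time, usage))
            continue
        hour = event_time.replace(minute=0, second=0, microsecond=0)
        totals[hour] += usage
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return dict(totals), dropped


def at(hour, minute):
    """Build a timestamp on an arbitrary day in November 2013 (made-up sample data)."""
    return datetime(2013, 11, 1, hour, minute)


events = [
    (at(10, 5), 1.0),
    (at(10, 50), 2.0),
    (at(11, 40), 4.0),   # moves the watermark to 10:40
    (at(10, 55), 8.0),   # late, but still ahead of the watermark: counted
    (at(10, 10), 16.0),  # behind the watermark: dropped
]
totals, dropped = hourly_totals_with_watermark(events, timedelta(minutes=60))
```

With a 60-minute threshold, the 10:55 event still lands in the 10:00 total, while the 10:10 event is discarded. A longer threshold loses less data but forces the engine to hold state for longer.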

Now it’s time to write the results. As before, the “write” method is replaced with “writeStream”. The differences are more significant this time, however. Instead of calling a method like “save” (or the format-specific shortcuts which use it), for streaming we use “start”. As the name hints, this starts an asynchronous process rather than blocking until the job is done (as “save” does). That process continues until “stop” is called or until it fails with an exception. In the example, we use “awaitTermination” to block the main thread while the streaming job does its work.
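A minimal sketch of the writing side in PySpark might look like the following. The function names and both paths are hypothetical, and the choice of Parquet as the file format is an assumption; the source only says the results go to files.

```python
def start_file_sink(results, output_path, checkpoint_path):
    """Start writing a streaming DataFrame to Parquet files and return the running query.

    `results` is a streaming DataFrame; both paths are placeholders chosen by the caller.
    """
    return (
        results.writeStream
        .format("parquet")
        .outputMode("append")  # the file sink only supports append mode
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .start()  # returns immediately; the query keeps running in the background
    )


def run_until_stopped(query):
    """Block the calling thread until the query is stopped or fails."""
    query.awaitTermination()
```

Because `start` returns a handle to the running query, the same application could start several queries before blocking on any of them.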

Fault Tolerance: When it hits the fan

So now that we have some idea of how things are supposed to work, let's take a look at what happens when things go wrong.

In order to provide fault tolerance, Structured Streaming uses checkpoints and write-ahead logs. These mechanisms let the engine record its progress durably, so that after a system failure it can simply pick up where it left off once the failure is fixed.
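The idea of recording progress ahead of the work can be illustrated with a toy model in plain Python. This is not Spark's implementation: the class, its two in-memory lists, and the batch size are all invented for this sketch, and a real engine would persist the logs to durable storage.

```python
class CheckpointedJob:
    """Toy model: log the planned offset range first, commit it after the output is written."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.offset_log = []  # planned (start, end) ranges, recorded BEFORE processing
        self.commit_log = []  # indices of batches whose output is known to be written

    def next_range(self, available):
        """Return the offset range to process next, or None if there is nothing new."""
        if len(self.offset_log) > len(self.commit_log):
            # A batch was planned but never committed: retry exactly the same range.
            return self.offset_log[-1]
        start = self.offset_log[-1][1] if self.offset_log else 0
        end = min(start + self.batch_size, available)
        if end == start:
            return None
        self.offset_log.append((start, end))
        return (start, end)

    def commit(self):
        """Mark the most recently planned batch as done."""
        self.commit_log.append(len(self.offset_log) - 1)


job = CheckpointedJob(batch_size=4)
first = job.next_range(available=10)   # plans offsets 0 to 4
job.commit()
second = job.next_range(available=10)  # plans offsets 4 to 8, then the job "crashes"
retry = job.next_range(available=10)   # after restart: the same range again
job.commit()
third = job.next_range(available=10)   # the remaining offsets 8 to 10
```

The retried batch covers exactly the same offsets as the failed attempt, which is what makes the two conditions discussed next matter: the source must be able to serve that range again, and the sink must cope with receiving it twice.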

Checkpointing and write-ahead logs are not a silver bullet, however, and there are some caveats to achieving exactly-once semantics. First, the data source(s) need to be replayable. Simply put, Spark must be able to fetch the exact same data from the same location if it needs to. Kafka supports this by design using its offset system, but the generic socket-based input, for example, does not.
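The difference between the two kinds of source can be shown in a few lines of plain Python. Both classes are illustrative stand-ins invented for this sketch, not real Kafka or socket clients.

```python
from itertools import islice


class OffsetLog:
    """Replayable: records stay addressable by offset, as in a Kafka partition."""

    def __init__(self, records):
        self._records = list(records)

    def read(self, start, end):
        return self._records[start:end]


class OneShotStream:
    """Not replayable: each record can be consumed only once, like data from a socket."""

    def __init__(self, records):
        self._iterator = iter(records)

    def read(self, count):
        return list(islice(self._iterator, count))


log = OffsetLog(["a", "b", "c", "d"])
first_attempt = log.read(0, 3)
second_attempt = log.read(0, 3)  # a retry sees exactly the same records

stream = OneShotStream(["a", "b", "c", "d"])
consumed = stream.read(3)
after_retry = stream.read(3)     # the first three records are gone for good
```

If a failure hits after `consumed` was read but before its results were written, the offset-based source can hand out the same records again, while the one-shot stream has nothing left but `"d"`.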

The second condition is that the data sink(s) need to be idempotent. That basically means that, if the same data is written to the sink multiple times, the end result is the same as if it had been written once. The only sink Spark provides which meets that condition is the file sink. Notably, the Kafka sink does not. Consider a scenario where half the data for a job is processed, a fault occurs, and when the job restarts part of the work is repeated. With the file output, the files involved will just get overwritten, so there are no lasting effects from the failure. With Kafka, however, there is no way to mark the 'duplicated' messages as being in some way invalid. You might be able to get around this if you add a unique key to each output message and have the consumers of the messages check those keys, but in general the Kafka sink only supports at-least-once fault tolerance.
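The scenario above can be played out with two toy sinks in plain Python. The classes, the batch id, and the sample rows are all made up for illustration; they model the behaviour described here rather than the internals of Spark's file or Kafka sinks. The `deduplicate` helper sketches the consumer-side workaround with unique keys.

```python
class BatchFileSink:
    """Idempotent: output is keyed by batch id, so a retry replaces the earlier attempt."""

    def __init__(self):
        self.files = {}

    def write(self, batch_id, rows):
        self.files[batch_id] = list(rows)

    def all_rows(self):
        return [row for batch_id in sorted(self.files) for row in self.files[batch_id]]


class AppendOnlyTopic:
    """Not idempotent: every write appends, so a retried batch shows up twice."""

    def __init__(self):
        self.messages = []

    def write(self, batch_id, rows):
        self.messages.extend(rows)


def deduplicate(messages):
    """Consumer-side workaround: keep the first message seen for each unique key."""
    seen = set()
    unique = []
    for key, value in messages:
        if key not in seen:
            seen.add(key)
            unique.append((key, value))
    return unique


batch = [("2013-11-01T10", 11.0), ("2013-11-01T11", 4.0)]
file_sink, topic = BatchFileSink(), AppendOnlyTopic()
for sink in (file_sink, topic):
    sink.write(0, batch)  # first attempt, followed by a simulated failure
    sink.write(0, batch)  # the same batch is retried after restart
```

After the retry, the file sink holds each row once, the topic holds each row twice, and `deduplicate(topic.messages)` recovers the original rows as long as every message carries a key that is unique to it.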

Putting this together, it is possible to achieve exactly-once semantics using Structured Streaming and Kafka, but only when reading from Kafka and writing the results to files. Writing to Kafka may create duplicate results, but at least you're guaranteed not to lose any data. Also, be sure to check the Kafka documentation for an explanation of its fault tolerance semantics. After all, if your Kafka is not set up correctly for fault tolerance, nothing you do in Spark is going to help with that.
