Streaming a Kafka topic in a Delta table on S3 using Spark Structured Streaming

Kees C. Bakker
Nov 9, 2019 · 6 min read

At Wehkamp we use Apache Kafka in our event-driven service architecture. It handles high loads of messages really well. We use Apache Spark to run analysis and machine learning.

When I work with Kafka, the words of Mark van Gool, one of our data architects, always echo in my head: “Kafka should not be used as a data store!” It is really tempting for me to do so, but most of our event topics have a small retention period. Our data strategy specifies that we should store data on S3 for further processing. Raw S3 data is not the best format for working with data in Spark, though. In this blog I’ll show how you can use Spark Structured Streaming to write JSON records from a Kafka topic into a Delta table.

Note: This article assumes that you’re dealing with a JSON topic without a schema. It also assumes that the buckets are mounted to the file system, so we can read and write to them directly (without the need for boto3). Also: I’m using Databricks, so some parts are Databricks-specific.

Design

To make things easier to understand, I’ve made a diagram of the setup we’re trying to create. Let’s assume we have 2 topics that we need to turn into Delta tables. We have another notebook that consumes those Delta tables.

The notebook and Kafka are decoupled by the Delta tables.

Each topic will get its own Delta table in its own bucket. The topics are read by parametrised jobs that use Spark Structured Streaming to stream updates into the table. The update jobs can run every hour or continuously, depending on your needs. The job saves its progress (the Kafka offsets) in a checkpoint, so it will read every message only once.

The notebook that needs the topics connects to the Delta tables and consumes the data. This way the notebook is decoupled from Kafka.

Steps

We need to do the following steps:

1. Configuration
2. Schema
3. Read Kafka Stream
4. Delta table
5. Reading a Delta table

Let’s start!

1. Configuration

We will build a generic notebook, so we must add some widgets to influence the way it runs.

The widgets look like this:

Widgets are shown at the top of the notebook, unfortunately in alphabetical order.

Global variables

Let’s start with a cell of global variables that will not be parametrised.

At Wehkamp we use prefixes for buckets.
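Something along these lines; the prefix and broker list are just placeholder values:

```python
# Global, non-parametrised settings. The values below are placeholders;
# the prefix convention and the broker list will differ per environment.
BUCKET_PREFIX = "company-prefix-"

# Kafka bootstrap servers of the cluster we read from.
KAFKA_BROKERS = "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
```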

Widgets

Let’s set up the widgets:
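For example (the widget names topic, bucket, reset and continuous are assumptions I’ll reuse in the snippets below):

```python
# Hypothetical widget names and defaults; adjust them to your own job parameters.
dbutils.widgets.text("topic", "", "Kafka topic")
dbutils.widgets.text("bucket", "", "S3 bucket (without prefix)")
dbutils.widgets.dropdown("reset", "no", ["yes", "no"], "Re-infer the schema")
dbutils.widgets.dropdown("continuous", "no", ["yes", "no"], "Run continuously")
```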

Parse widgets

Now that we have our widgets, we should parse them into variables. I’ve lifted some code from this blog to help me get the values from the widgets:
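A small helper along these lines does the trick; it falls back to a default when a widget is empty or missing:

```python
def get_widget_value(name, default=None):
    """Returns the trimmed widget value, or the default when it is empty or missing."""
    try:
        value = dbutils.widgets.get(name).strip()
    except Exception:
        return default
    return value if value else default
```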

We will convert the widget values to variables:
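Using the (assumed) widget names from above:

```python
topic      = get_widget_value("topic")
bucket     = get_widget_value("bucket")
reset      = get_widget_value("reset", "no") == "yes"
continuous = get_widget_value("continuous", "no") == "yes"
```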

Locations by convention

To write the Delta table, we need 3 settings: the location of the Delta table, the location of the checkpoints and the location of the schema file. We will use a convention to derive these locations from the name of the topic:
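One possible convention, assuming the bucket is mounted under /mnt (the exact layout is an assumption for this sketch):

```python
# all locations are derived from the bucket and the topic name
bucket_path         = f"/mnt/{BUCKET_PREFIX}{bucket}"
delta_location      = f"{bucket_path}/delta/{topic}"
checkpoint_location = f"{bucket_path}/checkpoints/{topic}"
schema_location     = f"{bucket_path}/schemas/{topic}.json"
```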

2. Schema

The Kafka topic contains JSON. To properly read this data into Spark, we must provide a schema. To make things faster, we’ll infer the schema only once and save it to an S3 location. Upon future runs we’ll reuse the schema.

Schema inference

Before we can read the Kafka topic in a streaming way, we must infer the schema. We’re using code from this blog to infer the schema. Let’s start off with some imports:
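These imports also cover the streaming cells later on:

```python
import json

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType
```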

Now let’s define a method to infer the schema of a Kafka topic and return it in the JSON format:
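A sketch of such a method; it reads the topic as a regular batch, lets Spark infer the schema from the JSON strings and returns that schema as JSON:

```python
def infer_topic_schema_json(topic):
    """Reads the whole topic in batch mode and infers the schema of its JSON values."""
    df_raw = (spark.read
              .format("kafka")
              .option("kafka.bootstrap.servers", KAFKA_BROKERS)
              .option("subscribe", topic)
              .option("startingOffsets", "earliest")
              .option("endingOffsets", "latest")
              .load())

    # the Kafka value is binary, so cast it to a JSON string first
    df_json = (df_raw
               .select(col("value").cast("string").alias("value"))
               .filter(col("value").isNotNull()))

    # let Spark infer the schema from the JSON strings
    df_parsed = spark.read.json(df_json.rdd.map(lambda row: row.value))

    # drop records that could not be parsed
    if "_corrupt_record" in df_parsed.columns:
        df_parsed = (df_parsed
                     .filter(col("_corrupt_record").isNull())
                     .drop("_corrupt_record"))

    return df_parsed.schema.json()
```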

Inferring a schema might take a while, as Spark has to read the entire topic to determine the schema. That’s why we should cache it in the S3 bucket, so we only have to infer the schema once.

Note: I’m using dbutils.fs because writing a file with Python’s built-in file.write would write it to the driver’s local disk, not to S3.
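A sketch of the caching cell; file_exists is a tiny helper (dbutils.fs has no exists call) and the reset flag comes from the widgets:

```python
def file_exists(path):
    """dbutils.fs has no exists(); listing the path and catching the error will do."""
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False

# only infer the schema when it is not cached yet (or when a reset is requested)
if reset or not file_exists(schema_location):
    topic_schema_txt = infer_topic_schema_json(topic)
    dbutils.fs.put(schema_location, topic_schema_txt, True)
```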

Load schema

Loading the JSON from S3 into a schema is super simple:
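Something like this; note that dbutils.fs.head reads only 64 KB by default, so we pass a larger limit for big schemas:

```python
# read the cached schema JSON and turn it back into a StructType
topic_schema_txt = dbutils.fs.head(schema_location, 1024 * 1024)
topic_schema = StructType.fromJson(json.loads(topic_schema_txt))
```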

3. Read Kafka Stream

Now we can finally start to use Spark Structured Streaming to read the Kafka topic. The function we’ll use looks a lot like the infer_topic_schema_json function. The main difference is the use of readStream, which reads the topic as a structured stream.
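A sketch of such a function (the name read_stream_kafka_topic is just my choice); it mirrors the batch read, but uses readStream and unpacks the JSON value with the cached schema:

```python
def read_stream_kafka_topic(topic, schema):
    """Reads the topic as a structured stream and unpacks the JSON values."""
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", KAFKA_BROKERS)
            .option("subscribe", topic)
            .option("startingOffsets", "earliest")
            .load()
            # key and value are binary: cast the key, parse the value as JSON
            .select(
                col("key").cast("string").alias("kafka_key"),
                from_json(col("value").cast("string"), schema).alias("json"))
            .select("kafka_key", "json.*"))
```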

We can read our topic into a dataframe:
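Using the function and the cached schema:

```python
df = read_stream_kafka_topic(topic, topic_schema)
```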

4. Delta table

Now that we have a (streaming) dataframe of our Kafka topic, we need to write that dataframe to a Delta table.

Ensure the Delta table

First, we need to make sure the Delta table is present. This is where we can use the schema of the (streaming) dataframe to create an empty dataframe, which in turn can create an empty Delta table if it does not exist yet.
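A sketch, using an empty dataframe with the stream’s schema:

```python
from delta.tables import DeltaTable

if not DeltaTable.isDeltaTable(spark, delta_location):
    # write an empty dataframe with the same schema to create the table
    (spark.createDataFrame([], df.schema)
     .write
     .format("delta")
     .mode("overwrite")
     .save(delta_location))
```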

Upsert by Kafka key

The kafka_key is a unique identifier for each record. We can use the key to update the data in the Delta table. We'll use this script to make that upsert happen:
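A sketch of that upsert; the micro-batch is deduplicated on the key first, because a merge allows at most one source row per target row:

```python
from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    """Merges a micro-batch from the stream into the Delta table by Kafka key."""

    # a merge allows at most one source row per key; a real job would keep
    # the latest record per key (e.g. by Kafka offset) instead
    micro_batch_df = micro_batch_df.dropDuplicates(["kafka_key"])

    delta_table = DeltaTable.forPath(spark, delta_location)

    (delta_table.alias("t")
     .merge(micro_batch_df.alias("s"), "s.kafka_key = t.kafka_key")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())
```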

We want to update or insert all the columns of our dataframe into the Delta table, so we are using whenNotMatchedInsertAll and whenMatchedUpdateAll. More info can be found in the documentation of the DeltaMergeBuilder.

Write stream data

Now that we have everything in place, we can write to our Delta table:
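A sketch that wires the stream to the upsert with foreachBatch; the continuous flag decides between a run-once trigger (for scheduled jobs) and a continuously running stream:

```python
writer = (df.writeStream
          .foreachBatch(upsert_to_delta)
          .outputMode("update")
          .option("checkpointLocation", checkpoint_location))

if continuous:
    # keep the stream running and process micro-batches as they arrive
    stream = writer.start()
else:
    # scheduled (e.g. hourly) job: process what's available, then stop
    stream = writer.trigger(once=True).start()
```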

5. Reading a Delta table

Reading a Delta table is a piece of cake:
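For example, from the consuming notebook (the path is a hypothetical mount):

```python
df_orders = (spark.read
             .format("delta")
             .load("/mnt/company-prefix-orders/delta/orders"))

# display is Databricks-specific; use df_orders.show() elsewhere
display(df_orders)
```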

Final thoughts

I’ve shown one way of using Spark Structured Streaming to update a Delta table on S3. The combination of Databricks, S3 and Kafka makes for a high performance setup. But the real advantage is not in just serializing topics into the Delta Lake, but combining sources to create new Delta tables that are updated on the fly and provide relevant data to your domain.

We’ve seen an uplift in the performance of scripts that used to query Kafka themselves (some improved by 25%). The Delta table is faster and makes the code easier to read.

A big shout-out to Jesse Bouwman (Junior Data Scientist) and Martinus Slomp (Senior .NET Developer) for the collaboration on this code.

Further reading

While working on this topic I found some excellent sources for reading:


I work as a Lead Developer at Wehkamp.nl, one of the biggest e-commerce companies of the Netherlands. This article is part of our Tech Blog, check it out & subscribe. Looking for a great job? Check our job offers or drop me a line on LinkedIn.

Originally published at https://keestalkstech.com on November 9, 2019.
