Dockerizing Spark Structured Streaming with Kafka And LocalStack

Tal Wanish
Riskified Tech

--

If you have ever tried to develop a Spark application, you probably noticed that integrating your application with the “outside” world could cause some difficulties. Suddenly you got org.apache.spark.sql.AnalysisException or connection errors, either from reading the input or writing the output. Wouldn’t it be nice to run your application from within a local environment similar to the production environment?

This blog post will demonstrate how to integrate Kafka and S3 with Spark Structured Streaming using Docker Compose. As an example, we’ll create a simple Spark application that aggregates data from a Kafka topic and writes it to a Delta table on S3. Finally, we will write a basic integration test that will validate our app is working as expected.

Following the instructions in this tutorial, you should be able to accomplish these two goals:

  • Run integration tests in a Docker environment containing Spark, Kafka, and S3.
  • Have an isolated environment for local development that fully integrates the parties mentioned above.

Now, let’s follow this process step by step.
We’ll focus on integrating the different components rather than getting into deep technical details for each one of them. Additionally, you’ll find the complete source code in this repository for extended study. We’ll include references where further elaboration might be helpful.

Plan

The system runs in a dockerized environment, where the integration test communicates with the system from the “outside”:

Ideally, we want to run these tests with minimal effort within a CI context or locally.

We split the process into three steps:

  • Coding the Spark application along with its integration test.
  • Dockerizing the application.
  • Dockerizing its dependencies (Kafka and LocalStack).

We assume some familiarity with Spark Structured Streaming API, Docker, Scala, and sbt.

Building a Sample Application

Our first step is creating a simple Spark application. We will use the Word Count Example from the Spark official documentation, with a slight modification: Instead of reading from a socket, we will read from a Kafka topic. Furthermore, instead of writing to the console, we will write to a Delta Table on S3.

Let’s assume we have an initialized spark session and a known configuration for connecting to Kafka (we will cover these later, so don’t worry). Our first step will be reading the lines from the topic:

Next, we should split the given lines into words and count the occurrence of each word:

Now that each row contains a word with its corresponding count, we can stream the data to S3. Again, assuming we’ve given an S3 bucket and a path to the table:

I recommend reading the official docs for more details on the semantics of streaming to and from Delta tables.

Now, we’ve implemented our business logic. What’s left is to configure SparkSession to communicate with S3 as provided by LocalStack:

Please notice that masterUrl points to local[*]. It’s worth emphasizing that this configuration should be pulled dynamically by the runtime environment rather than being hardcoded. But, for the sake of simplicity, we’ll set it statically and pass the values via environment variables:

That’s it. We have an application that consumes lines from Kafka, counts the occurrence of each word, and writes it to a Delta table on S3.

Writing the Integration Test

Our next step is to write a simple integration test that sends lines to Kafka and validates the expected count by reading from the Delta table. Let’s start with building a DataFrame consisting of two lines:

To comply with Kafka’s message format, each line is represented in a column named value along with a randomly generated key column. For more information, please visit Spark Kafka Integration docs.

Next, we write this DataFrame to Kafka. As opposed to our application, here we write the DataFrame as a batch query:

Finally, we read from the output table and assert the result:

We first load the table, filter the words “fish” and “way” and collect them in memory as a Scala map from each word to its count. Notice we wrap it in an eventually clause because we want to give the application a chance to process the data.

Our integration test is almost done.
Since our test will be running on localhost and our application in a Docker environment, we should configure our SparkSession used from within the test as follows:

It’s identical to the definition of the SparkSession on the application, only that here the addresses refer to localhost. In contrast, the application refers to Docker internal services names (e.g. kafka:29092 instead of localhost:29092).

Dockerizing Our Application

We’ve written an application and an integration test that asserts its functionality. The next step is packaging the application to make it executable in a Docker environment. We will use sbt-native-packager to package the project and create its Docker image. To run our Spark application, we need a runtime environment that includes, unsurprisingly, Spark distribution.

Let’s first create a base Docker image that contains Spark. Visit the official download page, download the Spark 3.3.1 release, and choose “Pre-built for Apache Hadoop 3.3 and later (Scala 2.13)”. After downloading the .tgz file, we’ll unpack it and cd into the directory. At the root of this directory, we build the distribution:

docker build -t spark:3.3.1-hadoop3-scala2.13 -f kubernetes/dockerfiles/spark/Dockerfile .

After this command, we have a base Docker image equipped with Spark distribution. Now, we can create the Docker image of the application itself:

If you run sbt Docker / stage, you’ll find the generated docker file located at target/docker/stage/Dockerfile with the above commands:

We use the Docker image with the Spark distribution as a base image and add our application jars. To make life easier, let’s write a simple script at the root of the project to publish the image to the local registry:

create-local-docker.sh

We won’t go through the rest of the build.sbt file as it’s pretty straightforward. You are welcome to inspect the entire file with its corresponding dependencies. In addition, we recommend reading the sbt way of defining a project with integration tests.

Dockerizing Dependencies — Kafka

Let’s create a Docker Compose file docker-compose.infra.yml at the root of it package with Kafka, ZooKeeper, and kafdrop UI for visibility:

Lots of small details, but again, there are many materials out there, here’s one example. After line 43, the Kafka broker is up and running. The only thing left is creating the topic from which our application consumes its lines. The step init-kakfa is for that exactly: it mounts the script directory at the root of it and executes the following script:

Dockerizing Dependencies — LocalStack

Next, we should add a localstack service to our docker-compose.infra.yml file:

Besides spinning up localstack, we add the initialization step init-s3 as we did previously, to create the bucket where our Delta table is located. This step also runs a simple script that creates the bucket:

After this step, our it directory should have the following structure:

├── it│   ├── docker-compose.infra.yml│   ├── docker-compose.yml│   ├── script│   │   ├── setup-kafka.sh│   │   └── setup-s3.sh│   ├── src│   │   └── it│   │       └── scala│   │           ├── SparkSpec.scala│   │           └── WordCountsSpec.scala

We are getting close to a fully operating environment!

Composing All Together

Finally, we can add our application to the Docker environment. We will create a separate docker-compose.yml file, to allow us the flexibility to run the dependencies (Kafka and LocalStack) dockerized and the application itself as a standalone (on the IDE, for example):

Besides the configuration that is passed via the environment variables, notice we used spark-submit utility as an entry point to run our Spark application. For a comprehensive description of spark-submit, please visit the official docs.

Final Stage: Running the Test

We now have everything we need to run our test. Let’s stitch up one last script to spin up the environment:

And one to shut it down:

And that’s it, we’ve reached our final destination.
We can run our tests with three simple steps from the root of the project:

  • Create the last version of the application code: ./create-local-docker.sh
  • Spin up the Docker environment: ./it/up.sh
  • Run the tests: sbt "project it; IntegrationTest / test"

It’s a perfect opportunity to take a moment and gaze at the green:

That was a long road! I hope that you found it rewarding.

As mentioned earlier, all of the code is available at https://github.com/wanishing/dockerized-spark-structured-streaming.

--

--

Riskified Tech
Riskified Tech

Published in Riskified Tech

Software Engineering, Research, Data, Architecture, Scaling and more, written by our very own engineers and data scientists.

Tal Wanish
Tal Wanish

Written by Tal Wanish

Software Developer at Riskified