Streaming pipelines with Scala and Kafka on Google Cloud Platform

Google Cloud Platform (GCP) offers many options to run streaming analytics pipelines. The cornerstone messaging system in GCP is Cloud Pub/Sub. However, Kafka is probably the most popular alternative to Pub/Sub, and there is not much information out there about how to integrate it with other GCP products.
For stream processing, Dataflow is one of the most advanced systems out there. To run a pipeline on Dataflow, we have to use Apache Beam, an open source SDK whose pipelines can run on top of many different systems, not only Dataflow: Apache Flink, Apache Spark, and many others. However, at the time of writing, the Beam SDK is only available in Java, Python, Go and SQL, and I deeply enjoy Scala and functional programming.
When writing data processing systems, I think the functional programming paradigm fits stream processing like a glove. Fortunately for those who, like myself, enjoy Scala, Spotify develops Scio, a Scala API for Apache Beam. Although built on top of Beam, and therefore available for any Beam runner, Scio is more focused on Dataflow; this makes sense, as the native APIs of the other typical processing systems (Spark, Flink) are already in Scala. In fact, the Scio API is inspired by Spark, with some ideas from Scalding.
It comes with many extra goodies too. I particularly like the type-safe API for BigQuery, which lets you map any BigQuery table to a Scala case class, automatically inferring the schema from BigQuery; that is, you don't have to write a single field of the case class representing your BigQuery data model (!).
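To give an idea of what that looks like, here is a minimal sketch of the type-safe BigQuery API; the table reference and class name are hypothetical and not part of the example in this post:

import com.spotify.scio.bigquery.types.BigQueryType

// The annotation inspects the BigQuery table at compile time and generates
// the fields of the case class from the table schema, so you never write
// them by hand. The table reference below is just an illustration.
@BigQueryType.fromTable("my-project:my_dataset.my_table")
class MyTableRow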
The example
In this post, we are going to use this example from the Google Cloud Professional Services team Github repo: https://github.com/GoogleCloudPlatform/professional-services/tree/master/examples/dataflow-scala-kafka2avro
In the example, we will set up two Dataflow pipelines: one writing objects to Kafka, and the other reading those objects from the same Kafka topic. The pipelines are written in Scala, using Scio.
Here I assume that you have a Google Cloud Platform project ready. You can follow this example using the free GCP credit if you have never used GCP before.
Setting up a Kafka server
The first thing we need to do is to set up a Kafka server. Normally, you would run Kafka as a cluster, but for this example a single standard virtual machine (VM) will be enough.
To deploy Kafka in a VM, see this previous post. For your VM, you will have to choose a region; try to choose the region closest to you that also has a Dataflow endpoint.
In my case, my region is europe-west1, and I will deploy my Kafka VM in the zone europe-west1-d. When you have your Kafka VM deployed, come back here to continue with this tutorial.
Setting up a Virtual Machine to compile and trigger the pipelines
Although we could try to compile the pipelines in the Cloud Shell, the Cloud Shell is not meant for heavyweight computation; it is more a convenience shell than a VM to support our builds.
So to compile the code of the pipelines, and to trigger them in Dataflow, we are going to create a small VM. If you still have the test-vm up and running from a previous post, remove it, as we will need to add an extra option.
Let's create a build-vm, and let's make sure that it is in the same zone as our small Kafka server. We also need to make sure that the scopes of this VM allow it to access all the Google Cloud APIs (or it will not be able to trigger the Dataflow pipelines):
$ export ZONE=europe-west1-d
$ gcloud compute instances create build-vm --zone=$ZONE --machine-type=g1-small --scopes=https://www.googleapis.com/auth/cloud-platform
Created [https://www.googleapis.com/compute...
NAME ZONE MACHINE_TYPE PREEMPTIBLE INTERNAL_IP ...
build-vm europe-west1-d g1-small 10.132.0.4 ...
Clone the Google Cloud Professional Services repo
The code for our example is in the Google Cloud Platform Professional Services Github repo. In that repo, we include all kinds of examples and utilities for your work (and ours!) on Google Cloud Platform.
In this case, we are going to use this example: https://github.com/GoogleCloudPlatform/professional-services/tree/master/examples/dataflow-scala-kafka2avro
In that directory, you will find two Dataflow pipelines, written in Scala.
One of the pipelines is a producer, and it will populate Kafka with some messages. The messages are in fact serialized objects (instances of a case class) transformed into base64-encoded strings. So we are not just passing string messages; we are passing whole objects through a Kafka topic!
The other pipeline is a consumer that listens to the same Kafka topic, recovers the objects, transforms them into Avro format, and copies them to Google Cloud Storage. So, in some sense, the consumer pipeline makes a backup of all the objects sent to that topic.
Let's start by cloning the Github repo, and changing to the directory of the example. First we need to ssh into the build-vm and install some missing dependencies:
$ gcloud compute ssh build-vm --zone=$ZONE
build-vm$ sudo apt install -y git apt-transport-https kafkacat default-jdk
[...]
build-vm$ git clone https://github.com/GoogleCloudPlatform/professional-services.git
Cloning into 'professional-services'...
remote: Enumerating objects: 48766, done.
remote: Total 48766 (delta 0), reused 0 (delta 0), pack-reused 48766
Receiving objects: 100% (48766/48766), 189.70 MiB | 25.66 MiB/s, done.
Resolving deltas: 100% (14245/14245), done.
Configuration file
The options passed to the pipelines are set through a configuration file. This configuration file is packed along with the compiled code in the generated JAR package.
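To give an idea of how the pipelines can consume this file, here is a minimal sketch using the Typesafe Config library; the actual example may load the configuration through a different wrapper, but the keys match the file shown below:

import com.typesafe.config.ConfigFactory

// Load src/main/resources/application.conf from the classpath and read the
// options used by the pipelines.
val config = ConfigFactory.load()
val broker = config.getString("broker")
val destBucket = config.getString("dest-bucket")
val destPath = config.getString("dest-path")
val kafkaTopic = config.getString("kafka-topic")
val numDemoObjects = config.getInt("num-demo-objects")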
One of the options requires having a bucket in Google Cloud Storage (GCS), where the Avro files will be stored. So let's create a bucket first. We will create a regional bucket, in the same region as our Kafka server, to avoid crossing regions when sending data from Dataflow or Kafka to GCS. Choose the same region you have used above (europe-west1 in my case). The name of the bucket must be unique across all buckets in all GCP projects, so I normally choose my project id or project name (although sometimes the project name might not be unique). In this case, this is the bucket I have created (you can run this command in the build VM or in the Cloud Shell):
$ gsutil mb -l europe-west1 gs://ihr-kafka-dataflow
Creating gs://ihr-kafka-dataflow/...
The configuration file is located in src/main/resources/application.conf in the directory of our example. The content of that file in the repo is:
broker = "YOUR_IP_HERE:PORT"
dest-bucket = "BUCKET_NAME_NOT_STARTING_WITH_GS://"
dest-path = "PATH_INSIDE_BUCKET"
kafka-topic = "TOPIC_NAME"
num-demo-objects = 500 # number of messages to be generated...
The broker IP is the internal address of kafka-vm, created in a previous section (if you followed the instructions given in a previous post, your Kafka server's name is kafka-vm), and the port is 9092. You can find out the internal IP address of your server with (run it from either the build VM or the Cloud Shell):
$ gcloud compute instances list
NAME ZONE MACHINE_TYPE PREEMPTIBLE INTERNAL_IP ...
kafka-vm eu... n1-standard-1 10.132.0.3
We will copy the files into a subdirectory called kafka2avro, and the topic is going to be called avro_objs.
So, in the end, this is the content of my configuration file:
broker = "10.132.0.3:9092"
dest-bucket = "ihr-kafka-dataflow"
dest-path = "kafka2avro"
kafka-topic = "avro_objs"
num-demo-objects = 500
To edit the file and set the options for the pipeline, change to the directory with the sources in the repo, and use the nano editor to edit the file:
build-vm$ cd
build-vm$ cd professional-services/examples/dataflow-scala-kafka2avro/
build-vm$ nano src/main/resources/application.conf
When you are done editing the file, press Ctrl + X, reply "Y" (for Yes), and press Enter to write the file and exit the editor. You are now ready to compile the code and generate a package.
Generate a JAR package
To launch the Dataflow pipelines, we need to compile the code and generate a Java package. Also, the configuration file is included with the generated package, so any change to the configuration requires compiling the package again.
Since this is a Scala project, the build tool we are going to use is sbt (the Scala Build Tool). Let's install it in our build VM; alternatively, you can skip these steps if you already have sbt installed locally. We install sbt so we can generate a JAR package and launch the Dataflow pipelines.
Following the instructions from the documentation, let's execute some commands in the build VM.
First add the repository for sbt to the list of repos in build-vm:
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.listImport the key to check the package signature:
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key addNow update your list of packages
sudo apt update
And let's install sbt:
sudo apt install -y sbt
Once it is installed, we can just call sbt to compile and generate the package:
build-vm$ cd
build-vm$ cd professional-services/examples/dataflow-scala-kafka2avro/
build-vm$ sbt
[..]
sbt:kafka2avro> compile
[..]
sbt:kafka2avro> pack
[..]
sbt:kafka2avro> exit
[..]
With that, we have generated a package to launch the pipelines.
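As a side note, pack is not a built-in sbt task; the target/pack/lib/ layout used below is the one produced by the sbt-pack plugin. If you were setting up a similar project yourself, it would be enabled with a line like the following in project/plugins.sbt (the version number is only illustrative; check the plugin's releases for a current one):

// project/plugins.sbt: provides the `pack` task used above
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.14")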
We have used a small VM to compile the code, and we will use it to launch the pipelines too. To avoid having to run a VM just for that, you will probably want to use a service like Cloud Build. In fact, there is a Cloud Build configuration file in the repo for this pipeline, but we will not use it in this post.
Launching the consumer pipeline
The consumer pipeline will only read messages that are published to the Kafka topic after the consumer pipeline has started. For this reason, we will start it before the producer pipeline.
To launch the pipeline, we need to set a staging location for packages and temporary files, and we need to set up a local classpath to find all the necessary dependencies.
Also, we need to make sure that the Dataflow API is enabled in our project (you can enable it from the APIs & Services page in the Cloud Console).
As for the credentials, we will rely on the gcloud CLI utility to handle the authentication. You could also create a service account for your Dataflow pipelines, and use those credentials to authenticate.
All the required libraries are stored in the directory target/pack/lib/, and the staging location will be a directory in our bucket (I will use gs://ihr-kafka-dataflow/stg).
Now, there is one last requirement: we need to have connectivity to the Kafka server in order to launch the pipeline, because the submission process grabs some metadata from Kafka before launching it. Because we are running a VM in the same zone as the Kafka server, that will not be an issue. But if you submit the pipeline from a different machine or service, you should take that into account and add the necessary firewall rules and permissions.
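For example, if you were submitting from a machine outside the VPC, a firewall rule along these lines could open the Kafka port to it (the rule name, network and source range are illustrative; adjust them to your setup):

gcloud compute firewall-rules create allow-kafka-9092 \
  --network=default \
  --allow=tcp:9092 \
  --source-ranges=203.0.113.0/24   # address range of the submitting machine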
In the same directory as in the previous section, let's now trigger the consumer pipeline:
build-vm$ export STAGING_LOCATION=gs://ihr-kafka-dataflow/stg
build-vm$ export PROJECT_ID=ihr-kafka-dataflow
build-vm$ export CLASSPATH="target/pack/lib/*"
build-vm$ export REGION=europe-west1
build-vm$ java com.google.cloud.pso.kafka2avro.Kafka2Avro --exec.mainClass=com.google.cloud.pso.kafka2avro.Kafka2Avro --project=$PROJECT_ID --stagingLocation=$STAGING_LOCATION --runner=DataflowRunner --region=$REGION
Once the pipeline is running, as soon as the Kafka topic starts receiving messages, it will start producing Avro files in Google Cloud Storage.
After launching the pipeline, we can check that it is running in the jobs page of the Dataflow UI, where we should see the job graph of the consumer pipeline.
Launching the producer pipeline
The consumer pipeline will not start processing messages until we publish them in Kafka. So let's launch the producer too, using the same environment variables as for the previous pipeline:
build-vm$ java com.google.cloud.pso.kafka2avro.Object2Kafka --exec.mainClass=com.google.cloud.pso.kafka2avro.Object2Kafka --project=$PROJECT_ID --stagingLocation=$STAGING_LOCATION --runner=DataflowRunner --region=$REGION
After launching the pipeline, we can check that it is running in the jobs page of the Dataflow UI.
To check that it has actually published 500 objects to the Kafka topic (avro_objs, with the configuration above), let's check with kafkacat, a command-line client for Kafka:
build-vm:~$ export BROKER=10.132.0.3
build-vm:~$ export TOPIC=avro_objs
build-vm:~$ kafkacat -b $BROKER:9092 -t $TOPIC -e | wc -l
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic avro_objs [0] at offset 500: exiting
500
The consumer pipeline should have started to process the messages, and it should have produced an Avro file with the 500 objects grouped as a list. We can check the output location:
$ export DEST_BUCKET="ihr-kafka-dataflow"
$ export DEST_PATH="kafka2avro"
$ gsutil ls -hl gs://${DEST_BUCKET}/${DEST_PATH}
22.18 KiB 2020-01-27T16:09:35Z gs://ihr...0930.avro
TOTAL: 1 objects, 22713 bytes (22.18 KiB)
So both pipelines are working fine!
Cleaning up
Note that the consumer pipeline is a streaming pipeline, so it will keep running (and incurring additional cost) until you stop it. To stop the pipeline, list the jobs you are running, take note of the job id, and cancel it (change the region if you are using a different one):
$ export REGION=europe-west1
$ gcloud dataflow jobs list --status=active --region=$REGION
JOB_ID NAME ...
2020-01-27_08_32_00-13108354405313287331 kafka2avro-...
$ gcloud dataflow jobs cancel 2020-01-27_08_32_00-13108354405313287331 --region=$REGION
Cancelled job [2020-01-27_08_32_00-13108354405313287331]
The producer pipeline is a batch pipeline, so it finished as soon as it created the 500 objects.
You may also want to stop and/or delete the build VM. If you leave it running, it will keep incurring cost even if you are not using it. Run this from the Cloud Shell (beware: the removal is irreversible):
gcloud compute instances delete build-vm --zone=$ZONE
Similarly, remember that the kafka-vm is still running. You may want to stop it and/or delete it.
Also, remember that you have created a GCS bucket (check its name using gsutil ls in the Cloud Shell). To remove it, run this in the Cloud Shell (beware: the removal is irreversible):
gsutil -m rm -rf gs://[BUCKET_NAME]
Serializing any kind of custom object
The pipelines work by serializing and deserializing objects, which are encoded as base64 strings in the Kafka topic and represented by a Scala case class in the code.
The code can work with any Scala case class; the only restriction is that the class must be known at compile time. The type is defined as T at the beginning of the pipelines. You need to supply two functions, one to serialize the objects and one to deserialize them (see the sketch below).
Notice that both functions are type-agnostic, so you can apply them to any case class, or to any other type, as long as it is serializable.
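As a rough sketch of what such a pair of functions can look like (the names and the exact implementation in the repo may differ), plain Java serialization plus base64 encoding is enough:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Serialize any Serializable value (e.g. a case class instance) into a
// base64-encoded string, suitable to be sent as a Kafka message.
def object2String[T](obj: T): String = {
  val bytes = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bytes)
  oos.writeObject(obj)
  oos.close()
  Base64.getEncoder.encodeToString(bytes.toByteArray)
}

// Recover the original object from the base64-encoded string read from Kafka.
def string2Object[T](encoded: String): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
  try ois.readObject().asInstanceOf[T]
  finally ois.close()
}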
Reading old messages with the consumer pipeline
We launched the consumer pipeline first; if we had launched the producer first, the consumer would not pick up its messages, because they would have been sent with a timestamp prior to the creation of the consumer pipeline.
How can we fix this problem? How can we make sure we read all the unacknowledged messages contained in the topic?
For that, we need to pick a timestamp older than the timestamps of the messages we want to read, and set the withStartReadTime property in KafkaIO.Read (the original code in the GitHub repo does not specify any start read time):
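Below is a sketch of what the read transform looks like with that property set; the broker address, topic and deserializers mirror the values used earlier in this post, and the exact code in the repo may differ:

import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringDeserializer
import org.joda.time.Instant

// Read string messages from the topic, starting from one second after the
// epoch, so that every message still retained in the topic is picked up.
val readTransform = KafkaIO
  .read[String, String]()
  .withBootstrapServers("10.132.0.3:9092")
  .withTopic("avro_objs")
  .withKeyDeserializer(classOf[StringDeserializer])
  .withValueDeserializer(classOf[StringDeserializer])
  .withStartReadTime(new Instant(1000L)) // 1970-01-01T00:00:01Z
  .withoutMetadata()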
In this case, we read all the messages with a timestamp later than one second after the epoch (1970-01-01), which in practice means every message still stored in the topic, no matter how old.
Wrap up
In this post we have seen how we can use Scala and Kafka for streaming data processing pipelines, using Scio and Dataflow on Google Cloud Platform.
- You can use Scala to create streaming and batch pipelines to run on Dataflow, and interact with many services (GCP-native or open source, such as Kafka).
- We showed an example that serializes and deserializes objects without further modification, but you can apply any transformation to your data by adapting this code and changing just a couple of functions.
- If you have a consumer pipeline that needs to recover all the messages from a Kafka topic, we have seen how to read any message, no matter how old, as long as it is still stored in the topic.
- You can follow this tutorial using the free version of GCP.

