Source and Sink for Kafka and PubSub

Justin Grayston
Musings In The Clouds

--

With the announcement of the Google Cloud Confluent managed Kafka offering, it has never been easier to utilise Google Cloud’s great data tools with Kafka. You may choose to use the Apache Beam Kafka.io connector to go straight into Dataflow, but this may not always be the right solution. Whether your Kafka is in the Cloud or on premise, you may just want to push to a sub set of PubSub topics. Why? Well you may the flexibility of having PubSub be your GCP event notifier. Then, you could not only choreograph Dataflow jobs, but also trigger Cloud Functions off of the topics.

So how do you get messages from Kafka to PubSub? This is where the PubSub Kafka Connector comes in handy. In this article I’ll walk through the basic steps to get this working on Google Cloud.

The connector itself allows two way event traffic, Kafka to PubSub and PubSub to Kafka. This could be really handy if you have Kafka and want to connect with applications and data running in the cloud or even with devices via IoT Core.

To run the example I’ll use Cloud Launcher to launch a single instance of Kafka, the go through the steps so we have event messages flying between the two. It is likely that you will be running a cluster, but for this I am going to keep things simple (and cheaper).

Before we begin

You will need a Google Cloud project with either the trial allowance or billing set up. Running this example shouldn’t cost too much (hourly rate in the launcher currently states around $0.043 at time of writing), but you will need to remember to tear it down afterwards!

Spin up your Kafka Instance

Go to Cloud Launcher and search for Kafka. In this example I just use the Google provided Kafka, but feel free to us Bitnami if you wish.

Click on the tile with Kafka on, then the blue Launch On Compute Engine button.

You should see something like this.

Cloud Launcher — Kafka Deployment

I just changed the region so it was closer to me. When ready, simply click “Deploy” button. This will then take you to Deployment Manager as it spins and configures the VM. It shouldn’t take long. When done you can click SSH to connect to the VM. When you come back to the console, the VM can be found under Deployment Manager and under Compute Engine.

You will find all the Kafka scripts in /opt/kafka/bin and you will also notice the config directory /opt/kafka/config this is where all your config properties are set.

First off, lets create a directory for us to place connect plugins.

sudo mkdir connectors

Compile the connector

Now you could do this on the VM if you want to add a bunch of dependencies, but I wouldn’t do that. Let’s keep our Kafka clean. I did this on my Mac, but you could easily do this in cloud shell. You can start a cloud shell session by clicking on the cmd prompt icon at the top of the Google Cloud Console UI.

Clone the Github repo https://github.com/GoogleCloudPlatform/pubsub.git

Then navigate into the kafka-connector directory.

Note that if working on a Mac you will need to update the io.netty classifier from ${os.detected.classifier}in pom.xml tolinux-x86_64. This is only required if you are compiling the connector on a different OS than the target host. In this case, compiling on macOS and running on a Linux VM.

mvn package

In the target directory you should now have cps-kafka-counnector.jar, this now needs copying to the VM. You can easily do this with the gcloud command.

gcloud compute scp ./target/cps-kafka-connector.jar  kafka-1-vm:~/ --zone europe-west3-c

Obviously change the zone to the one your are using. Then in the SSH session window, simply move that to /opt/kafka/connectors

sudo mv ./cps-kafka-connector.jar /opt/kafka/connectors/

Adding the connector configs

Before we start the connect service, lets get our config files in place. In the PubSub repo kafka-connector directory you will see draft configs. We are going to place this in the config directory of our Kafka VM and edit them so they know which topics to connect too. You can find pre-created configs with the topics we are going to use later here.

On the VM make sure you are in /opt/kafka/config/ then create the sink config.

sudo vi cps-sink-connector.properties

Then paste the config, either the vanilla version or the pre-completed one from the gist.

The name property must be unique, this can allow you to to set up multiple bindings to many PubSub topics. The rest of the configs are nicely explained in the docs. For this sink we are going to set topics=to-pubsub and cps.topic=from-kafka and of course set your Google Cloud project you are using in cps.project

Now we can do the same for the source connector.

sudo vi cps-source-connector.properties

As with sink, set the cps.project to your Google Cloud project. Then we will set csp.subscription=to-kafka-sub this tells the connector which PubSub subscription to subscribe to and then push into the Kafka topic set by kafka.topic=from-pubsub

We are nearly there, but first we need to set up the topics and subscriptions.

Set up Topics and Subscriptions in PubSub

For this we will use gcloud command line tool, but you can do all this via the console UI too.

Create the pubsub topics.

gcloud pubsub topics create to-kafka from-kafka

You can verify they are set up.

gcloud pubsub topics list

Now we need to create the subscription for the to-kafka topic.

gcloud pubsub subscriptions create to-kafka-sub --topic=to-kafka --topic-project=[CHANGE TO YOUR PROJECT]

Again, you can verify it is set up.

gcloud pubsub subscriptions list

Create the Kafka topics

Create the topic that will send to PubSub

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic to-pubsub

Create the topic that will receive from PubSub

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic from-pubsub

Created the Service Count and Provide credentials

gcloud iam service-accounts create kafka-pubsub

You have now created a service account. Now we need to set the role of that account so it can use PubSub.

You can now simply find this user in the Cloud Console IAM UI and assign PubSub admin role or do the following.

gcloud projects get-iam-policy [CHANGE TO YOUR PROJECTID]  --format json > iam.json

Then in the the bindings array add your service account with the role of PubSub admin to the iam.json

{
"members": "serviceAccount:kafka-pubsub@[YOUR SERVICE ACCOUNT ADDRESS]",
"role": "roles/pubsub.admin"
},

Then you can use the following to set the role for the service account.

gcloud projects set-iam-policy [CHANGE TO YOUR PROJECTID] iam.json

Note: The following is for example purposes only and is simulating a VM outside of Google Cloud. Do not use for production. Also, don’t forget that if you are running on Google Cloud, you can use the VM’s service account which means no download of secrets.

Our Kafka VM now needs to have the credentials so it can connect to PubSub.

gcloud iam service-accounts keys create google-pubsub-key.json --iam-account=[SERVICE ACCOUNT EMAIL]

Then copy these credentials to the VM

gcloud compute scp ./google-pubsub-key.json kafka-1-vm:~/google-pubsub-key.json --zone=europe-west3-c

This will now be in your home director. I just then created a bash script to start the connect service. This is in the gist as the run-connect.sh file.

Just place that in the home directory.

sudo vi run-connect.sh

Paste the contents from the gist and edit the path to the credentials file.

Allow the file to be executed.

sudo chmod +X run-connect.sh

Start the connect service

On the Kafka VM, the Kafka service itself should already be running as part of the start script.

For this test we are going to just start a standalone service. Basically you pass it the properties files for the service to use. It will then look for the correct connector to use.

This is what we are doing with the run-connect.sh script.

Before we can kick of the service we need to tell Kafka Connect where to find plugins. To do that we can simply edit the `/opt/kafka/config/connect-standalone.properties` file.

Just uncomment the plugin path, close to the bottom of the file and set it to the following.

plugin.path=/opt/kafka/connectors

Also in that file, disable the key value converter for schema enforcement.

key.converter.schemas.enable=false
value.converter.schemas.enable=false

This disables the schema enforcement, which is fine for this example as it keeps things simple, but you will may well want to have this enabled in production. You can do this on a per connector worker basis.

More about converters

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

The key.converter.schemas.enable and value.converter.schemas.enable are relevant for the JSON converter, but may not be relevant for other converters.

The converter used needs to match the type of data you want to transmit. This could well be different for each sink and source. The data is either inbound or outbound from Kafka. If it is inbound it is important that the converter matches the inbound format, if outbound it should match the format for the destination. The simplest converter (and often the right one to use) is org.apache.kafka.connect.converters.ByteArrayConverter. This just takes the raw data from one and moves those bytes to the other.

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

For this though we will stick with the JSON and String converters.

Now if you run ./run-connector.sh it will fire up the connect service and be ready to test. If you get errors, make sure you have the full path to the credentials specified.

Testing it out

Now we are going to do a quick manual test. Open up a new SSH connection to the Kafka VM. We’ll use that to quickly create a producer to post data into the kafka topic destined for pubsub, and then check with gcloud it has landed. Then we will swap the process, sending data from pubsub to Kafka.

With your second SSH session started, start a Kafka producer and add some messages to the topic.

kafka-console-producer.sh --broker-list localhost:9092 --topic to-pubsub

Then just type the messages at the cursor in json format.

> {"message": "hello"}
> {"message": "big world"}

Now we’ll just use gcloud to pull those messages.

Create a subscription.

gcloud pubsub subscriptions create from-kafka --topic=from-kafka

Then pull the messages that should be in there from Kafka.

gcloud pubsub subscriptions pull from-kafka --auto-ack --limit=10

You should see a table with the data, message_id and attributes with to items in it.

Now lets test the other way.

CTRL +c to exist the producer. Then let’s start a consumer to test data coming the other way.

kafka-console-consumer.sh --bootstrap-server=localhost:9092 --value-deserializer=org.apache.kafka.common.serialization.StringDeserializer --topic from-pubsub

Then using gcloud publish a message. Here we are using attributes to create a JSON data.

gcloud pubsub topics publish to-kafka  --attribute=data=hello

Should result in the below showing up in the consumer.

{"message":"","data":"hello"}

So there we have it, data is now travelling in both directions. As noted above, this is just an example, but should help you figure step through the basics to get going. Armed with this knowledge and experience, if you want to set this up on a Kafka cluster then check out this article, which also has shows you how to set up a GUI for Kafka Connect.

--

--