Writing your first Apache Pulsar Function in Kotlin

Pierre-Yves Lebecq
Jul 15, 2020


In the previous article, we installed everything we need to write and run a Pulsar Function in Kotlin. If you haven't installed everything yet, now is the time to do it.

We will first write a very basic function. It won't do much but it will allow you to understand how to build it and how to deploy it. Then, in the following article, we will write a more useful function using more advanced features.

Let's start by creating a directory to hold all the code and configuration files we need.

$ mkdir -p my-pulsar-project/build && cd my-pulsar-project

Using Gradle, we can initialize a new project.

$ gradle init
Select type of project to generate:
  1: basic
  2: application
  3: library
  4: Gradle plugin
Enter selection (default: basic) [1..4] 1
Select build script DSL:
  1: Groovy
  2: Kotlin
Enter selection (default: Groovy) [1..2] 2
Project name (default: my-pulsar-project):
> Task :init
Get more help with your project: https://guides.gradle.org/creating-new-gradle-builds
BUILD SUCCESSFUL in 16s
2 actionable tasks: 2 executed

Gradle will generate a build file, describing how the project must be built and packaged. For the first choice, the type of project to generate, enter "1" to use a "basic" project. This will determine the default content which will be generated in the Gradle build file. For the second choice, select option "2" to have the build file written in Kotlin. Gradle can work with build files written in Groovy and in Kotlin. Since we will write our Pulsar Function in Kotlin, we might as well write the build file in Kotlin.

This command will generate a few files for you:

  • settings.gradle.kts is a file containing the project name. It is also used to configure multi-project builds, but we won't need this feature.
  • build.gradle.kts is the file containing what dependencies are needed to build our project, how to build it and how to package it.
  • gradlew, gradlew.bat, and the gradle directory are used by the Gradle Wrapper. It allows developers who don't have Gradle installed on their computer to download the project's source files and build it anyway: the wrapper downloads the correct Gradle version for them. It ensures everyone is using the same version of Gradle without requiring every developer to manage their Gradle version manually. I find it very convenient. From now on, we will use the ./gradlew command to run Gradle commands instead of the global gradle command we used before.

Before writing any Kotlin code, we will write a small docker-compose.yml configuration file to be able to start Apache Pulsar as a Docker container.

Create the file at the root of the project directory, with the following content:

version: "3.7"
services:
  pulsar:
    image: apachepulsar/pulsar-all:2.6.0
    command: >
      /bin/bash -c
      "bin/apply-config-from-env.py conf/standalone.conf
      && bin/pulsar standalone"
    volumes:
      - "/pulsar/data"
      - "/pulsar/conf"
      - "./build:/app/build:delegated"
    ports:
      - "6650:6650"
      - "8080:8080"

This configuration stores Pulsar's data and configuration in volumes, so they persist between container restarts. It also shares our project's build directory with the container (mounted at /app/build), so we will be able to run our function after we build it. Finally, it exposes the ports used by Pulsar to our computer: 6650 for the Pulsar binary protocol and 8080 for the HTTP admin API.

To run Apache Pulsar, you need to use the following command:

$ docker-compose up

This starts Apache Pulsar and displays all output from the pulsar container. This command never terminates, so you will need to use a new terminal window from now on.
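Pulsar standalone can take a while to start, and commands such as pulsar-admin fail until it is ready. If you want to script that wait, here is a minimal sketch using only the JDK. It assumes the admin REST API is reachable on the port 8080 exposed above and uses the /admin/v2/clusters endpoint (which lists the clusters) as a readiness probe; isUp and waitUntilUp are hypothetical helper names, not part of any Pulsar library.

```kotlin
import java.io.IOException
import java.net.HttpURLConnection
import java.net.URL

// Hypothetical helper: true if an HTTP GET on [url] answers with status 200.
fun isUp(url: String, timeoutMs: Int = 2000): Boolean =
    try {
        val conn = URL(url).openConnection() as HttpURLConnection
        conn.connectTimeout = timeoutMs
        conn.readTimeout = timeoutMs
        try {
            // Reading the status code opens the connection and sends the GET.
            conn.responseCode == HttpURLConnection.HTTP_OK
        } finally {
            conn.disconnect()
        }
    } catch (e: IOException) {
        // Connection refused, timeout, malformed URL...: treat as "not ready yet".
        false
    }

// Hypothetical helper: polls [url] up to [attempts] times, pausing [delayMs] between tries.
fun waitUntilUp(url: String, attempts: Int = 30, delayMs: Long = 2000): Boolean {
    repeat(attempts) { attempt ->
        if (isUp(url)) return true
        // Do not sleep after the last failed attempt.
        if (attempt < attempts - 1) Thread.sleep(delayMs)
    }
    return false
}
```

For example, a small main could call waitUntilUp("http://localhost:8080/admin/v2/clusters") and only continue (or print an error) depending on the result.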

Now, we will properly configure the Gradle build by editing the build.gradle.kts file. Make sure it contains the following content:

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
    kotlin("jvm") version "1.3.72"
    id("com.github.johnrengelman.shadow") version "5.2.0"
}

group = "my.pulsar.project"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation(kotlin("stdlib-jdk8"))

    implementation("org.apache.pulsar:pulsar-functions-api:2.6.+")
}

tasks {
    compileKotlin {
        kotlinOptions.jvmTarget = JavaVersion.VERSION_1_8.toString()
    }
    compileTestKotlin {
        kotlinOptions.jvmTarget = JavaVersion.VERSION_1_8.toString()
    }
}

tasks {
    named<ShadowJar>("shadowJar") {
        mergeServiceFiles()
    }
}

tasks {
    build {
        dependsOn(shadowJar)
    }
}

The plugins block declares the Gradle plugins used to build our project. Here, we use the kotlin plugin to tell Gradle how to compile Kotlin code, and the shadow plugin to package our code as a fat JAR. JAR files are Java archive files: they bundle compiled classes together with associated metadata and resources. The shadow plugin builds a JAR file containing our project code plus its dependencies. If you want to know more about the different Java packaging techniques, you can read this great article.

The repositories block declares the repositories to use to resolve dependencies. The two most well-known repositories are Maven Central and JCenter. At the time of writing, JCenter does not have version 2.6+ of the Apache Pulsar packages so I’m using Maven Central, but you should be able to use JCenter if you want to. You can take a look at the documentation to learn more about repositories.

The dependencies block declares the dependencies required by our project. Here, we need the Kotlin standard library and the Pulsar Functions Java SDK.

Finally, the tasks blocks configure some of the Gradle tasks. Here we configure the Kotlin compilation tasks to target Java 8, because that's the Java version Pulsar runs on. We also configure the shadowJar task to merge the service descriptor files (the META-INF/services entries) of the bundled dependencies, and we make the build task depend on shadowJar so the fat JAR is produced whenever we run the ./gradlew build command.

Are you still reading? Good! Because we are going to start the fun part: writing our first function.

Let's start IntelliJ IDEA. After the initial setup, where you can leave every option at its default, the welcome window appears.

IntelliJ IDEA welcome window

Click the "Open or Import" option and select the directory where you created the project. IntelliJ IDEA will configure itself based on the build.gradle.kts file, so your IDE build settings stay in sync with the command-line build for free. Every time you change the build.gradle.kts file, you can re-sync your IDE settings. This way, you can use the IDE during development, and the same build should also work on a CI server running Gradle from the command line.

Now we’re going to write a Pulsar Function. We will start with the traditional “Hello World!”. Our function will consume a topic with messages containing a name and will publish messages containing “Hello, <name>!” in another topic.

Create a file at path src/main/kotlin/HelloFunction.kt with the following content:

Code of our Hello function
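A minimal sketch of what src/main/kotlin/HelloFunction.kt can look like, based on the description in this article. It returns "Hello, <name>" to match the consumer output shown later, assumes the Function and Context types from the org.apache.pulsar.functions.api package (provided by the pulsar-functions-api dependency declared in build.gradle.kts), and has no package declaration, so the class name is simply HelloFunction:

```kotlin
// Explicit import: takes precedence over Kotlin's built-in kotlin.Function type.
import org.apache.pulsar.functions.api.Context
import org.apache.pulsar.functions.api.Function

// A Pulsar Function that consumes String messages and produces String messages.
class HelloFunction : Function<String, String> {
    // Called by Pulsar once per message from the input topic;
    // the returned value is published to the output topic.
    override fun process(input: String, context: Context): String {
        // The context is not used yet.
        return "Hello, $input"
    }
}
```

Because process is an ordinary method, it can also be called directly in a unit test without a running Pulsar, as long as you supply some Context instance.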

We created a class implementing the Function interface defined by the Pulsar Functions Java SDK. This interface is generic: we specified that the input type of the function is String, and that the output type is also String. We will see in a future article what other, and better, choices we have for input and output types.

To implement the interface we have to define a process function, receiving the input and a context as parameters, and returning a String . This is all we need to implement a basic function. You can ignore the context parameter for now. It provides useful features that we will use in the next article. But before using these advanced features, let’s see how to build and run our function.

Building the function is easy thanks to Gradle and the build configuration we set up before. Run the following command to start the build:

$ ./gradlew build
BUILD SUCCESSFUL in 2s
4 actionable tasks: 4 executed

Gradle wrote the JAR archives to the build/libs directory. You should find two JAR files there:

  • my-pulsar-project-1.0-SNAPSHOT.jar : This is a JAR containing only your HelloFunction class.
  • my-pulsar-project-1.0-SNAPSHOT-all.jar : This is a JAR containing your function class and its dependencies. This is the JAR file we will deploy to Pulsar.
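A JAR is a ZIP archive, so one way to see the difference between the two files is to look for specific entries. A small sketch using only the JDK's java.util.zip; jarContains is a hypothetical helper name, and the default path is the fat JAR produced above (run it from the project root, or pass another path as an argument):

```kotlin
import java.io.File
import java.util.zip.ZipFile

// Returns true if the JAR (a ZIP archive) at [jar] has an entry named [entryName].
fun jarContains(jar: File, entryName: String): Boolean =
    ZipFile(jar).use { zip -> zip.getEntry(entryName) != null }

fun main(args: Array<String>) {
    val fatJar = File(args.firstOrNull() ?: "build/libs/my-pulsar-project-1.0-SNAPSHOT-all.jar")
    if (!fatJar.exists()) {
        println("Not found: ${fatJar.path} (run ./gradlew build first)")
        return
    }
    // Our function class: with no package declaration it sits at the root of the JAR.
    println("HelloFunction.class: ${jarContains(fatJar, "HelloFunction.class")}")
    // A Kotlin standard library class: only the -all JAR should bundle it.
    println("kotlin/Unit.class: ${jarContains(fatJar, "kotlin/Unit.class")}")
}
```

Pointing it at my-pulsar-project-1.0-SNAPSHOT.jar instead should report the function class but not the Kotlin standard library class.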

The Docker configuration we set up earlier in this article shares the build directory with the Pulsar container, so we don't have to manually copy the file into the container or download it from a remote location. To deploy the function, we will use docker-compose to run the pulsar-admin command in the container:

$ docker-compose exec pulsar bin/pulsar-admin functions create --jar /app/build/libs/my-pulsar-project-1.0-SNAPSHOT-all.jar --classname HelloFunction --inputs names --output hello
"Created Successfully"

Let’s break this command down:

docker-compose exec pulsar : instructs docker-compose to execute a command in the pulsar container.

bin/pulsar-admin functions create : uses the pulsar-admin CLI to create a function. We also used several options with this command:

  • The --jar option tells Pulsar where to find our JAR file. The path is the one inside the container, where the build directory is mounted at /app/build.
  • The --classname option tells Pulsar which class to use as the Pulsar Function. The class name must be fully qualified. Because we did not add a package declaration to our code, HelloFunction is already the fully qualified name. If you add a package declaration, make sure to include the full name of the class (e.g. --classname my.pulsar.project.HelloFunction).
  • The --inputs option defines the input topic our function will consume. Functions can even consume multiple topics! You can specify several names by separating them with commas (e.g. --inputs staff,users). To subscribe to every topic matching a pattern, use the --topics-pattern option instead of --inputs (the two are mutually exclusive); it takes a regular expression (e.g. --topics-pattern "persistent://public/default/names-.*"). This can be very powerful because the pattern will also match topics created after your function is deployed. And all of this is handled by Pulsar for you.
  • Finally, the --output option specifies the topic that will receive our function's output messages. This is the easiest way to make a function publish messages. You can also publish messages using the context parameter, which lets you, for example, choose the destination topic in code. We will do that in the following article.

And there it is: your function is now up and running. To check that it's running properly, we will use two different terminal windows:

  • In the first window, we are going to consume the hello topic. This is the output topic of our function, so if messages arrive on this topic, the function is running properly.
  • In the second window, we will publish names to the names topic. This is the input topic of our function, so the function will run for every message published on this topic.

To consume messages, run the following command:

$ docker-compose exec pulsar bin/pulsar-client consume -s pulsar-client -n 0 hello

The pulsar-client command can be used to produce and consume messages. When using the consume sub-command, we need to specify the topic to consume as a parameter, and at least the -s option, which specifies the subscription name to use. Pulsar uses the concept of "subscriptions" to keep track of messages delivered to consumers. If you're not familiar with this concept, you should read the documentation before going further. By default, the consume sub-command consumes only one message and exits. The -n 0 option instructs it to consume messages forever.

Now, use the second terminal window to publish messages to the names topic:

$ docker-compose exec pulsar bin/pulsar-client produce -m "Pierre-Yves" names

Now if you go back to the first window consuming the hello topic you should see something like this:

----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CJkDEAMgAA==, __pfn_input_topic__=persistent://public/default/names], content:Hello, Pierre-Yves

As you can see, the function properly produces messages.
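The __pfn_input_topic__ property also shows that the short name names was resolved to persistent://public/default/names. A simplified sketch of that naming convention, assuming the usual rules (a bare name goes to the public tenant and default namespace, and tenant/namespace/topic only gets the persistent:// prefix). This is not Pulsar's own implementation, and fullyQualifiedTopic is a hypothetical name:

```kotlin
// Hypothetical helper mirroring how Pulsar expands short topic names:
//   "names"                 -> persistent://public/default/names
//   "my-tenant/my-ns/names" -> persistent://my-tenant/my-ns/names
// Names that already contain "://" are returned unchanged.
fun fullyQualifiedTopic(topic: String): String = when {
    "://" in topic -> topic
    topic.count { it == '/' } == 2 -> "persistent://$topic"
    '/' !in topic -> "persistent://public/default/$topic"
    else -> throw IllegalArgumentException("Unsupported short topic name: $topic")
}

fun main() {
    println(fullyQualifiedTopic("names"))
    println(fullyQualifiedTopic("hello"))
}
```

This is why --inputs names and --output hello work in the commands above without spelling out the tenant and namespace.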

Be careful when using pulsar-client to produce messages containing commas. By default, the comma is used as a separator to send multiple messages in one command. For example, you can do this:

$ docker-compose exec pulsar bin/pulsar-client produce -m "Alice,Bob" names

And on the consumer side, you will get two messages:

----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CJkDEAQgAA==, __pfn_input_topic__=persistent://public/default/names], content:Hello, Alice
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CJkDEAUgAA==, __pfn_input_topic__=persistent://public/default/names], content:Hello, Bob

If you ever need to send messages containing commas, use the -s option to choose a different separator. For example, this will result in only one message being published:

$ docker-compose exec pulsar bin/pulsar-client produce -s "%" -m "Aegon of Houses Targaryen and Stark, Sixth of His Name, the Resurrected, King of the Andals and the First Men, 998th Commander of the Night's Watch, the White Wolf, Lord of the Seven Kingdoms, and Protector of the Realm" names
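Conceptually, the -m value is split on the separator before anything is sent, and the function then runs once per resulting message. A sketch of that behavior, as an illustration only (it is not pulsar-client's actual code; splitMessages and greetAll are hypothetical names, and greetAll simply repeats what HelloFunction returns):

```kotlin
// Illustration only: split one -m argument into messages on a separator
// (comma by default, or whatever is passed with -s).
fun splitMessages(arg: String, separator: String = ","): List<String> = arg.split(separator)

// What HelloFunction produces for each resulting message.
fun greetAll(messages: List<String>): List<String> = messages.map { "Hello, $it" }

fun main() {
    // Default comma separator: two messages, so two greetings.
    println(greetAll(splitMessages("Alice,Bob")))
    // "%" separator: the commas are kept, so this stays a single message.
    println(splitMessages("Aegon of Houses Targaryen and Stark, Sixth of His Name", "%").size)
}
```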

If you're still reading, congratulations! That was quite a lot of work, but hopefully you now know everything you need to write and run functions in Pulsar.

Don't be afraid to experiment with your function. If you make changes and want to update it, build it as before with the ./gradlew build command, and then redeploy it with the following command:

$ docker-compose exec pulsar bin/pulsar-admin functions update --jar /app/build/libs/my-pulsar-project-1.0-SNAPSHOT-all.jar --classname HelloFunction --inputs names --output hello
"Updated successfully"

In the following article, we will write a more useful function making use of features provided by the context parameter.
