Writing your first Apache Pulsar Function in Kotlin
In the previous article, we installed everything we need to write and run a Pulsar Function in Kotlin. If you haven't installed everything yet, now is the time to do it.
We will first write a very basic function. It won't do much, but it will show you how to build it and how to deploy it. Then, in the following article, we will write a more useful function using more advanced features.
Let's start by creating a directory to hold all the code and configuration files we need.
$ mkdir -p my-pulsar-project/build && cd my-pulsar-project
Using Gradle, we can initialize a new project.
$ gradle init
Select type of project to generate:
1: basic
2: application
3: library
4: Gradle plugin
Enter selection (default: basic) [1..4] 1
Select build script DSL:
1: Groovy
2: Kotlin
Enter selection (default: Groovy) [1..2] 2
Project name (default: my-pulsar-project):
> Task :init
Get more help with your project: https://guides.gradle.org/creating-new-gradle-builds
BUILD SUCCESSFUL in 16s
2 actionable tasks: 2 executed
Gradle will generate a build file, describing how the project must be built and packaged. For the first choice, the type of project to generate, enter "1" to use a "basic" project. This will determine the default content which will be generated in the Gradle build file. For the second choice, select option "2" to have the build file written in Kotlin. Gradle can work with build files written in Groovy and in Kotlin. Since we will write our Pulsar Function in Kotlin, we might as well write the build file in Kotlin.
This command will generate a few files for you:
- settings.gradle.kts is a file containing the project name. It is also used to configure multi-project builds, but we won't need this feature.
- build.gradle.kts is the file describing which dependencies our project needs, how to build it, and how to package it.
- gradlew, gradlew.bat, and the gradle directory are used by the Gradle Wrapper. It allows developers who don't have Gradle installed on their computer to build the project anyway: the wrapper downloads the correct Gradle version for them. This ensures everyone uses the same version of Gradle without requiring every developer to manage it manually. I find it very convenient.
From now on, we will use the ./gradlew command to run Gradle commands instead of the global gradle command we used before.
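To make the description of settings.gradle.kts concrete, this is roughly what the generated file should contain for this project. It is a config fragment, not a program; gradle init typically also adds a header comment above it.

```kotlin
// settings.gradle.kts: the only setting we rely on is the root project name,
// which becomes the base name of the JAR files built later.
rootProject.name = "my-pulsar-project"
```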
Before writing any Kotlin code, we will write a small docker-compose.yml configuration file so we can start Apache Pulsar as a Docker container.
Create the file at the root of the project directory, with the following content:
version: "3.7"
services:
  pulsar:
    image: apachepulsar/pulsar-all:2.6.0
    command: >
      /bin/bash -c
      "bin/apply-config-from-env.py conf/standalone.conf
      && bin/pulsar standalone"
    volumes:
      - "/pulsar/data"
      - "/pulsar/conf"
      - "./build:/app/build:delegated"
    ports:
      - "6650:6650"
      - "8080:8080"
This configuration stores Pulsar's configuration and data in volumes so they persist between container restarts. It also shares the build directory of our project with the container, so we will be able to run our function after we build it, and it exposes the ports used by Pulsar (6650 and 8080) on our computer.
To run Apache Pulsar, you need to use the following command:
$ docker-compose up
This will start Apache Pulsar and show all output from the pulsar container. This command never terminates, so you will need to use a new terminal window from now on.
Now, we will properly configure the Gradle build by editing the build.gradle.kts file. Make sure it contains the following content:
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
    kotlin("jvm") version "1.3.72"
    id("com.github.johnrengelman.shadow") version "5.2.0"
}

group = "my.pulsar.project"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation(kotlin("stdlib-jdk8"))
    implementation("org.apache.pulsar:pulsar-functions-api:2.6.+")
}

tasks {
    compileKotlin {
        kotlinOptions.jvmTarget = JavaVersion.VERSION_1_8.toString()
    }
    compileTestKotlin {
        kotlinOptions.jvmTarget = JavaVersion.VERSION_1_8.toString()
    }
}

tasks {
    named<ShadowJar>("shadowJar") {
        mergeServiceFiles()
    }
}

tasks {
    build {
        dependsOn(shadowJar)
    }
}
The plugins block declares the Gradle plugins used when building our project. Here, we are using the kotlin plugin to tell Gradle how to compile Kotlin code, and the shadow plugin to package our code as a fat JAR. JAR files are Java archive files: they bundle compiled Java code together with its metadata and resources. The shadow plugin will make sure to build a JAR file containing our project code plus its dependencies. If you want to know more about the different Java packaging techniques, you can read this great article.
The repositories block declares the repositories used to resolve dependencies. The two best-known repositories are Maven Central and JCenter. At the time of writing, JCenter does not have version 2.6+ of the Apache Pulsar packages, so I'm using Maven Central, but you should be able to use JCenter if you want to. You can take a look at the documentation to learn more about repositories.
The dependencies block declares the dependencies required by our project. Here, we need the Kotlin standard library and the Pulsar Functions Java SDK.
Finally, the tasks blocks configure some of the Gradle tasks. Here, we configure the Kotlin compilation tasks to target Java 8 because that's the Java version Pulsar is running on, and we configure the shadowJar task and make sure it runs as part of the ./gradlew build command.
Are you still reading? Good! Because we are going to start the fun part: writing our first function.
Let’s start IntelliJ IDEA. After the initial setup, where you can keep all the defaults if you want, you are greeted by the welcome window.
Click the “Open or Import” option, and select the directory where you created the project. IntelliJ IDEA will configure itself based on the build.gradle.kts file. That way, your IDE build settings and the command-line build stay in sync for free. Every time you make changes to the build.gradle.kts file, you can re-sync your IDE settings. This way, you can use the IDE during development, and everything should also work on a CI server running Gradle from the command line.
Now we’re going to write a Pulsar Function. We will start with the traditional “Hello World!”. Our function will consume a topic with messages containing a name and will publish messages containing “Hello, <name>!” to another topic.
Create a file at path src/main/kotlin/HelloFunction.kt with the following content:
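Based on the description that follows, a minimal version of this file could look like the sketch below. Two details are assumptions of this sketch: the greeting format "Hello, $input" is chosen to match the consumer output shown later in this article, and the context parameter is declared nullable only so the class can be called directly in a quick test without a Pulsar runtime.

```kotlin
import org.apache.pulsar.functions.api.Context
import org.apache.pulsar.functions.api.Function

// The explicit import of org.apache.pulsar.functions.api.Function takes
// precedence over Kotlin's built-in kotlin.Function type.
// Input type: String, output type: String.
class HelloFunction : Function<String, String> {
    // Called for each input message; the returned value is published
    // to the output topic configured when the function is deployed.
    // `context` is nullable in this sketch so tests can pass null.
    override fun process(input: String, context: Context?): String {
        return "Hello, $input"
    }
}
```

Because the class only depends on the Pulsar Functions API, you can call process directly, e.g. HelloFunction().process("Pierre-Yves", null), to check the greeting before deploying anything.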
We created a class implementing the Function interface defined by the Pulsar Functions Java SDK. This interface is generic: we specified that the input type of the function is String, and that the output type is also String. We will see in a future article what other, better choices we have for input and output types.
To implement the interface, we have to define a process method that receives the input and a context as parameters and returns a String. This is all we need to implement a basic function. You can ignore the context parameter for now. It provides useful features that we will use in the next article. But before using these advanced features, let’s see how to build and run our function.
Building the function is easy thanks to Gradle and the build configuration we set up before. Run the following command to start the build:
$ ./gradlew build
BUILD SUCCESSFUL in 2s
4 actionable tasks: 4 executed
Gradle built the JAR archives in the build/libs directory. You should be able to find two JAR files in this directory:
- my-pulsar-project-1.0-SNAPSHOT.jar: This is a JAR containing only your HelloFunction class.
- my-pulsar-project-1.0-SNAPSHOT-all.jar: This is a JAR containing your function class and its dependencies. This is the JAR file we will deploy to Pulsar.
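If you want to check the difference between the two archives yourself, the following sketch uses the JDK's java.util.jar.JarFile to list the .class entries of each JAR in build/libs and reports whether the Kotlin standard library (classes under the kotlin/ package) is bundled. The helper name classEntries and the default build/libs path (relative to the project root) are assumptions of this sketch.

```kotlin
import java.io.File
import java.util.jar.JarFile

// Hypothetical helper: returns the names of all .class entries in a JAR file.
fun classEntries(jar: File): List<String> =
    JarFile(jar).use { jarFile ->
        jarFile.entries().toList()
            .map { it.name }
            .filter { it.endsWith(".class") }
    }

fun main(args: Array<String>) {
    // Defaults to Gradle's output directory when run from the project root.
    val libs = File(args.firstOrNull() ?: "build/libs")
    val jars = libs.listFiles()
        ?.filter { it.extension == "jar" }
        ?.sortedBy { it.name }
        .orEmpty()
    for (jar in jars) {
        val classes = classEntries(jar)
        // Classes from the Kotlin standard library live under the kotlin/ package.
        val hasKotlinStdlib = classes.any { it.startsWith("kotlin/") }
        println("${jar.name}: ${classes.size} classes, Kotlin stdlib bundled: $hasKotlinStdlib")
    }
}
```

After ./gradlew build, you would expect only the -all JAR to report the Kotlin standard library as bundled, which is why that is the one we deploy.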
The Docker configuration we set up earlier shares the build directory with the Pulsar container, so we don’t have to manually copy the file to the container or download it from a remote location. To deploy the function, we will use docker-compose to run the pulsar-admin command in the container:
$ docker-compose exec pulsar bin/pulsar-admin functions create --jar /app/build/libs/my-pulsar-project-1.0-SNAPSHOT-all.jar --classname HelloFunction --inputs names --output hello
"Created Successfully"
Let’s break this command down:
- docker-compose exec pulsar: instructs docker-compose to execute a command in the pulsar container.
- bin/pulsar-admin functions create: uses the pulsar-admin CLI to create a function. We also passed several options to this command:
  - The --jar option tells Pulsar the location of our JAR file.
  - The --classname option tells Pulsar which class to use as the Pulsar Function. The class name must be fully qualified. Because we did not add a package declaration to our code, the HelloFunction name is already fully qualified. But if you add a package declaration, make sure to include the full name of the class (e.g. --classname my.pulsar.project.HelloFunction).
  - The --inputs option defines the input topic our function will consume. Functions can even consume multiple topics! You can specify several names separated by commas (e.g. --inputs staff,users). You can also use the --topics-pattern option instead of --inputs to subscribe to every topic matching a regular expression (e.g. --topics-pattern "persistent://public/default/names-.*"). This can be very powerful because the pattern also matches topics created after your function is deployed, and Pulsar handles all of this for you.
  - Finally, the --output option specifies the topic that will receive our function’s output messages. This is the easiest way to make a function publish messages. You can also publish messages using the context parameter, which lets you choose the destination topic in code, for example. We will do that in the following article.
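If you are unsure which value to pass to --classname once your class lives in a package, one quick way to find it is to print the class's JVM name. In the sketch below, the package my.pulsar.project mirrors the example above, and HelloFunction is a stand-in that does not implement the Pulsar interface, since only its name matters here.

```kotlin
package my.pulsar.project

// Stand-in for the real function class: only its name matters here,
// so it does not implement the Pulsar Function interface.
class HelloFunction

fun main() {
    // Class.getName() returns the binary name the JVM uses when loading
    // a class by name, e.g. "my.pulsar.project.HelloFunction".
    println(HelloFunction::class.java.name)
}
```

For a class nested inside another class, Class.getName() returns the binary name with a $ separator (e.g. my.pulsar.project.Outer$Inner), which is the form the JVM expects when loading a class by name.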
And there it is, your function is now up and running. To check that it’s running properly, we will use two different terminal windows:
- In the first window, we are going to consume the hello topic. This is the output topic of our function, so if messages arrive on this topic, the function is running properly.
- In the second window, we will publish names to the names topic. This is the input topic of our function, so the function will run for every message published on this topic.
To consume messages, run the following command:
$ docker-compose exec pulsar bin/pulsar-client consume -s pulsar-client -n 0 hello
The pulsar-client command can be used to produce and consume messages. When using the consume sub-command, we need to pass the topic to consume as a parameter, and at least the -s option, which specifies the subscription name to use. Pulsar uses the concept of “subscriptions” to keep track of messages delivered to consumers. If you’re not familiar with this concept, you should read the documentation before going further. By default, the consume sub-command consumes only one message and exits. The -n 0 option instructs it to consume messages forever.
Now, use the second terminal window to publish messages to the names topic:
$ docker-compose exec pulsar bin/pulsar-client produce -m "Pierre-Yves" names
Now, if you go back to the first window consuming the hello topic, you should see something like this:
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CJkDEAMgAA==, __pfn_input_topic__=persistent://public/default/names], content:Hello, Pierre-Yves
As you can see, the function properly produces messages.
Be careful when using pulsar-client to produce messages containing commas. By default, the comma is used as a separator to send multiple messages in one command. For example, you can do this:
$ docker-compose exec pulsar bin/pulsar-client produce -m "Alice,Bob" names
And on the consumer side, you will get two messages:
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CJkDEAQgAA==, __pfn_input_topic__=persistent://public/default/names], content:Hello, Alice
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CJkDEAUgAA==, __pfn_input_topic__=persistent://public/default/names], content:Hello, Bob
If you ever need to send messages containing commas, use the -s option to choose a different separator. For example, this will result in only one message being published:
$ docker-compose exec pulsar bin/pulsar-client produce -s "%" -m "Aegon of Houses Targaryen and Stark, Sixth of His Name, the Resurrected, King of the Andals and the First Men, 998th Commander of the Night's Watch, the White Wolf, Lord of the Seven Kingdoms, and Protector of the Realm" names
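The splitting behavior described above can be sketched in plain Kotlin. This only illustrates the observed behavior and is not pulsar-client's implementation: the helper names splitMessages and greetings are hypothetical, and the greeting format matches the consumer output shown above.

```kotlin
// Illustration only, not pulsar-client's code: one -m value is split on a
// separator (a comma by default) and each piece becomes its own message.
fun splitMessages(value: String, separator: String = ","): List<String> =
    value.split(separator)

// What the consumer of the hello topic would see for each published piece,
// assuming the function returns "Hello, <name>".
fun greetings(value: String, separator: String = ","): List<String> =
    splitMessages(value, separator).map { "Hello, $it" }

fun main() {
    println(greetings("Alice,Bob"))                     // [Hello, Alice, Hello, Bob]
    println(greetings("Aegon, Sixth of His Name", "%")) // [Hello, Aegon, Sixth of His Name]
}
```

With the default comma separator, one -m value turns into two messages and therefore two greetings; with "%" as the separator, the comma is kept inside a single message.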
If you’re still reading, congratulations! That was quite a bit of work, but hopefully you now know everything you need to write and run functions in Pulsar.
Don’t be afraid to experiment with your function. If you make changes and want to update your function, build it as before using the ./gradlew build command, and then deploy it with the following command:
$ docker-compose exec pulsar bin/pulsar-admin functions update --jar /app/build/libs/my-pulsar-project-1.0-SNAPSHOT-all.jar --classname HelloFunction --inputs names --output hello
"Created Successfully"
In the following article, we will write a more useful function making use of features provided by the context parameter.