Develop Pulsar connectors for Pub/Sub
This blog post covers the key steps in developing Apache Pulsar connectors for Google Cloud Pub/Sub. It contains working code that supports the most basic data relay scenarios; it presents an automation framework for building and testing on GitHub; and it discusses some considerations for these steps. If you’re new to Pub/Sub and/or new to developing Pulsar connectors using Pulsar IO, this blog post is a great place to get started.
Pub/Sub sink connector
Working on the sink connector before the source connector gives you a chance to familiarize yourself with sending messages to Pub/Sub, which is the simpler of the two. You can take apart the official Pub/Sub Java quickstart samples and place the publisher client creation code, the publish calls, and the shutdown code in the open(), write(), and close() methods of the Sink interface implementation.
package org.apache.pulsar.io.gcp;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
@Slf4j
public class PubsubSink implements Sink<GenericObject> {
private Publisher publisher = null;
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
PubsubSinkConfig pubsubSinkConfig = PubsubSinkConfig.load(config);
TopicName topicName =
TopicName.of(pubsubSinkConfig.getProjectId(), pubsubSinkConfig.getTopicId());
BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setElementCountThreshold(pubsubSinkConfig.getBatchSize())
.build();
this.publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();
}
@Override
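public void write(Record<GenericObject> record) {
if (record.getMessage().isPresent()) {
ByteString data = ByteString.copyFrom(record.getMessage().get().getData());
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// publish() is asynchronous: ack the Pulsar record only after Pub/Sub accepts the
// message, and fail it otherwise so that Pulsar can redeliver it.
com.google.api.core.ApiFutures.addCallback(
this.publisher.publish(pubsubMessage),
new com.google.api.core.ApiFutureCallback<String>() {
@Override
public void onSuccess(String messageId) { record.ack(); }
@Override
public void onFailure(Throwable t) {
log.error("Failed to publish to Pub/Sub", t);
record.fail();
}
},
com.google.common.util.concurrent.MoreExecutors.directExecutor());
}
}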
@Override
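public void close() throws InterruptedException {
if (this.publisher != null) {
// shutdown() stops accepting new messages; wait for in-flight publishes to complete.
this.publisher.shutdown();
this.publisher.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES);
log.info("Pub/Sub sink has been shut down.");
}
}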
}
This example enables batching in the Pub/Sub publisher. If you are already familiar with batching in Pulsar, this is nothing new. Being able to set a maximum number of messages per batch enables higher throughput. Here are some additional custom settings on the Pub/Sub publisher client and their equivalents in Pulsar:
- Pub/Sub publisher ordering keys vs. Pulsar message or partition key
- Pub/Sub schemas vs. Pulsar Schema
To add these custom settings to your sink connector, first implement them in your sink config class as shown below, then call them from the sink connector class as shown above.
package org.apache.pulsar.io.gcp;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@Accessors(chain = true)
public class PubsubSinkConfig implements Serializable {
@FieldDoc(required = true, defaultValue = "", help = "Google Cloud project ID")
private String projectId = "";
@FieldDoc(required = true, defaultValue = "", help = "Google Cloud Pub/Sub topic ID")
private String topicId = "";
@FieldDoc(required = false, defaultValue = "10", help = "Maximum number of messages per publisher batch")
private Long batchSize = 10L;
public static PubsubSinkConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), PubsubSinkConfig.class);
}
}
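To make the batching threshold above more concrete, here is a minimal, library-free sketch of count-based batching. It is not how the Pub/Sub client is implemented; CountBatcher and its sender callback are hypothetical names used only for illustration. The real publisher can also flush on byte-size and delay thresholds, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers items and hands them to a sender once a count threshold is reached. */
final class CountBatcher<T> {
    private final long threshold;
    private final Consumer<List<T>> sender;
    private List<T> pending = new ArrayList<>();

    CountBatcher(long threshold, Consumer<List<T>> sender) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
        this.sender = sender;
    }

    /** Buffers one item and sends the whole batch when the threshold is hit. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= threshold) {
            flush();
        }
    }

    /** Sends whatever is buffered, for example on shutdown. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = pending;
        pending = new ArrayList<>();
        sender.accept(batch);
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        CountBatcher<String> batcher = new CountBatcher<>(3, sent::add);
        for (int i = 0; i < 7; i++) {
            batcher.add("m" + i);
        }
        batcher.flush(); // the partial last batch still goes out
        System.out.println(sent); // prints [[m0, m1, m2], [m3, m4, m5], [m6]]
    }
}
```

A larger threshold means fewer, bigger requests, which is where the throughput gain comes from. The price is that a message may wait in the buffer until the batch fills or is flushed.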
Pub/Sub source connector
At a high level, the source connector code mirrors the sink connector code. We still create and shut down the Pub/Sub subscriber client in the open() and close() methods of the Source interface implementation. In addition, there is a class, PubsubRecord, that implements a Pulsar Record and is used to convert a Pub/Sub message into a Pulsar record.
As an aside, a LinkedBlockingQueue could be used to temporarily store messages received from the Pub/Sub subscription. If so, in the read() method of the Pulsar Source interface implementation, you could pop the oldest message from the queue and forward it to a Pulsar topic. However, this can introduce message loss if your program fails after a message is acknowledged and before it is successfully written to Pulsar. (Credit: Kamal Aboul-Hosn)
A better implementation passes the AckReplyConsumer from Pub/Sub as an input argument to PubsubRecord, where we override the ack() method and use the AckReplyConsumer to ack the Pub/Sub message. This makes sure that the message acknowledgement on the Pub/Sub side happens after the message has been successfully written to Pulsar.
package org.apache.pulsar.io.gcp;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
@Slf4j
public class PubsubSource extends PushSource<byte[]> {
private Subscriber subscriber = null;
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
PubsubSourceConfig pubsubSourceConfig = PubsubSourceConfig.load(config);
ProjectSubscriptionName projectSubscriptionName =
ProjectSubscriptionName.of(pubsubSourceConfig.getProjectId(), pubsubSourceConfig.getSubscriptionId());
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(pubsubSourceConfig.getFlowSize())
.build();
Subscriber.Builder subscriberBuilder = Subscriber.newBuilder(projectSubscriptionName,
(PubsubMessage message, AckReplyConsumer consumer) -> {
Record<byte[]> record = new PubsubRecord(sourceContext.getOutputTopic(), message,
consumer);
consume(record);
});
subscriberBuilder.setFlowControlSettings(flowControlSettings);
subscriberBuilder.setParallelPullCount(pubsubSourceConfig.getNumStreams());
subscriberBuilder.setExecutorProvider(executorProvider);
subscriber = subscriberBuilder.build();
subscriber.startAsync().awaitRunning();
log.info("listening for messages on {}..", subscriber.getSubscriptionNameString());
}
@Override
public void close() {
if (this.subscriber != null) {
// Wait for the subscriber to stop; awaitRunning() would be wrong once it is stopping.
this.subscriber.stopAsync().awaitTerminated();
}
}
private record PubsubRecord(String pulsarTopic, PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) implements Record<byte[]> {
@Override
public Optional<String> getKey() {
if (!this.pubsubMessage.getOrderingKey().isEmpty()) {
return Optional.of(this.pubsubMessage.getOrderingKey());
} else {
return Optional.empty();
}
}
@Override
public byte[] getValue() {
return this.pubsubMessage.getData().toByteArray();
}
@Override
public Optional<Long> getEventTime() {
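// Pulsar event times are conventionally epoch milliseconds, so convert the protobuf Timestamp.
return Optional.of(this.pubsubMessage.getPublishTime().getSeconds() * 1000L
+ this.pubsubMessage.getPublishTime().getNanos() / 1_000_000L);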
}
@Override
public Map<String, String> getProperties() {
return this.pubsubMessage.getAttributesMap();
}
@Override
public Optional<String> getDestinationTopic() {
return Optional.of(this.pulsarTopic);
}
@Override
public void ack(){ ackReplyConsumer.ack(); }
@Override
public void fail() { ackReplyConsumer.nack(); }
}
}
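The ordering that PubsubRecord relies on (write first, acknowledge afterwards) can be illustrated without any Pub/Sub or Pulsar dependency. The following is only a sketch: AckHandle, PendingRecord, RecordingHandle, and forward() are hypothetical stand-ins, not part of either library, and the write failure is simulated with a flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo of acknowledging upstream only after the downstream write succeeds. */
final class AckAfterWriteDemo {
    /** Stand-in for an upstream ack/nack handle (not the Pub/Sub API). */
    interface AckHandle {
        void ack();
        void nack();
    }

    /** Pairs a payload with its handle so the writer decides when to ack. */
    record PendingRecord(String payload, AckHandle handle) {}

    /** Remembers what the upstream system was told, for inspection. */
    static final class RecordingHandle implements AckHandle {
        final List<String> calls = new ArrayList<>();
        @Override public void ack() { calls.add("ack"); }
        @Override public void nack() { calls.add("nack"); }
    }

    /** Writes first, acks only on success, and nacks on failure so the message can be redelivered. */
    static void forward(PendingRecord record, List<String> destination, boolean failWrite) {
        try {
            if (failWrite) {
                throw new IllegalStateException("simulated write failure");
            }
            destination.add(record.payload());
            record.handle().ack();
        } catch (RuntimeException e) {
            record.handle().nack();
        }
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        RecordingHandle first = new RecordingHandle();
        RecordingHandle second = new RecordingHandle();
        forward(new PendingRecord("a", first), topic, false);
        forward(new PendingRecord("b", second), topic, true);
        System.out.println(topic + " " + first.calls + " " + second.calls); // prints [a] [ack] [nack]
    }
}
```

Because the failed message is never acknowledged, the upstream system still considers it outstanding and can deliver it again. With the queue-based approach, it would already have been acknowledged and lost.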
Besides performing simple message forwarding, the PubsubSource example also shows how to enable flow control and concurrency control in the Pub/Sub subscriber client. Flow control makes sure that no single subscriber gets overwhelmed or starved. Concurrency control allows multiple gRPC bidirectional streams to be opened for pulling messages, i.e., setParallelPullCount(), and a multiple of that number of threads to be used for processing messages in the message callback function, i.e., setExecutorThreadCount().
The example allows user input for the former and hard codes the latter. Generally, you should let users decide what these values are based on how CPU-bound their message processing is and how well they can handle the incoming message volume.
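One way to expose all of these knobs is to read them from the connector's config map. The post does not show PubsubSourceConfig, so the following is only a plain-Java sketch under assumptions: the keys flowSize and numStreams are inferred from the getters used in PubsubSource, executorThreads is a hypothetical addition, and the fallback values are placeholders (4 simply matches the hard-coded thread count above).

```java
import java.util.Map;

/** Hypothetical holder for subscriber tuning values read from a connector config map. */
final class SourceTuning {
    final long flowSize;        // max outstanding messages (flow control)
    final int numStreams;       // parallel pull streams
    final int executorThreads;  // callback threads (hypothetical setting)

    private SourceTuning(long flowSize, int numStreams, int executorThreads) {
        this.flowSize = flowSize;
        this.numStreams = numStreams;
        this.executorThreads = executorThreads;
    }

    /** Reads the three settings, falling back to placeholder defaults and rejecting values below 1. */
    static SourceTuning load(Map<String, Object> config) {
        long flowSize = number(config, "flowSize", 1000L).longValue();
        int numStreams = number(config, "numStreams", 1).intValue();
        int executorThreads = number(config, "executorThreads", 4).intValue();
        if (flowSize < 1 || numStreams < 1 || executorThreads < 1) {
            throw new IllegalArgumentException("flowSize, numStreams and executorThreads must be >= 1");
        }
        return new SourceTuning(flowSize, numStreams, executorThreads);
    }

    /** Accepts both numeric values and numeric strings, as YAML-loaded maps may contain either. */
    private static Number number(Map<String, Object> config, String key, Number fallback) {
        Object value = config.get(key);
        if (value == null) {
            return fallback;
        }
        if (value instanceof Number n) {
            return n;
        }
        return Long.valueOf(value.toString().trim());
    }

    public static void main(String[] args) {
        SourceTuning tuning = SourceTuning.load(Map.of("flowSize", 500, "numStreams", "2"));
        System.out.println(tuning.flowSize + " " + tuning.numStreams + " " + tuning.executorThreads);
        // prints 500 2 4
    }
}
```

In the connector, the executorThreads value would replace the literal 4 passed to setExecutorThreadCount(), while the other two values feed setMaxOutstandingElementCount() and setParallelPullCount() as they already do.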
Other subscriber features such as exactly-once delivery and filtering can also be implemented.
Build
The connectors here use Apache Pulsar 2.11.x and Java 17. To package them as a NAR, you can add the following build plugin to your pom.xml:
<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
<version>${nifi.nar.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
</configuration>
<executions>
<execution>
<id>default-nar</id>
<phase>package</phase>
<goals>
<goal>nar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
It is also necessary to create a META-INF/services/pulsar-io.yaml file that specifies these connector parameters:
name: pubsub-connector
description: Pub/Sub Sink and Source Connectors
sinkClass: org.apache.pulsar.io.gcp.PubsubSink
sinkConfigClass: org.apache.pulsar.io.gcp.PubsubSinkConfig
sourceClass: org.apache.pulsar.io.gcp.PubsubSource
sourceConfigClass: org.apache.pulsar.io.gcp.PubsubSourceConfig
In my case, running mvn clean package -DskipTests=true creates a target/tz-pulsar-io-1.0-SNAPSHOT.nar of roughly 84 MB.
Test
The README file in my repo describes how to run the connectors with a local standalone Pulsar cluster. When I was developing them, that was how I tested them. However, that was not an efficient way to go about testing because it involves a number of setup steps. Leveraging Cloud Build to start a local standalone Pulsar cluster saves a lot of time here.
FROM maven:3.9-amazoncorretto-17-debian
RUN apt-get update && \
apt-get install -yq wget procps && \
wget https://archive.apache.org/dist/pulsar/pulsar-2.11.1/apache-pulsar-2.11.1-bin.tar.gz && \
tar xvfz apache-pulsar-2.11.1-bin.tar.gz && \
cd apache-pulsar-2.11.1 && \
pwd
This Dockerfile pulls a Maven image with Java 17 and Maven preinstalled. It also installs wget, which is used to download Pulsar.
steps:
  - id: 'pulsar-standalone'
    name: gcr.io/cloud-builders/docker
    args: [ 'build', '-t', '${_LOCATION}-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}', '.' ]
  - id: 'run-tests'
    name: ${_LOCATION}-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}
    script: |
      #!/usr/bin/env bash
      export PULSAR_HOME=/apache-pulsar-2.11.1/
      ${PULSAR_HOME}/bin/pulsar-daemon start standalone
      sleep 20
      ${PULSAR_HOME}/bin/pulsar-admin brokers healthcheck
      mvn clean package -DskipTests=true
      cp target/tz-pulsar-io-1.0-SNAPSHOT.nar ${PULSAR_HOME}/examples/
      cp -r src/test/resources/ ${PULSAR_HOME}/examples/
      ${PULSAR_HOME}/bin/pulsar-admin sinks localrun --sink-config-file ${PULSAR_HOME}/examples/resources/tz-pubsub-sink.yaml --archive ${PULSAR_HOME}/examples/tz-pulsar-io-1.0-SNAPSHOT.nar &
      mvn test -Dtest=org.apache.pulsar.io.gcp.PubsubSinkTest
      ${PULSAR_HOME}/bin/pulsar-admin sources localrun --source-config-file ${PULSAR_HOME}/examples/resources/tz-pubsub-source.yaml --archive ${PULSAR_HOME}/examples/tz-pulsar-io-1.0-SNAPSHOT.nar &
      mvn test -Dtest=org.apache.pulsar.io.gcp.PubsubSourceTest
    env:
      - 'PROJECT_ID=${PROJECT_ID}'
      - 'TOPIC_ID=${_TOPIC_ID}'
images:
  - '${_LOCATION}-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}'
substitutions:
  _LOCATION: us-central1
  _REPOSITORY: pulsar
  _IMAGE: pulsar-standalone
This cloudbuild.yaml is broken down into two steps. Step #1, pulsar-standalone, builds a Docker image defined in Dockerfile and uploads the image to Artifact Registry. From there, step #2, run-tests, starts a standalone Pulsar cluster, packages a NAR for the connectors, and then runs the sink and source integration tests separately with the help of Pulsar CLI tools.
Admittedly, it took me some effort to get this script working right. Here are the reasons for some odd-looking choices in it:
- sleep 20 is necessary because the healthcheck command does not return ok right away. A failure here will cause the rest of the test to fail.
- Copying the nar to ${PULSAR_HOME} from /workspace, which is the default volume that Cloud Build provides, is also necessary. Pointing --archive to a packaged nar in /workspace returned errors like Sink/Source doesn't exist.
- Using the & operator to run the sink and source connector in the background while starting mvn test makes sure a running Pulsar cluster is available for the integration tests.
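A fixed sleep works, but it either wastes time or proves too short on a slow machine. An alternative that I did not use in the script above is to poll a readiness check with a bounded number of retries. The sketch below shows the idea in plain Java; ReadinessPoller and waitUntil() are hypothetical names, and the check itself is left abstract (it could, for instance, run the healthcheck command and test its exit code).

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper that retries a readiness check instead of sleeping for a fixed time. */
final class ReadinessPoller {
    /** Returns true as soon as the check passes, or false once the attempts are exhausted. */
    static boolean waitUntil(BooleanSupplier check, int maxAttempts, Duration delay) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (check.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated service that becomes healthy on the third check.
        boolean ready = waitUntil(() -> ++calls[0] >= 3, 10, Duration.ofMillis(50));
        System.out.println(ready + " after " + calls[0] + " checks"); // prints true after 3 checks
    }
}
```

With this shape, the wait ends as soon as the broker is ready, and a broker that never comes up produces a clear failure after a known upper bound.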
I set up a Cloud Build trigger to run integration tests on each new commit to the main branch. There are other triggers available too, such as pushes to a different branch, commits with certain tags, or pull requests.
This is a screenshot of my Cloud Build history. Build faf6afb7 was triggered by my latest commit to update README.md.
All the working code in this blog post can be found on GitHub. Please note that although the connectors work as intended, they are not production-grade, and no release has been made. It is a simple implementation meant for learning purposes and does not have support for all the features of Pub/Sub. However, the CI/CD setup using Cloud Build is widely applicable and I highly recommend it in your development and release cycles. Hope you find this blog post helpful. Don’t hesitate to reach out for questions!