How to trace Pulsar messages with OpenTracing and Jaeger

Sijia-w
StreamNative
Published in
4 min readJun 11, 2020

The OpenTracing Pulsar Client is an integration of the Pulsar Client and OpenTracing APIs which are based on Pulsar Client Interceptors, a monitoring tool in the StreamNative Hub.

OpenTracing is an open distributed tracing standard for applications and OSS packages. Many tracing backend services support OpenTracing APIs, such as Jaeger, Zipkin and SkyWalking.

This blog guides you through every step of how to trace Pulsar messages by Jaeger through OpenTracing API.

Prerequisite

Before getting started, make sure you have installed JDK 8, Maven 3, and Pulsar (cluster or standalone). If you do not have an available Pulsar, follow the instructions to install one.

Step 1: start a Jaeger backend

1. Start a Jaeger backend in Docker.

docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest

If you have successfully started Jaeger, you can open the Jaeger UI website successfully.

Tip

If you do not have a Jaeger Docker environment, you can download the binaries or build from source.

2. Visit http://localhost:16686 to open the Jaeger UI website without a username or password.

Step 2: add maven dependencies

This step uses OpenTracing Pulsar Client, which is integrated with the Pulsar Client and OpenTracing APIs based on Pulsar Client Interceptors, to trace Pulsar messages. Developed by StreamNative, the OpenTracing Pulsar Client acts as a monitoring tool in the StreamNatvie Hub.

Add Jaeger client dependency to connect to Jaeger backend.

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>io.streamnative</groupId>
<artifactId>opentracing-pulsar-client</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>1.2.0</version>
</dependency>

Step 3: use OpenTracing Pulsar Client

For easier understanding, this blog takes a usage scenario as an example. Suppose that you have three jobs and two topics. Job-1 publishes messages to the topic-A and Job-2 consumes messages from the topic-A. When Job-2 receives a message from topic-A, Job-2 sends a message to the topic-B, and then Job-3 consumes messages from topic-B. So there are two topics, two producers and two consumers in this scenario.

According to the scenario described previously, you need to start three applications to finish this job.

  • Job-1: publish messages to topic-A
  • Job-2: consume messages from topic-A and publish messages to topic-B
  • Job-3: consume messages from topic-B

Job-1

This example shows how to publish messages to topic-A in Java.

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-1").withSampler(samplerConfig).withReporter(reporterConfig);
Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<String> producerA = client.newProducer(Schema.STRING)
.topic("topic-A")
.intercept(new TracingProducerInterceptor())
.create();
for (int i = 0; i < 10; i++) {
producerA.newMessage().value(String.format("[%d] Hello", i)).send();
}

Job-2

This example shows how to consume messages from topic-A and publish messages to topic-B in Java.

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-2").withSampler(samplerConfig).withReporter(reporterConfig);
Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("topic-A")
.subscriptionName("open-tracing")
.subscriptionType(SubscriptionType.Shared)
.intercept(new TracingConsumerInterceptor<>())
.subscribe();
Producer<String> producerB = client.newProducer(Schema.STRING)
.topic("topic-B")
.intercept(new TracingProducerInterceptor())
.create();
while (true) {
Message<String> received = consumer.receive();
SpanContext context = TracingPulsarUtils.extractSpanContext(received, tracer);
TypedMessageBuilder<String> messageBuilder = producerB.newMessage();
messageBuilder.value(received.getValue() + " Pulsar and OpenTracing!");
// Inject parent span context
tracer.inject(context, Format.Builtin.TEXT_MAP, new TypeMessageBuilderInjectAdapter(messageBuilder));
messageBuilder.send();
consumer.acknowledge(received);
}

Job-3

This example shows how to consume messages from topic-B in Java.

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-3").withSampler(samplerConfig).withReporter(reporterConfig);
Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("topic-B")
.subscriptionName("open-tracing")
.subscriptionType(SubscriptionType.Shared)
.intercept(new TracingConsumerInterceptor<>())
.subscribe();
while (true) {
Message<String> received = consumer.receive();
System.out.println(received.getValue());
consumer.acknowledge(received);
}

Now, you can run Job-3, Job-2 and Job-1 one by one. You can see the Job-3 receives logs in the console as below:

[0] Hello Pulsar and OpenTracing!
[1] Hello Pulsar and OpenTracing!
...
[9] Hello Pulsar and OpenTracing!

Congratulations, your jobs work well. Now you can open the Jaeger UI again and there are ten traces in the Jaeger.

You can click a job name to view the details of a trace.

The span name is formatted as To__<topic-name> and From__<topic-name>__<subscription_name>, which makes it easy to tell whether it is a producer or a consumer.

Summary

As you can see, OpenTracing Pulsar Client integrates Pulsar client and OpenTracing to trace Pulsar messages easily. If you are using Pulsar and OpenTracing in your application, do not hesitate to try it out!

Additionally, I also wrote a tech blog for How to Use Apache SkyWalking to Trace Apache Pulsar Messages. For the complete content, see here.

About the author

Penghui Li is a PMC member of Apache Pulsar and a tech lead in Zhaopin.com, where he serves as the leading promoter to adopt Pulsar. His career has always involved messaging service from the messaging system, through the microservice, and into the current world with Pulsar. You can follow him on twitter.

This post was originally published by Penghui Li on the StreamNative blog.

Like this post? Please recommend and/or share.

Want to learn more? See https://streamnative.io/blog. Follow us here on Medium and check out our GitHub.

--

--