Photo by Annie Spratt on Unsplash

Kafka record tracing

A journey of discovery for a new engineer learning about Kafka

Cosmin Mircea
7 min read · Oct 8, 2020


The problem

Internal users wanted to know how long it would take for a message to flow through our data pipeline and have the results available in an output topic. While it might look like a simple question, it hides immense complexity in a distributed system. Retrieving this type of information for each event passing through the pipeline is a difficult task without adequate monitoring.

Our answer at that moment: “It depends…”

It was the start of further conversations and of the proof of concept that follows.

The primary aim was to have visibility over the time it would take a message to pass through our pipeline.

Our secondary aim was to be able to investigate bugs and incidents in a distributed system. Observing these services becomes very difficult at scale. Millions of events can pass through dozens or hundreds of microservices, and if you rely only on application logs you quickly realize they are not fit for purpose, due to the nature of streaming systems and the high volume of traffic passing through them. While better logging tools can ease this problem, tracing an error through the system would still be a strenuous endeavour.

After those conversations and after identifying our needs, we settled on implementing a distributed tracing solution, relying on its ability to give us an overview of the whole system as well as a focused view of a particular event. This, in turn, would enable us to identify and investigate the services potentially responsible for an error, either via a top-down approach starting with an entire trace and then drilling down, or a bottom-up approach starting near the potential cause of the incident and expanding out from there.

Distributed tracing system

What it is, how it works and why it is needed

With microservices, monitoring the whole application becomes a daunting task: any external monitoring service gives you only an overall view of what is happening in your application, lacking the fine detail of what happens to the data while it is inside each service.

A distributed tracing system allows us to monitor each request as it hits the different services of our application, deployed across various systems, platforms and environments, and to identify where failures or performance issues occur.

The idea behind how this system works is rather simple:

  • assign a trace id to each request
  • include that id in all log messages
  • pass that id through each service
  • record the start and end time for each span id as it flows through.

These steps create, for each traced request, so-called spans with metadata referencing the service, endpoints and timestamps (see https://zipkin.io/pages/data_model.html).
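To make the idea concrete, here is a minimal sketch of propagating a trace id by hand through a Kafka record header. This is not the Brave instrumentation used later, and the topic name and header name are only illustrative:

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ManualTraceIdDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // assumed local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 1. assign a trace id to the request
            String traceId = UUID.randomUUID().toString().replace("-", "");

            // 2. pass the id along with the record so downstream services can log and forward it
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key", "value"); // hypothetical topic
            record.headers().add("X-B3-TraceId", traceId.getBytes(StandardCharsets.UTF_8));

            // 3. record start and end times: these become the span timestamps
            long start = System.currentTimeMillis();
            producer.send(record);
            long end = System.currentTimeMillis();
            System.out.println("traceId=" + traceId + " took " + (end - start) + " ms");
        }
    }
}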

In our case, by aggregating and correlating these spans, we can identify bottlenecks as well as understand the overall time it takes a message to pass through our pipeline. This is done by simply searching for the id in a monitoring system like Kibana or through the Zipkin UI.

Simply put, a distributed tracing system collects and saves data from each service monitored, correlates it based on a trace id, and makes it available for visualisation via an API.

Sample Zipkin data model

[
  {
    "traceId": "5982fe77008310cc80f1da5e10147517",
    "name": "get",
    "id": "bd7a977555f6b982",
    "timestamp": 1458702548467000,
    "duration": 386000,
    "localEndpoint": {
      "serviceName": "zipkin-query",
      "ipv4": "192.168.1.2",
      "port": 9411
    },
    "annotations": [
      { "timestamp": 1458702548467000, "value": "sr" },
      { "timestamp": 1458702548853000, "value": "ss" }
    ]
  },
  {
    "traceId": "5982fe77008310cc80f1da5e10147517",
    "name": "get-traces",
    "id": "ebf33e1a81dc6f71",
    "parentId": "bd7a977555f6b982",
    "timestamp": 1458702548478000,
    "duration": 354374,
    "localEndpoint": {
      "serviceName": "zipkin-query",
      "ipv4": "192.168.1.2",
      "port": 9411
    },
    "tags": {
      "lc": "JDBCSpanStore",
      "request": "QueryRequest{serviceName=zipkin-query, spanName=null, annotations=[], binaryAnnotations={}, minDuration=null, maxDuration=null, endTs=1458702548478, lookback=86400000, limit=1}"
    }
  },
  ...
]

To remember:

  • one trace id can be attached to multiple span ids
  • one span keeps track of the metadata of a traced operation, plus the parent id of the span that sent the request
  • a trace is the correlation of all spans sharing one trace id

Zipkin

Developed at Twitter, written in Java and open-sourced in 2012, Zipkin seems to be the most widely adopted tool for gaining transparency into a microservice architecture. As a result, it is the tool I used to create this proof of concept.

Need-to-know components:

  • Reporter: creates the spans we talked about above and sends them to a collector via HTTP (or Kafka or Scribe)
  • Collector: validates and stores the spans
  • Storage: used to store the spans
  • API: used to access the saved data
  • Web UI: Zipkin comes with its own web UI for viewing the traces

Implementation

A demo is available at the following link:

https://github.com/data-rocks-team/kafka-distributed-tracing

To run this application, clone the repo, bring the services up via docker-compose up and start the Producer, Consumer and Streaming applications.

In this specific scenario I chose ElasticSearch as the storage backend (I had a previous implementation of this service, a Docker file set up and some experience with it; Cassandra can be used as well) and brought the services up via docker-compose:

  • The services needed to run this spike are listed in the docker-compose file: Zipkin, Kafka, Zookeeper and ElasticSearch.
  • There are a few simple configuration details to look for in the docker-compose file (https://github.com/data-rocks-team/kafka-distributed-tracing/blob/master/docker-compose.yml), such as pointing Zipkin to the storage it should write to and read from.
  • Auto-topic creation is enabled, so it is important to start the Producer, Consumer and Streaming service in that order, so that the topics are created before they are traced.

For this example, I have used the Brave instrumentation library, written in Java (https://github.com/openzipkin/brave).

The applications required to demonstrate the tracing are the producer, consumer and a streaming service.
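The code snippets below are fragments from those applications. For reference, they rely roughly on the imports sketched here; exact packages may vary slightly between Brave versions:

// Tracing primitives
import brave.ScopedSpan;
import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.sampler.Sampler;

// Kafka instrumentation
import brave.kafka.clients.KafkaTracing;
import brave.kafka.streams.KafkaStreamsTracing;

// Reporting spans to Zipkin
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.urlconnection.URLConnectionSender;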

Producer

Steps:

  1. Configure the reporter and the tracing library:
// CONFIGURE TRACING
final URLConnectionSender sender = URLConnectionSender.newBuilder()
        .endpoint("http://127.0.0.1:9411/api/v2/spans")
        .build();
final AsyncReporter reporter = AsyncReporter.builder(sender).build();
final Tracing tracing = Tracing.newBuilder()
        .localServiceName("simpleProducer_test")
        .sampler(Sampler.ALWAYS_SAMPLE)
        .spanReporter(reporter)
        .build();
final KafkaTracing kafkaTracing = KafkaTracing.newBuilder(tracing).remoteServiceName("kafka").build();
final Tracer tracer = Tracing.currentTracer();
// END CONFIGURATION

2. Wrap the Kafka producer in kafkaTracing.producer:

final Producer tracedKafkaProducer = kafkaTracing.producer(producer);
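This assumes an existing KafkaProducer named producer. A minimal sketch of how it might be created, assuming the local broker from docker-compose and String serialisation:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "127.0.0.1:9092"); // assumed local broker
producerProps.put("key.serializer", StringSerializer.class.getName());
producerProps.put("value.serializer", StringSerializer.class.getName());

// the plain producer that gets wrapped by kafkaTracing.producer(...)
Producer<String, String> producer = new KafkaProducer<>(producerProps);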

3. Create spans:

  • measurements are taken between the span annotations
  • use reporter.flush() to force the spans to be sent to Zipkin
// Create record
ProducerRecord<String, String> record = new ProducerRecord<>("test_tracing", null, "Test");

// Create span
ScopedSpan span = tracer.startScopedSpan("produce-to-kafka");
span.tag("name", "sending-kafka-record");
span.annotate("starting operation");
span.annotate("sending message to kafka");
tracedKafkaProducer.send(record);
span.annotate("complete operation");
span.finish();
reporter.flush(); // flush sends the buffered spans to Zipkin
logger.info("End of application");

Consumer

Steps:

  1. Use the same configuration as in the producer; the only change is the localServiceName:
// CONFIGURE TRACING
final URLConnectionSender sender = URLConnectionSender.newBuilder()
        .endpoint("http://127.0.0.1:9411/api/v2/spans")
        .build();
final AsyncReporter reporter = AsyncReporter.builder(sender).build();
final Tracing tracing = Tracing.newBuilder()
        .localServiceName("simpleConsumer_test")
        .sampler(Sampler.ALWAYS_SAMPLE)
        .spanReporter(reporter)
        .build();
final KafkaTracing kafkaTracing = KafkaTracing.newBuilder(tracing).remoteServiceName("kafka").build();
final Tracer tracer = Tracing.currentTracer();
// END CONFIGURATION

2. Wrap the consumer in kafkaTracing.consumer:

Consumer<String, String> tracingConsumer = kafkaTracing.consumer(consumer);
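As with the producer, this assumes an existing KafkaConsumer named consumer. A minimal sketch of how it might be created, assuming the local broker from docker-compose and a hypothetical group id:

import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "127.0.0.1:9092"); // assumed local broker
consumerProps.put("group.id", "tracing-demo");            // hypothetical consumer group
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("key.deserializer", StringDeserializer.class.getName());
consumerProps.put("value.deserializer", StringDeserializer.class.getName());

// the plain consumer that gets wrapped by kafkaTracing.consumer(...)
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);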

3. Subscribe the tracing consumer to the topic:

tracingConsumer.subscribe(Collections.singleton("test_tracing"));

4. Read data and send spans to Zipkin: kafkaTracing.nextSpan(record) continues the trace from the record, and span.finish() closes the span:

while (true) {
    ConsumerRecords<String, String> records = tracingConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        Span span = kafkaTracing.nextSpan(record).name("kafka-to-consumer").start();
        span.annotate("Start consuming");
        logger.info("key: " + record.key() + " value: " + record.value());
        span.annotate("Consume finished");
        span.finish();
    }
}

Streaming

Steps:

  1. Add the configuration:
// CONFIGURE TRACING
final URLConnectionSender sender = URLConnectionSender.newBuilder()
        .endpoint("http://127.0.0.1:9411/api/v2/spans")
        .build();
final AsyncReporter reporter = AsyncReporter.builder(sender).build();
final Tracing tracing = Tracing.newBuilder()
        .localServiceName("Kafka_Streaming")
        .sampler(Sampler.ALWAYS_SAMPLE)
        .spanReporter(reporter)
        .build();
final KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);
// END CONFIGURATION

2. Wrap the KafkaStreams instance in kafkaStreamsTracing.kafkaStreams:

KafkaStreams streams = kafkaStreamsTracing.kafkaStreams(builder.build(), config);
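The builder and config above come from a normal Kafka Streams setup. A minimal sketch of what they might look like, assuming the local broker from docker-compose, a hypothetical application id and a hypothetical output topic, put together with the wrapping call from step 2:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streaming-tracing"); // hypothetical app id
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");       // assumed local broker
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// A trivial topology: read from the traced topic, transform, write to an output topic
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("test_tracing")
       .mapValues(value -> value.toUpperCase())
       .to("test_tracing_out"); // hypothetical output topic

// The traced KafkaStreams instance from step 2
KafkaStreams streams = kafkaStreamsTracing.kafkaStreams(builder.build(), config);
streams.start();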

And there you go.

To see it in action follow these steps:

  1. Bring up all the services: docker-compose up
  2. Start the services: Producer, Consumer and Streaming
  3. Navigate to localhost:9411
  4. Start producing to the topic

Result:

The first window gives an overview of all the traces we have recorded, including the root span, trace id, start time and how long it took for our message to traverse the pipeline and be consumed:

Overview of traces

In the following window, we have a detailed view of a specific trace, identifying all the spans (producer, streaming and consumer), with the annotations recorded for each service:

Detailed view of a specific trace

For the code of this example, please visit the GitHub page: https://github.com/data-rocks-team/kafka-distributed-tracing

Conclusion

Distributed tracing is a useful concept for monitoring flows of data in your distributed system by:

  • understanding the behaviour of your distributed system, through transparency and visualisation
  • tracing messages across multiple applications and systems
  • locating performance issues and errors
  • surfacing latency issues between services
  • identifying services that require optimisation
  • having an overview of your service dependencies

Zipkin:

It is an excellent tool for surfacing this kind of information (bottlenecks, errors, latency problems) as it happens:

  • useful for monitoring a message passing through the pipeline, the services it is hitting and how long it takes on average to be consumed
  • out of the box UI with relevant information for developers to access and locate issues
  • easy to implement with instrumentation libraries written in multiple languages

Sources

https://zipkin.io/

https://lightstep.com/distributed-tracing/

https://petabridge.com/blog/why-use-distributed-tracing/

https://newrelic.com/resources/ebooks/quick-introduction-distributed-tracing (they have their own distributed tracing system)

https://epsagon.com/observability/zipkin-or-jaeger-the-best-open-source-tools-for-distributed-tracing/
