Tracking usage metrics with Kafka integration

Aswini
Published in Unibuddy
10 min read, Nov 29, 2021
Tracking Usage Metrics

Our mission at Unibuddy is to empower students to feel confident in making life’s tough decisions when choosing where to study. Our leading tech products enable student ambassadors to connect with prospective students and allow our customers (the universities using our products) to measure and track the amount of time their students spend on the platform, so they may recompense their volunteer students in a structured manner based on various metrics.

Unibuddy has various features that users engage with. The ones we felt were most indicative of active usage were those on Unibuddy Chat and Unibuddy Events: ongoing conversations with prospects, blog platform usage and participation in live events.

In this blog post, we discuss one of the challenges we faced in implementing this feature, and how we obtained information from the other microservices that are part of the Unibuddy application stack.

How do individual services communicate with each other?

Credits: Two services talking to each other

Communication can be orchestrated in many ways:

  1. HTTP (REST or GraphQL)
  2. Messaging (RabbitMQ, SNS-SQS, JMS, Kafka)
  3. AWS S3 and Lambda
  4. Business Process Management workflows

As Unibuddy’s product line grows and we move away from the monolith, we will need more teams to manage these areas and, as a result, the number of microservices increases. We need an architecture where the services are decoupled, easily scalable and frequently deployable.

Event-driven architecture (EDA) is a software design pattern where loosely coupled services communicate with each other via asynchronous messaging. We opted for an asynchronous messaging system that is able to provide high throughput and is highly configurable based on the needs of individual services.

Why we chose Kafka

Kafka is based on a traditional messaging model, where the producer pushes messages to the broker and a consumer pulls messages from the broker. To understand the basics of Kafka, have a read of this Kafka introduction.

Kafka can be used with most of the popular clients (full list found here). There are several client libraries for Python, such as kafka-python, pykafka and confluent-kafka, but from a performance point of view confluent-kafka stands out. It is also actively developed and maintained by Confluent, which makes it a dependable choice. All the discussion points we cover here are based on the confluent-kafka library.

Use case:

Let’s walk through the use cases relevant to Kafka integration between the monolith and the service responsible for recording the time spent on areas such as live events, blogs and conversations at Unibuddy.

Design Flow

All the events mentioned above happen at different points in time, in different services. We opted for an event-based messaging system that allows us to track the time spent by each user.

All these events, depending on where they happened, need specific processing logic. Consequently, we decided to create a new microservice to manage these events.

We had hours of discussions on the following aspects of integration:

  1. What is the event structure going to be?
  2. How to parse date-time information in the event?
  3. How to reliably send messages to Kafka?
  4. How to reliably consume a message?
  5. What can be done to achieve data idempotency?
  6. How to maintain event versioning?

Let’s discuss each point in detail.

1. Event Semantics

The major discussion points around event semantics are:

  • Should the event structure follow an event carried state transfer or event notification pattern? (Read about the patterns here)
  • What are the attributes of an event?

When we began this journey, our event design was based on what the downstream service needed. However, each event has the potential to be a domain event. This means we had foreseen a lot more services consuming the events than just our downstream service. Therefore, a lot of modifications around the event schema might happen based on the needs of services. It would be difficult for the services to keep track of all the changes in the event schema.

We opted to use the ‘event notification pattern’, so that only the key identifiers of a particular state are included in an event. For all the events in our use cases, we only send the key identifiers of the event, which in turn means fewer modifications to the event schema.

Event carried state transfer (left) vs Event Notification pattern(right)

To do the actual message processing, the downstream service has to make an API call back to the monolith. In message processing, there is always a chance that the object is modified between the time the message is produced and the time it is processed by the consumer. Although it comes at the cost of fetching additional data from the monolith, the event notification pattern avoids such discrepancies, because the consumer always reads the current state.

2. Parsing Datetime

Every event discussed has date-time information in either a created or an updated attribute. We needed to standardise this, as we can’t send date-time objects in the event: Kafka transports raw bytes, and the built-in serializers cover only simple types such as double, int, long, string, byte[] and ByteBuffer. So we converted all date-time objects into UTC ISO 8601 strings.

We arrived at an event structure similar to this:

{
    "event_entity_id": 123456,
    "user_id": 67890,
    "created": datetime.utcnow().isoformat(),  # for create events
    "updated": datetime.utcnow().isoformat(),  # for update events
}
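A minimal sketch of how such an event could be built and serialized. It assumes JSON as the wire format, which the post does not specify, and it uses a timezone-aware UTC timestamp rather than datetime.utcnow(). The helper names build_event, serialize_event and parse_timestamp are hypothetical.

```python
import json
from datetime import datetime, timezone


def build_event(event_entity_id, user_id, is_update=False, now=None):
    """Build an event carrying only key identifiers and a UTC timestamp."""
    # Timezone-aware UTC timestamp rendered as an ISO 8601 string.
    timestamp = (now or datetime.now(timezone.utc)).isoformat()
    field = "updated" if is_update else "created"
    return {
        "event_entity_id": event_entity_id,
        "user_id": user_id,
        field: timestamp,
    }


def serialize_event(event):
    """Kafka transports bytes, so encode the event as UTF-8 JSON."""
    return json.dumps(event).encode("utf-8")


def parse_timestamp(value):
    """Turn the ISO 8601 string back into a datetime on the consumer side."""
    return datetime.fromisoformat(value)
```

The consumer can call parse_timestamp on the created or updated field to get a datetime object back, including its UTC offset.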

3. Handling Message Acknowledgement on Producer

A message written to a partition is not made available to consumers immediately. How and when the write is acknowledged depends on the acks setting (Confluent’s configuration for acknowledgements) on the producer. Producers, consumers and admin clients each accept many configuration options, depending on your use case. The producer supports three acks settings:

  1. acks:1 - the leader of the partition must explicitly acknowledge that the message has been written successfully. It is a minimum guarantee from Kafka, but not the highest durability you can achieve. This is the default in some clients (for example, the Java client before Kafka 3.0); librdkafka, on which confluent-kafka is built, defaults to acks:-1, so it is safest to set the value explicitly.

acks: 1

  2. acks:0 - this is used if your application requires high throughput. The producer doesn’t receive any acknowledgement, so you cannot determine the offset of the message. This becomes a problem when you have to replay events after an offset reset, which might happen when the consumer group is modified. This configuration doesn’t provide any durability.

acks: 0

  3. acks:-1 - this is the strongest guarantee that Kafka provides. The leader of the partition acknowledges the write only after it has accepted the message and the message has been replicated to all in-sync replicas. It provides lower throughput but high durability.

acks: -1

We went for acks:-1 as we required durability over throughput.
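As an illustration, the setting could be passed to a confluent-kafka producer roughly like this. The broker address is a placeholder, and create_producer is a hypothetical helper, not part of the library.

```python
# Producer settings favouring durability over throughput.
# "localhost:9092" is a placeholder broker address.
producer_config = {
    "bootstrap.servers": "localhost:9092",
    "acks": -1,  # wait for all in-sync replicas; "all" is an equivalent spelling
}


def create_producer(config=producer_config):
    """Create a confluent-kafka producer from the settings above."""
    # Imported lazily so the configuration can be inspected without the library.
    from confluent_kafka import Producer

    return Producer(config)
```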

Delivery Report

Do you want to do anything with the acknowledgement that you receive? You might want to log errors if the message writes failed or you might want to invoke some other action if the message write is successful. So where do you get this delivery report?

You may have noticed this line in the producer integration here:

p.poll(0)

It serves the delivery report callbacks of previously sent messages. Let’s say msg1 (msg is short for message) is sent: no delivery report is served yet. Next, msg2 is sent and the delivery report of msg1 is served. For the next message, the delivery report of msg2 is served, and so on.

You can configure the delivery report implementation:

p.produce(topic=topic, value=value, key=key, callback=delivery_report)

In the line above, delivery_report is the callback function that is called once the delivery result is known, whether the write succeeded or failed. The implementation of delivery_report can be adapted to whatever you want to do with the result.
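A possible delivery_report implementation that simply logs the outcome is sketched below. The logger name and the send_event wrapper are hypothetical; the callback signature (err, msg) and the msg accessors are the ones confluent-kafka passes to delivery callbacks.

```python
import logging

logger = logging.getLogger("producer")


def delivery_report(err, msg):
    """Called once per message from poll() or flush() with the delivery result."""
    if err is not None:
        # The write failed: log it (or trigger an alert, a retry, etc.).
        logger.error("Delivery failed for key %s: %s", msg.key(), err)
    else:
        logger.info(
            "Delivered to %s [partition %d] at offset %d",
            msg.topic(), msg.partition(), msg.offset(),
        )


def send_event(producer, topic, key, value):
    """Enqueue one message and serve any pending delivery callbacks."""
    producer.produce(topic=topic, key=key, value=value, callback=delivery_report)
    # poll(0) returns immediately; it only triggers callbacks for messages
    # whose delivery result is already known.
    producer.poll(0)
```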

Delivery report in Producer

You might also have noticed that the last line of the producer integration is:

p.flush()

flush() waits for any outstanding messages to be delivered and delivery report callbacks to be triggered.

Let’s see what happens on the produce() call.

produce() is asynchronous and it enqueues the message in an internal queue before sending it to the Kafka broker. If the leader of the partition is not available, there will be an additional wait time.

How long do the messages stay in the internal queue? It depends on the configuration:

queue.buffering.max.ms = ‘x’ms

The default depends on the librdkafka version: older releases used 1000 ms, while recent releases default to 5 ms. Batching can also be configured based on the number of messages.

It’s generally advised to avoid calling flush() after every produce() call.

DO NOT use flush()

The reason is that calling flush() after every produce() call blocks until all outstanding messages are delivered, which adds latency to each message:

queue.buffering.max.ms + latency

This effectively turns it into a synchronous producer. You can read more about this at https://github.com/confluentinc/confluent-kafka-python/issues/137
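One common alternative, sketched here under the assumption that the application sends messages in a loop, is to call poll(0) after each produce() and flush() only once, for example at shutdown. The function name send_all is hypothetical.

```python
def send_all(producer, topic, values):
    """Enqueue many messages, serving callbacks as we go, and flush once."""
    for value in values:
        try:
            producer.produce(topic=topic, value=value)
        except BufferError:
            # Local queue is full: let the client drain it, then try again.
            producer.poll(1)
            producer.produce(topic=topic, value=value)
        producer.poll(0)  # non-blocking: serve delivery callbacks that are ready
    # Block only once, e.g. at shutdown, until everything queued is delivered.
    producer.flush()
```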

4. Consumer Commit:

Asynchronous commit

By default, the consumer commits offsets automatically and asynchronously: it periodically commits the current offsets for its assigned partitions in the background. Periodic commits don’t take into account whether message processing has actually succeeded.

Let’s say we need to filter messages based on the user’s role. The user’s account role is not part of the event, so we need to send an API request to the monolith to fetch additional user details.

Let’s assume the user’s account role is student and we need to process the message for a live event, but the API call to fetch additional user details has failed. Since offsets are committed automatically in the background by default, the offset of this message will be marked as committed anyway. Therefore, the message is not available for a retry when the API call succeeds in the future.

Asynchronous commit

Synchronous commit

The problem described in the asynchronous commit section can be solved by using a synchronous, manual commit. A manual commit is enabled by a simple configuration on the Kafka consumer.

enable.auto.commit: False

Decide when and how to commit in your implementation. Look at this piece of code:

value = message_processor(data)
if value:  # replace with your own condition on value
    consumer.commit(asynchronous=False)

message_processor(data) — a function that processes incoming messages in consumer.

Based on the return value from message_processor, you can decide to commit the message.
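Put together, a consumer loop with manual commits might look like the sketch below. The broker address and group id are placeholders, and consume and max_polls are hypothetical names used only to keep the example bounded; poll(), error(), value() and commit(message=..., asynchronous=False) are existing confluent-kafka calls.

```python
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "usage-metrics",            # hypothetical consumer group
    "enable.auto.commit": False,            # we commit offsets ourselves
}


def consume(consumer, message_processor, max_polls):
    """Poll messages and commit each offset only after successful processing."""
    for _ in range(max_polls):
        msg = consumer.poll(1.0)  # wait up to one second for a message
        if msg is None:
            continue  # nothing arrived within the timeout
        if msg.error():
            continue  # broker or partition error event; skipped in this sketch
        value = message_processor(msg.value())
        if value:
            # Blocks until the broker confirms the offset commit.
            consumer.commit(message=msg, asynchronous=False)
```

With the real library, the consumer would be created with Consumer(consumer_config) and subscribed to a topic before calling consume. Note that committing the offset of a later message also covers earlier messages on the same partition, so a message that failed processing needs to be retried or routed elsewhere before the loop moves on.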

Successful Manual commit

If the condition is not satisfied, the message will not be marked as consumed and stays in the queue.

Failed Manual Commit

You can either send the message to the Dead Letter Queue (DLQ) and write a separate processing logic to process the DLQ or implement a retry mechanism where you could configure the number of retries. If the number of retries is exhausted, you can mark it as consumed.
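A minimal sketch that combines both options: retry a configurable number of times, then hand the message to a dead letter queue. The names process_with_retries and send_to_dlq are hypothetical; send_to_dlq stands for whatever publishes to your DLQ topic.

```python
def process_with_retries(data, message_processor, send_to_dlq, max_retries=3):
    """Return True if processing succeeded, False if the message was dead-lettered."""
    for _ in range(max_retries):
        try:
            if message_processor(data):
                return True
        except Exception:
            # Treat an exception (e.g. a failed API call) like a failed attempt.
            pass
    # Retries exhausted: park the message so the partition is not blocked.
    send_to_dlq(data)
    return False
```

In both cases the caller can then commit the offset: the message has either been processed or been handed over to the DLQ for separate handling.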

With a synchronous, manual commit, the service has more control over the processing of incoming messages. We moved away from the default asynchronous commit for this very reason.

5. Data Idempotency

Duplicate events are common in Kafka, since it guarantees at-least-once delivery.

Duplication scenarios and solutions

  1. If there’s a failure after sending a message to the broker and there’s no way to know if the message is already sent to the topic or not, the producer will retry so it might send the same message again.

Solution: It can be solved using the configuration on the producer.

enable.idempotence: True

2. Automatic, periodic commits increase data duplication. Since offsets are committed only periodically, many messages may already have been processed while their offsets are not yet committed. When a consumer group rebalance occurs, all of those in-flight messages will be reprocessed.

3. A synchronous commit reduces duplication but cannot fully eliminate it, because of Kafka’s at-least-once delivery guarantee. A consumer group rebalance can happen after the consumer has successfully processed a message but just before it commits the offset. After the rebalance, consumption resumes from the last committed offset, so that message is delivered again. If no valid committed offset exists for the group, the offset reset policy kicks in:

auto.offset.reset: “earliest”

The offset reset is latest by default, which means consumption starts with messages that arrive from then on. If the offset reset is earliest, consumption starts from the oldest message still retained in the partition. This depends on the retention policy as well: if the retention period is 7 days, messages older than 7 days will have been deleted, so only the more recent messages are available for replay. With earliest in particular, messages that were already processed can be consumed again.

Solution: It can be solved by handling it in the data layer of the application. You can maintain a unique identifier. For example, you can use a concatenation of key identifiers and creation date time as a field to differentiate between the new event and the duplicate event.
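A sketch of this idea using SQLite from the standard library as a stand-in for the service’s real data store. The table name, the key format and the helper names are assumptions, and the example only covers create events (it keys on the created field).

```python
import sqlite3


def dedup_key(event):
    """Concatenate the key identifiers with the creation timestamp."""
    return f'{event["event_entity_id"]}:{event["user_id"]}:{event["created"]}'


def open_store(path=":memory:"):
    """Open the store and make sure the table of processed events exists."""
    connection = sqlite3.connect(path)
    connection.execute(
        "CREATE TABLE IF NOT EXISTS processed_events (event_key TEXT PRIMARY KEY)"
    )
    return connection


def record_once(connection, event):
    """Return True if the event is new, False if it was already recorded."""
    with connection:  # commits on success, rolls back on error
        cursor = connection.execute(
            "INSERT OR IGNORE INTO processed_events (event_key) VALUES (?)",
            (dedup_key(event),),
        )
    return cursor.rowcount == 1
```

The consumer would call record_once before applying an event and skip the event when it returns False.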

6. Maintain Event Versioning

When the product grows, the number of microservices grows and so does the number of subscribers. Irrespective of the event design pattern, a lot of details will need to be added based on the needs of the subscribers, making the need for Event schema more important.

Event schemas cannot be just a convention agreed between producers and subscribers. A schema can be enforced with Confluent’s Schema Registry, for which confluent-kafka provides a client. If the producer sends an event that does not conform to the registered schema, producing the message fails. This keeps communication between services intact, at the cost of minor coupling.
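To illustrate the idea only, here is a plain-Python stand-in for the kind of check a schema registry enforces. It is not the Schema Registry API. The event name, the version number and the in-memory EVENT_SCHEMAS table are all hypothetical.

```python
EVENT_SCHEMAS = {
    # (event name, version) -> required fields and their types
    ("live_event_joined", 1): {"event_entity_id": int, "user_id": int, "created": str},
}


class SchemaError(ValueError):
    """Raised when an event does not match a registered schema."""


def validate_event(name, version, event):
    """Return the event unchanged if it matches the registered schema."""
    schema = EVENT_SCHEMAS.get((name, version))
    if schema is None:
        raise SchemaError(f"no schema registered for {name} v{version}")
    for field, expected_type in schema.items():
        if not isinstance(event.get(field), expected_type):
            raise SchemaError(f"field {field!r} missing or not {expected_type.__name__}")
    return event
```

A producer would call validate_event before produce(), so an unregistered or malformed event fails on the producing side instead of reaching subscribers.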

Conclusion

In this blog, we discussed a use case in which Kafka is used to enable communication between the microservices, the challenges that presented themselves during message delivery from a producer to a broker, different commit strategies on a consumer, and how to mitigate data duplications on a service.

We also looked at various configurations to use when producing and consuming a message. They are only a subset of the configurations that Confluent provides. You can find the comprehensive list of configurations at Confluent.

References:

  1. Kafka concepts and terminology: https://kafka.apache.org/documentation/#gettingStarted
  2. Python integration with Confluent Kafka: https://github.com/confluentinc/confluent-kafka-python
  3. Event design patterns: https://martinfowler.com/articles/201701-event-driven.html
