How To – Events Infrastructure
By – Shashank Sah (Engineer, Platform)
UrbanClap is moving from a monolithic to a micro-service architecture. With this shift, a micro-service sometimes needs data owned by another micro-service. Making an RPC call every time that data is needed would make the architecture bulky and repetitive.
To solve this problem, we moved away from a purely synchronous architecture. The new model uses a mixture of synchronous and asynchronous calls. In this article, we will focus on our asynchronous model employing the event-driven approach.
Event-driven architecture lets micro-services communicate through messages or events. It is an asynchronous way for each micro-service to keep its localised view of data up to date, and it provides better distribution and scalability.
We used Apache Kafka’s pub-sub model as our event streaming platform.
Key components of the Kafka cluster
Broker: Responsible for storing and maintaining the data published to the Kafka cluster.
Zookeeper: Zookeeper manages and co-ordinates Kafka brokers. It notifies producers and consumers for new broker additions or broker failures.
Topic: Topic is a category or feed name to which messages are published. Data is stored under a topic name.
Partition: Topics are split into partitions. Each partition is an ordered and immutable sequence of messages.
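For example, with this setup a topic can be created with an explicit partition count and replication factor (the counts and names below are illustrative):
bin/kafka-topics.sh --create --zookeeper url:zookeeper_port --partitions 6 --replication-factor 3 --topic topic_name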
High-Level Architecture
This architecture has four major components:
- Kafka cluster
- Producers
- Consumers
- Monitoring, Alerting, and Reporting
A producer sends data to the Kafka cluster, which persists it and returns an acknowledgement to the producer. Consumer groups keep polling the Kafka cluster for these events. Kafka’s distribution logic ensures that the same event is not consumed by multiple consumers within a consumer group.
Each service can act as a producer or a consumer group or both. There is no direct connection between the producers and consumers, both can function independently without knowing the source or destination of an event.
Kafka cluster
Our Kafka cluster runs on three servers, each hosting a broker and a ZooKeeper node. For higher availability and consistency of the cluster, the replication factor is configured as three and the minimum in-sync replicas as two.
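These settings correspond to a few lines in each broker’s server.properties. A minimal sketch (the broker id and ZooKeeper connection string are placeholders):
server.properties
broker.id=1
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
default.replication.factor=3
min.insync.replicas=2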
We have used Supervisor to watch broker and zookeeper processes for better availability. Given below are the broker Supervisor config files.
broker.conf
[program:broker]
command=/etc/supervisor/conf.d/broker.sh
autostart=true
autorestart=true
startretries=3
stderr_logfile=/var/log/kafka/supervisor/broker.err.log
stdout_logfile=/var/log/kafka/supervisor/broker.out.log
user=xxx
priority=20
environment=LOG_DIR="/var/log/kafka/logs"
broker.sh
#!/bin/bash
trap "{ bin/kafka-server-stop.sh config/server.properties; exit 0; }" EXIT
bin/kafka-server-start.sh config/server.properties
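The ZooKeeper process is supervised the same way. A hypothetical analogue of the files above (paths, user, and priority are assumptions; the lower priority makes Supervisor start ZooKeeper before the broker):
zookeeper.conf
[program:zookeeper]
command=/etc/supervisor/conf.d/zookeeper.sh
autostart=true
autorestart=true
startretries=3
stderr_logfile=/var/log/kafka/supervisor/zookeeper.err.log
stdout_logfile=/var/log/kafka/supervisor/zookeeper.out.log
user=xxx
priority=10
zookeeper.sh
#!/bin/bash
trap "{ bin/zookeeper-server-stop.sh config/zookeeper.properties; exit 0; }" EXIT
bin/zookeeper-server-start.sh config/zookeeper.properties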
WARNING!! Ensure that all kernel-level flags and settings are tuned so that your Kafka cluster runs without system-level failures. For example, ulimit defines the number of file descriptors a process can open; it should be set to a high value on production systems to avoid ‘too many open files’ failures.
You can increase the ulimit value with the steps below.
Add this line to both /etc/systemd/user.conf and /etc/systemd/system.conf:
DefaultLimitNOFILE=102400
Reload systemd and restart Supervisor:
sudo systemctl daemon-reexec
sudo service supervisor restart
Check the open-file limits for a process:
cat /proc/<pid>/limits
Event Producers
We use Docker containers to run our services. Each container is a producer and maintains a connection with the Kafka cluster. We have implemented the producer using the kafka-node client library. Producing an event involves the following steps:
- Authorization: Check if the producer service is authorized to send events to the given topic.
- Schema validation: Validate events schema before sending to Kafka. This is done to make sure the event structure remains consistent throughout the applications.
- Generate payload and send an event: Generate payload (base64 encoded), set partition key (for ordering of events) and payload compression type. Send an event to Kafka broker and wait for the acknowledgement to mark it as a success.
- Send failure retry: If sending an event fails for any reason, we log the payload data and save it in S3. A scheduled job keeps polling for failed-event files in S3 and replays them back to Kafka. A sketch of this flow follows right after this list.
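Putting the four steps together, here is a minimal sketch of the send path using kafka-node. isAuthorized, validateSchema, and saveToS3 are illustrative placeholders (not our actual internal API), the broker address is a placeholder, and buildEventPayload is the function shown in the next section:
const kafka = require('kafka-node');
// Client and producer setup; the broker address is a placeholder
const client = new kafka.KafkaClient({ kafkaHost: 'kafka1:9092' });
const producer = new kafka.HighLevelProducer(client);
// Producer flow: authorize, validate, build payload, send, fall back to S3 on failure
function sendEvent(serviceId, event, payload, partitionKey) {
  if (!isAuthorized(serviceId, event)) {
    return Promise.reject(new Error('service not authorized for topic ' + event));
  }
  return validateSchema(event, payload)
    .then(() => buildEventPayload(event, payload, partitionKey))
    .then((kafkaPayload) => new Promise((resolve, reject) => {
      // kafka-node invokes the callback once the broker acknowledges the write
      producer.send([kafkaPayload], (err, result) => err ? reject(err) : resolve(result));
    }))
    .catch((err) => {
      // Log and persist the failed payload to S3; the scheduled job replays it later
      return saveToS3(event, payload, partitionKey);
    });
}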
How to achieve strict ordering in events?
Since Kafka is a distributed system, strict ordering of messages across multiple partitions is not guaranteed. There are two ways in which strict ordering can be achieved:
1) Use a single partition per topic.
2) Use key based ordering when using multiple partitions. A key is sent as part of the payload. Kafka puts all the messages with the same key on one partition.
Given below is the code to build payload and achieve key based ordering in kafka-node client:
// uuid generates a random key when no partition key is provided
const { v4: uuidv4 } = require('uuid');
// Counterpart of the consumer-side decodeString: stringified JSON is base64 encoded
function encodeString(text) {
  return Buffer.from(text, 'utf8').toString('base64');
}
function buildEventPayload(event, payload, partitionKey) {
  payload = encodeString(JSON.stringify(payload));
  // Handle case where no partition key is provided
  if (!partitionKey) {
    partitionKey = uuidv4();
  }
  if (typeof partitionKey !== "string") {
    partitionKey = JSON.stringify(partitionKey);
  }
  return Promise.resolve()
    .then(function () {
      return {
        topic: event,
        attributes: 2, // 2 is for snappy compression
        messages: payload,
        key: partitionKey
      };
    });
}
Event Consumers
We have consumer groups that consume events from a set of subscribed topics. A consumer group is a containerized micro-service. Each container in a micro-service represents a consumer.
A topic is stored in multiple partitions and all the topic’s events are distributed among them. Each consumer polls for events based on the topic partitions it is listening to. Since each topic partition maps to only a single consumer, an event cannot be consumed twice within the consumer group.
- Event listener: The kafka-node library continuously polls for events in batches. After fetching a batch of events, it passes them to the event listener, which invokes the event-handler callback registered by the consumer service.
- Event consumption rate limiter: Continuous polling of events leads to a high consumption rate during peak load, which drives up resource consumption and impacts service performance. To limit the rate of event consumption, we use async queues to hold the events. Events are consumed from the queue at the rate (numEvents/sec) provided by the consumer client.
Initializing a kafka-node consumer with rate limiting:
// Assumed module-level imports (async v2, lodash); the logger path is an assumption
const kafka = require('kafka-node');
const async = require('async');
const _ = require('lodash');
const logger = require('./logger');

let consumer;
let queue;

this.init = function (serviceId, eventConfig, messageHandler, consumerOptions) {
  let topics = _.get(eventConfig, 'topicsToConsume.' + serviceId, null);

  // Queue is required for rate limiting
  initQueue(consumerOptions.eventProcessingWaitTimeMs, consumerOptions.concurrency);

  consumer = new kafka.ConsumerGroup(consumerOptions, topics);

  consumer.on('message', function (message) {
    if (queue) {
      queue.push(message);
      pauseConsumerFetch();
    } else {
      callEventHandler(message);
    }
  });

  function initQueue(eventProcessingWaitTimeMs, concurrency) {
    if (eventProcessingWaitTimeMs && concurrency) {
      queue = async.queue(function (message, callback) {
        callEventHandler(message);
        setTimeout(() => { callback(); }, eventProcessingWaitTimeMs);
      }, concurrency);

      // Resume fetching from the Kafka cluster once the queue is empty (async v2 style)
      queue.drain = function () {
        resumeConsumerFetch();
      };
    }
  }

  function decodeString(text) {
    return Buffer.from(text, 'base64').toString('utf8');
  }

  function callEventHandler(message) {
    let payload = JSON.parse(decodeString(message.value));
    messageHandler(message.topic, payload);

    let consumerLag;
    if (message.timestamp) {
      consumerLag = Date.now() - message.timestamp;
    }
    logger.info({
      topic_name: message.topic,
      event_type: 'consume',
      producer_name: payload.producerId,
      consumer_name: serviceId,
      consumerLag: consumerLag,
      offset: message.offset,
      highWaterOffset: message.highWaterOffset,
      key: message.key
    });
  }

  function pauseConsumerFetch() {
    consumer.pause();
  }

  function resumeConsumerFetch() {
    consumer.resume();
  }
};
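A usage sketch for the initializer above. The service id, topic names, broker list, and rate-limit values are assumptions for illustration; kafkaHost, groupId, and fromOffset are standard kafka-node ConsumerGroup options, while eventProcessingWaitTimeMs and concurrency are the custom rate-limit settings read by initQueue:
const eventConsumer = require('./event-consumer'); // the module sketched above (path assumed)

const eventConfig = {
  topicsToConsume: {
    'booking-service': ['order_created', 'order_updated']
  }
};

const consumerOptions = {
  kafkaHost: 'kafka1:9092,kafka2:9092,kafka3:9092',
  groupId: 'booking-service',
  fromOffset: 'latest',
  // ~20 events/sec per container: 1 concurrent handler, 50 ms wait per event
  eventProcessingWaitTimeMs: 50,
  concurrency: 1
};

eventConsumer.init('booking-service', eventConfig, function (topic, payload) {
  // Application-level event handler
  console.log('handling', topic, payload);
}, consumerOptions);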
Monitoring and Alerting
To monitor producer and consumer clients, we use our centralized logging system built with the Elastic stack. We run daily scripts on Elasticsearch to generate reports for our event system.
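As an illustration, such a report could aggregate the consumerLag field from the consume log line shown earlier. A minimal sketch, assuming the logs land in indices matching events-* and using the legacy elasticsearch JavaScript client (both assumptions):
const elasticsearch = require('elasticsearch');
const client = new elasticsearch.Client({ host: 'localhost:9200' }); // host assumed

// Average consumer lag per topic over the last 24 hours
client.search({
  index: 'events-*',
  body: {
    size: 0,
    query: {
      bool: {
        filter: [
          { term: { event_type: 'consume' } },
          { range: { '@timestamp': { gte: 'now-24h' } } } // timestamp field name assumed
        ]
      }
    },
    aggs: {
      by_topic: {
        terms: { field: 'topic_name' },
        aggs: { avg_lag_ms: { avg: { field: 'consumerLag' } } }
      }
    }
  }
}).then(function (resp) {
  console.log(JSON.stringify(resp.aggregations.by_topic.buckets, null, 2));
});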
Kafka cluster monitoring commands
Describe a topic:
bin/kafka-topics.sh --describe --zookeeper url:zookeeper_port --topic topic_name
List topics:
bin/kafka-topics.sh --list --zookeeper url:zookeeper_port
List brokers:
bin/zookeeper-shell.sh url:zookeeper_port "ls /brokers/ids"
Get consumer IPs:
bin/kafka-consumer-groups.sh --describe --group consumer_group_name --bootstrap-server url:broker_port
Topic and partition assignment for each consumer:
bin/kafka-consumer-groups.sh --describe --members --verbose --group consumer_group_name --bootstrap-server url:broker_port
Consumer group coordinator and state:
bin/kafka-consumer-groups.sh --describe --group consumer_group_name --state --bootstrap-server url:broker_port
List consumer groups:
bin/kafka-consumer-groups.sh --list --bootstrap-server url:broker_port
Event-driven architecture is an integral part of our Platform: each of our micro-services uses it to publish messages that are later consumed by other micro-services. As a next step, we are going to integrate it with our data platform to send backend transactional events, which can be consumed for deeper data analysis.
About the author: Shashank Sah is part of the engineering team, working on our core Platform. This team solves for developer happiness and standardisation across engineering by building frameworks and tools, and by working on infrastructure.
Sounds like fun? If you enjoyed this blog post, please clap 👏 (as many times as you like) and follow us (UrbanClap Blogger). Help us build a community by sharing on your favourite social networks (Twitter, LinkedIn, Facebook, etc.).
You can read up more about us on our publications —
https://medium.com/urbanclap-design
https://medium.com/urbanclap-engineering
If you are interested in finding out about opportunities, visit us at http://careers.urbanclap.com