Designing a Robust and Scalable Kafka Integration

Pavan Kalyan
Hevo Data Engineering
8 min readApr 16, 2021

Hevo is a no-code data pipeline as a service. It allows customers to use pre-built integrations to replicate their data to their warehouses. One such integration Hevo provides is Apache Kafka. It is an open-source distributed event streaming platform with a variety of use cases. We at Hevo, use Kafka internally as an event streaming architecture and a reliable pub-sub system.

There are various challenges in building a scalable and robust Kafka integration.

Challenges

  • Scalability: At Hevo, we have seen a meteoric rise in customers signing up for our product in the last few months and we needed to design our product to accommodate a huge number of customers with varying amounts of data. The Kafka integration should be able to scale from customers with a few Topics with a small amount of data to customers with a lot of data across a lot of Topics with many Partitions each.
  • Autonomy: The Kafka integration should be able to adjust itself according to different customer requirements without any human intervention.

Topic: Logical unit of storage; equivalent to a table in traditional relational databases

Partition: Logical splits of the data in a Topic; similar to shards of tables in traditional relational databases

Design Plan

An important prerequisite to understanding this design is to understand what we mean by ‘Job’.

For every integration at Hevo, the Job is the smallest unit of schedulable work that ingests data from sources. For e.g., in the MySQL integration, for every table, we create a Job whose sole purpose is to extract data from that table in the source. This Job is schedulable with a frequency. At a given time, a Job does not run for more than an hour continuously.

The first question in designing the Kafka integration is to understand how the Topics in the source map to our Jobs. There are a few obvious mappings that come to mind.

Different mapping schemes between Topic Partitions and Jobs

i) Here each Job is mapped to all the Topic Partitions in one Topic in Kafka, no matter how many partitions there are. This means that if Topic 2 gets a huge amount of data, we would be assigning it only one Job. That would increase the latency of processing data from that Topic.

ii) In this case each Job is mapped to a single Topic Partition. This would be the most granular mapping and would allow us to handle the situation when a single Topic has a huge amount of data by dedicating a separate Job for each Topic Partition. But, if there is no data or a small amount of data in Topic 1, we would be unnecessarily wasting resources on Topic Partitions with no data; resources that could have been put to better use ingesting data from Topics with a lot more data.

iii) In this case, we see that there is no strict or static mapping between a Topic Partition and a Job. Job 0 is assigned to all the Topic Partitions in Topic 1, while Job 1 is assigned to 2 partitions in Topic 2 and Job 2 is assigned to Topic 2 Partition 1. This is done to account for the huge amount of data in Topic 2. But, since there is very little data in Topic 1, having a single Job for all the partitions in Topic 1 is sufficient.

So, to support a huge number of customers with varying amounts of data, option (iii) seems like a no-brainer. The next (and harder) step is to actually design the system to support this plan.

System Design

Lag Calculation

To support a data-load-aware dynamic integration requires us to first determine which Topics have a lot of data.
To determine this, we use Lag as a metric.

The formula for calculating lag for a particular topic

Where offset is the offset of the Topic Partition. We determine the offset of the latest record on Kafka by using Kafka’s Admin API and the current offset would be stored with us and updated each time a Job ran and pulled data from Kafka.

Every time the pipeline runs, we would need to calculate this lag and store it. This lag data is relatively short-lived. We use Redis to store this data.

Scaling Evaluator

After every few runs of the pipeline, we evaluate whether we should increase or decrease the number of Jobs for a Topic. To evaluate this, we use a combination of linear regression and min-max thresholds. All of the above are configurable for any individual pipeline.

i) If there is a lot of data and it’s steadily increasing, we increase the number of jobs assigned to the Topic. We determine the scaling factor by checking the retention period set for the Topic. If the Topic has a very short retention period, we increase the number of Jobs by a lot. Conversely, if the Topic has a very long retention period, we increase the number of Jobs by a very small number.

Scaling Factor: The number of extra Jobs that need to be created. (i.e. by how many Jobs we should increase or decrease)

ii) If there is a very small amount of data or it’s sharply decreasing, we decrease the number of jobs assigned to the Topic by the aforementioned scaling factor. Here as well the Topic’s retention period helps us determine the scaling factor. If it’s short, decrease by a few Jobs; if it’s long, decrease by a lot.

Once we evaluate this, each Job will be told to pick up a certain number of Topic Partitions for their respective assigned Topics. When any scaling in or out happens, these values are updated in each Job’s meta in the database. On the next run, the Job will pick up the updated number of Topic Partitions.

But what about already running Jobs? To make the system immediately reflect the new state, the already running Jobs are alerted to this change and they immediately store their offsets and finish running, so they can run again with the updated number of Topic Partitions. To achieve this as fast as possible, we use Redis as Pub-Sub. This has milliseconds latency in our networks.

Evaluator node alerts all other nodes about scaling changes for a particular Topic via Redis

So far, we have discussed how the Jobs are mapped, how scaling is calculated, and how it’s reflected in a multi-node system. But, we haven’t yet discussed how the Jobs actually pick Topic Partitions to pull data for.

Topic Partition and Job Allocation Design

At Hevo, we use MySQL 5.7 as our database. We first need to persist the different Topic Partitions that each integration needs to poll data for. We do this in a separate table. On each run of the Job, it first picks a certain pre-decided number of Topic Partitions from its assigned Topic and then starts polling data for it. To ensure that our integration is fault-tolerant, each Job at the end of its run will disown the picked-up Topic Partitions, whether there is no more data available or the Job has exceeded its 1 hour of execution time.

This means that we would be using the MySQL table as a queue. Let’s see how we can achieve this using MySQL’s in-built features.

Schema

+------------+--------------------------------------------------+
| Column | Description |
+------------+--------------------------------------------------+
| topic | Kafka Topic name |
| partition | Kafka Topic Partition |
| offset | Offset for this Topic Partition |
| job_id | Currently claimed by this Job, null if unclaimed |
| updated_ts | Last time the row was updated |
+------------+--------------------------------------------------+

There are primarily 2 ways for each Job to pick up multiple Topic Partitions without conflicting with each other.

i) Optimistic Locking

Optimistic locking

This is where the Job first retrieves n Topic Partitions from the table and then when updating the job_id column, it first checks if the updated_ts has changed since the rows were retrieved. This helps determine whether the Topic Partition has been claimed by any other Job and will discard the retrieved rows and immediately retry with n different Topic Partitions.

Pros: Useful when the chance of locking is low.

Cons: Requires multiple DB queries and not very efficient since there will need to be multiple retries.

ii) Pessimistic Locking

Pessimistic locking

This is where the Job runs a blocking query in the DB to select, update n rows with its job_id and return these Topic Partitions. This is supported using MySQL’s SELECT FOR UPDATE query

Pros: Single query only and efficient for many rows at once.

Cons: Locking query without indexed fields can lock the table which can cause deadlocks and will need to be retried when detected.

We ended up using Pessimistic locking via SELECT FOR UPDATE because it’s more efficient and we have various ways to mitigate the locking of rows.

There are variations of this query that are useful; MySQL 8.0 supports SELECT FOR UPDATE ... SKIP LOCKED . This query directly supports using a MySQL table as a queue, but it required MySQL 8.0 and we are on 5.7. The upgrade is non-trivial.

To alleviate the potential locking of multiple Jobs trying to claim (pick) Topic Partitions, we indexed the topicand partition columns. This will reduce the chances of deadlocks and since this would be an insert-light table, this was an acceptable cost.

Multiple Jobs trying to claim different Topic Partitions. This can get hectic, so a strategy to mitigate locking is helpful

Note: While picking Topic Partitions, we sort by last_picked_ts to ensure starvation is not possible.

Fine Tuning

At Hevo, different customers have different source servers set up, each having its own little quirks. To account for this, we have different configuration parameters that can be utilized to fine-tune a pipeline according to the customer’s unique situation. This has been upheld (and taken further) for Kafka by allowing configuration of various settings like ‘max records per poll’, ‘poll time out’, and different scaling thresholds, etc.

Simulated Results

Assuming each Job can ingest 50k records in 1 hour and simulating an increasing amount of data for a particular Topic, we can see that as the Jobs increase the lag stabilizes.

Further Improvements

  1. Further optimization on the Topic Partition selection logic can be done, where instead of sorting by last picked timestamp, we can use data to determine which Topic Partitions historically have the most data and prioritize picking those. This would also require an aging logic to avoid starvation of Topic Partitions that see very little activity. This would allow us to shave off time used to queue Topic Partitions with no data.
  2. Move this away from the main application to make this available as an independent service that can be used to dog-food Hevo’s consumer scaling as well.

The world of data is changing and is never going to be the same, if you think it is worth being a part of our mission and work on challenges like this, give us a buzz at 100guns@hevodata.com.

--

--