Optimizing Database Loads

Talha Khan
Hevo Data Engineering
7 min read · Feb 22, 2022

At Hevo, we support real-time, reliable, and lossless Data Replication from a wide range of Sources to a number of sought-after Destinations. The Sources comprise Databases like MySQL, Postgres, MongoDB, DynamoDB, etc., and enterprise solutions like Salesforce, AppsFlyer, Mixpanel, Google Ads, Shopify, etc.

The Destinations include Warehouses like Redshift, Snowflake, BigQuery, Firebolt, etc., Data Lakes like Databricks, and Databases like MySQL, Postgres, SQL Server, and more. The list can be referred to here.

Over the past year, the number of customers has grown rapidly, and with it the volume of data that is shipped. This, in turn, has increased the number of users working with JDBC (DB) Destinations.

In this article, we will focus on the approach we follow at Hevo to load data to the JDBC Destinations and the recent optimizations that helped us fix most of our performance and isolation problems. We’ll dive deep into the following:

  1. A Few Contextual Terms Used in the Article
  2. Old Procedure to Load Data to JDBC Destinations
  3. Problems in the Old Design
  4. The Solution Adopted to Address Those Problems
  5. Results
  6. Limitations of the New Solution

What are JDBC Destinations?

Typically Database Destinations are called JDBC Destinations. A few examples include MySQL, Postgres, Microsoft SQL Server, etc.

Requirements

  1. Faster processing/loading of Events.
  2. Isolation of Data Loading across customers in terms of performance.

Key Terms

  1. Task: It represents a fundamental unit of execution. Tasks have been explained in great detail here. Do have a look!
  2. Destination Speed: Inverse of the average time taken to load a record in the Destination over a period of time. This is calculated as follows:
    1 / ((Time taken to load a batch of Records / Number of Records) grouped over 5 mins, for the last 15 Minutes)
  3. Slow Destination: If the Destination Speed is below the threshold, which has been kept at 1/(50 ms) (that is, loading a record takes more than 50 ms on average), the Destination is considered “Slow”.
  4. Poorly Performing Topics: We use Kafka as an asynchronous buffering system and create a Topic per customer (this will be further discussed in the Design section). A Kafka Topic is considered poorly performing if, based on the current Kafka Lag and the Destination Speed for the customer, it would take longer than a threshold time (6 hrs) to consume all the messages.
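
To make these definitions concrete, here is a minimal Python sketch of how Destination Speed, the “Slow” check, and the poorly-performing-Topic check could be computed. It is one reading of the formula above, not the production code: the `Bucket` structure, the function names, and the choice to average the per-record load time across the 5-minute windows are assumptions made for illustration. Only the 50 ms and 6 hr thresholds come from the definitions above.

```python
from dataclasses import dataclass

SLOW_THRESHOLD_MS_PER_RECORD = 50.0  # from the text: speed threshold of 1/(50 ms)
MAX_DRAIN_HOURS = 6.0                # from the text: poorly performing beyond 6 hrs


@dataclass
class Bucket:
    """Load stats for one 5-minute window (hypothetical structure)."""
    load_time_ms: float
    records: int


def avg_ms_per_record(buckets):
    """Average per-record load time across the windows that loaded anything."""
    per_bucket = [b.load_time_ms / b.records for b in buckets if b.records > 0]
    if not per_bucket:
        return None
    return sum(per_bucket) / len(per_bucket)


def destination_speed(buckets):
    """Records per millisecond, or None when there is no usable data."""
    avg = avg_ms_per_record(buckets)
    if avg is None or avg <= 0:
        return None
    return 1.0 / avg


def is_slow(buckets):
    speed = destination_speed(buckets)
    return speed is not None and speed < 1.0 / SLOW_THRESHOLD_MS_PER_RECORD


def is_poorly_performing(lag_records, buckets):
    """True if draining the current Lag at the current speed exceeds 6 hours."""
    speed = destination_speed(buckets)
    if speed is None:
        return False
    drain_ms = lag_records / speed
    return drain_ms > MAX_DRAIN_HOURS * 3600 * 1000
```

For example, three windows averaging 60, 80, and 100 ms per record give an average of 80 ms per record, which is slower than the 50 ms threshold. With a Lag of one million records, draining would take roughly 22 hours, so the Topic would be flagged.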

For simplicity, we will assume Source and Destination are both Databases.

Old Design For JDBC Destination Load

  1. Topic Creation: When a customer is created, a Topic is created in Kafka. All the data for the customer streams through this Topic, which provides the isolation; the number of partitions is decided on the basis of throughput.
  2. Ingestion: Tasks run for each table in the Pipeline and push the data polled from the source to Kafka with the partition key as the Primary Key (to maintain ordering).
  3. Consumers: The Topics are grouped into Clusters. For each Cluster, a Kafka Consumer Group is created. This means one Consumer Group consumes data from multiple Topics and, hence, multiple customers. Kafka Consumers poll records containing data from different source tables, separate them into batches per Destination Table, and then sink the deduped data into the Destination for each table.
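
The batching and deduplication step in the Consumer can be sketched roughly as follows. This is a simplified illustration, not the actual Consumer code: the record fields (`table`, `pk`, `data`) are hypothetical, and the sketch assumes that deduplication means keeping the latest polled version of each Primary Key within a batch.

```python
from collections import defaultdict


def batch_by_table(records):
    """Group polled records per Destination Table and dedupe by primary key.

    `records` is an iterable of dicts in poll order with the hypothetical
    keys "table", "pk", and "data". A later record for the same
    (table, pk) pair replaces the earlier one (assumed dedupe rule).
    """
    batches = defaultdict(dict)
    for rec in records:
        batches[rec["table"]][rec["pk"]] = rec
    return {table: list(rows.values()) for table, rows in batches.items()}
```

Because the Partition Key is the Primary Key, a single poll usually mixes records from many tables, so each resulting per-table batch can be quite small.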

Problems in the Old Design

  1. Since the Primary Key acts as the Partition Key, each partition contains data from different tables. Hence, the Consumer has to create batches per table before executing the load. This can result in many connections and in small, skewed batches per table, which makes the load slow.
  2. Another challenge is that one Kafka Consumer Group is linked to multiple Topics and, therefore, to different customers. Each customer can have a Destination with distinct processing power and, hence, a different Destination Speed. The Kafka Consumer loads data in a round-robin fashion, so one customer with a “Slow Destination” can slow down all the customers in that Topic Cluster.
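
The effect of one Slow Destination on a shared Consumer can be estimated with a back-of-the-envelope model. The sketch below assumes a deliberately simplified setup in which a single Consumer loads one fixed-size batch per Topic in turn and the loads do not overlap; the function name and the numbers are illustrative only.

```python
def per_topic_throughput(batch_size, ms_per_record_by_topic):
    """Records/sec each Topic gets when one Consumer serves all Topics in turn.

    Simplified model: one round-robin cycle loads `batch_size` records for
    every Topic, sequentially, so the cycle time is the sum of all loads.
    """
    cycle_ms = sum(batch_size * ms for ms in ms_per_record_by_topic.values())
    return {
        topic: batch_size / (cycle_ms / 1000.0)
        for topic in ms_per_record_by_topic
    }
```

With two Destinations at 1 ms per record and batches of 1,000 records, each Topic gets 500 records/sec under this model. Adding a third Destination at 100 ms per record stretches the cycle from 2 seconds to 102 seconds, and every Topic, including the two fast ones, drops to under 10 records/sec.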

The existing workarounds for the above challenges are not scalable and don’t solve the problem at the root level:

  1. To circumvent the problem, we have been moving the “Slow Destination” to a different Consumer Group programmatically to avoid impacting the customers having Fast Destinations. So, Slow Destinations are grouped together.
  2. Beyond a threshold, we sideline the Events to avoid data loss owing to Kafka’s Retention Breach.
  3. In some cases, if the Ingestion for a customer is high and the Destination is slow, we move that customer to a separate Topic Cluster altogether to completely isolate the consumption for the customer.

Solutions

With the above problems in mind, we deliberated upon various approaches. Here is one of the initial ones we considered:

Change in the Partition Key
We thought of using the table name as the Partition Key. This would restrict the skewness, since the data for a table would be present together in a partition. Hence, the number of connections and queries to the Destination would be much lower.

However, this approach comes with 3 problems:

  1. Unused Resources: The partitions catering to tables with low Ingestion will remain empty most of the time, and hence, the consumer tagged to them will sit idle.
  2. Single Partition Bottleneck: If there is high Ingestion in a table, the consumer tagged to the corresponding partition might not be able to match the Ingestion Throughput, and hence, there would be a Lag, which can result in data loss.
  3. Destination, Still Slow: If the Destination does not perform well, this would still circle us back to the original problem.

Considering the challenges this approach would bring to the table, we finally went ahead with a File-based Approach.

Our New Design for JDBC Loads

As the old design works well for Fast Destinations, we continue to operate the same way for them, loading the data straight from the Consumer to the Destination. We move to a File-based approach on demand, when the estimated processing time, based on Lag and Destination Speed, crosses a particular threshold.

  1. In the File-based approach, the Consumer writes the Records to files on the node itself, per Destination Table, instead of writing them to the Destination. This is extremely fast since it’s just disk writes.
  2. The files created on the node are then uploaded to AWS S3, and bookkeeping for the same is done in MySQL. There are provisions to handle downscaling, restarts, etc., to ensure the files are always closed and uploaded to AWS S3 before the node goes down.
  3. There is a separate Task per Destination Table, which runs every minute to download the files from AWS S3 and load the Records to the Destination.
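
The local write path from step 1 can be sketched as below. This is a minimal illustration under several assumptions: the class name `TableFileSink`, the JSON-lines file format, and the one-file-per-table layout are all hypothetical, and the AWS S3 upload and MySQL bookkeeping from step 2 are left out entirely.

```python
import json
import os
import tempfile


class TableFileSink:
    """Appends Records to one local file per Destination Table (sketch)."""

    def __init__(self, base_dir=None):
        # Default to a fresh temporary directory on the node.
        self.base_dir = base_dir or tempfile.mkdtemp(prefix="sink-")
        self._handles = {}

    def write(self, table, record):
        """Append one record as a JSON line; no Destination call is made."""
        fh = self._handles.get(table)
        if fh is None:
            path = os.path.join(self.base_dir, f"{table}.jsonl")
            fh = self._handles[table] = open(path, "a", encoding="utf-8")
        fh.write(json.dumps(record) + "\n")

    def close_all(self):
        """Close every open file and return {table: path} for uploading."""
        paths = {}
        for table, fh in self._handles.items():
            fh.close()
            paths[table] = fh.name
        self._handles.clear()
        return paths


def read_batch(path):
    """What a per-table load Task would read back before loading."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh]
```

The point of the design is visible in `write`: the Consumer only touches the local disk, so its pace no longer depends on how fast any particular Destination is. The per-table load Task then picks up a whole file at a time, which is what makes larger batches possible.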

Complete Isolation
This completely avoids the second problem mentioned above, where one customer with a “Slow Destination” impacts other customers belonging to the same Topic Cluster. The consumer now writes to files, which is independent of the Destination in play.

Higher Throughput
This also solves the first problem to a great extent as the number of Records batched together would be higher now. Hence, we would be writing in a large chunk to the Destination, avoiding multiple connection creations and reducing the queries run on the Destination. This also reduces the load on the customer’s Destination.

Key Results Achieved

The Load Throughput has increased significantly after the design change, and there is now complete isolation between customers: a Slow Destination no longer has adverse effects on other customers.

Figure: Load Throughput, Before and After the Design Change

Limitations of the New Design

  1. The jobs executing the loads run aggressively to keep the latency between Ingestion and load minimal, but the load is now asynchronous in the true sense. This differs from the old design, where the Consumer tries to load the data as soon as it gets it, which gives the appearance of an in-sync load. That appearance holds only when the Ingestion is not too high and the Destination is fast; otherwise, there would be a Lag in the old design as well.
  2. Since we create smaller files and upload them to AWS S3, there is an associated infra cost, which we are okay with.
  3. If the Ingestion is concentrated in only a few tables (<10), the parallelism for a table is lost since only 1 load job runs per table. This has been circumvented by introducing Controlled Parallelism for the costly operations, like temp table insertion, in the load job itself.
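
One way to picture Controlled Parallelism inside a single load job is a bounded worker pool that inserts chunks of a table’s batch concurrently. The sketch below is an assumption about the general shape, not the actual load job: `insert_chunk` stands in for whatever performs the temp table insertion, and the chunk size and worker count are arbitrary illustrative values.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(rows, size):
    """Split a list of rows into consecutive chunks of at most `size`."""
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def load_in_parallel(rows, insert_chunk, chunk_size=1000, max_workers=4):
    """Insert chunks concurrently with at most `max_workers` in flight.

    `insert_chunk` is a caller-supplied (hypothetical) function that
    inserts one chunk and returns the number of rows it wrote.
    """
    chunks = chunked(rows, chunk_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(insert_chunk, chunks))
```

Capping `max_workers` is what keeps the parallelism “controlled”: the job regains some concurrency for a hot table without opening an unbounded number of connections to the customer’s Destination.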

What Lies Ahead

  1. Using RocksDB for Bookkeeping: Instead of using MySQL, we can use RocksDB for bookkeeping, since it is local to the node anyway, and use MySQL only in special cases, such as when a node is downscaled or there is an error in Destination loads.
  2. Local Aggregation: Using S3-based flow only as a fallback for downscaling, errors, etc., and relying on node-level aggregation and sinking for most use cases. This would help evade the infra cost.
  3. Historical Loads Batch Mode: Enabling the batch mode for Historical Loads based on certain conditions, since the Ingestion Rate generally remains high, and if the Destination is slow, it hampers the performance significantly.

Thank you for reading the post till the end. Please write to us at dev@hevodata.com with your comments and suggestions.
