Infra optimization through templatization and Kafka

Puneetjain
PlaySimple Engineering
6 min read · Dec 11, 2023

Storytime: once upon a time, we had a daily task of running 400–500 Python scripts. These scripts, like us, had repetitive tasks to perform:

  1. Querying the required information by joining multiple tables via PySpark.
  2. Decrypting (un-hashing) some sensitive information.
  3. Sending this information to multiple partners via two main approaches: uploading data to an S3 bucket, or making an individual API request for each data point (a rough sketch of such a script follows this list).
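To make the starting point concrete, here is roughly what one of these scripts looked like. The table names, columns, decrypt() helper, and partner endpoint are illustrative assumptions, not the actual production code.

```python
# A rough sketch of one of the ~400 daily scripts. Table names, columns,
# the decrypt() helper, and the partner endpoint are illustrative only.
import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partner_x_export").getOrCreate()

# 1. Heavy query: join multiple large tables to get the final dataset
df = spark.sql("""
    SELECT u.user_id, u.hashed_email, e.event_name, e.event_ts
    FROM events e
    JOIN users u   ON e.user_id = u.user_id
    JOIN devices d ON d.device_id = e.device_id
    WHERE e.partner_id = 'partner_x' AND e.platform = 'android'
""")

# 2. Un-hash / decrypt the sensitive fields (placeholder for the real logic)
def decrypt(value):
    return value

# 3. One API request per data point
for row in df.toLocalIterator():
    payload = {"email": decrypt(row.hashed_email), "event": row.event_name}
    requests.post("https://partner-x.example.com/events", json=payload, timeout=10)
```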

Now, managing this infrastructure posed significant challenges.

a. One major issue was the need to join multiple large tables to obtain the final result. Consequently, we were unable to execute more than 6–7 scripts in parallel due to the risk of overwhelming the system’s memory and CPU.

b. Another limitation we encountered was the maximum number of API requests we could send concurrently. Even if our infrastructure could handle a high volume of parallel scripts, we would receive external API failures due to this restriction.

c. Considering the overall process involved approximately 2–3 million API requests, the time spent on sending data outweighed the actual query execution time. This resulted in wasted computing resources.

d. It was crucial to ensure that each data point was sent only once. Unlike S3 scripts, where data uploads either succeeded or failed entirely, API scripts lacked a mechanism to identify previously sent events in the event of system failure. Resending such events risked data corruption.

e. It was getting increasingly difficult to manage this many scripts, and a single change had to be replicated across multiple scripts, making maintenance repetitive.

So let’s jump to how we broke down the problem and solved it.

Overall Infra Changes

Part 1: The heavy query problem

As we analysed all the queries, we noticed that these queries could be categorised into just 3–4 distinct types where each one involved joining the same tables and selecting a subset of data based on a few predetermined parameters such as ID or platform.

So our solution — Pre-computation (Query Templatization).

Instead of each query performing the joins itself, we precomputed the results of the entire query into a single table. This table included additional columns representing the subset-selection parameters.

Consequently, individual scripts only needed to select the relevant data from the pre-computed table based on the subset columns, rather than executing complex joins for each query. This approach significantly simplified and expedited the query execution process.

This resulted in individual queries changing as shown below:-

Query Change
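In code, the change looked roughly like this. The table and column names are hypothetical, but the shape of the before/after queries matches the idea above.

```python
# Before: every script re-ran the same expensive multi-table joins
query_before = """
    SELECT u.user_id, u.hashed_email, e.event_name, e.event_ts
    FROM events e
    JOIN users u   ON e.user_id = u.user_id
    JOIN devices d ON d.device_id = e.device_id
    WHERE e.partner_id = 'partner_x' AND e.platform = 'android'
"""

# After: the joins run once into a precomputed table that also carries the
# subset-selection columns; each script just filters on those columns
query_after = """
    SELECT user_id, hashed_email, event_name, event_ts
    FROM precomputed_partner_events
    WHERE partner_id = 'partner_x' AND platform = 'android'
"""
```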

Results:-

By implementing precomputation and query templatization, we reduced individual query runtime from 1.5–2 minutes to less than 10 seconds. Factoring in the number of scripts we run, this saved hours of computation time, improved resource utilisation, and reduced peak memory usage.

Overall, it helped us decrease the total script runtime by 1.5 hours.

Part 2: The high script count problem

Building upon the concept of query templatization, we realised that the scripts themselves could be categorised into subsets based on differing Python parameters. These parameters determined factors such as the API for data upload, the S3 location utilised, or the type of data un-hashing.

These 400 scripts could now be categorised into just 6 templates, where each template followed a completely different structure. Each template takes a set of arguments as input and performs the required tasks.

Instead of modifying each of the 400 scripts individually, we converted them into JSON objects that specified the parameters and the template to be used. By updating only the templates, we achieved efficient script management without repetitive modifications.
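As a sketch of what this looks like in practice, each script definition becomes a small JSON object that names a template and its parameters. The field names, template names, and dispatch code below are assumptions, not the exact production schema.

```python
import json

# One of the ~400 former scripts, now reduced to a JSON definition
script_config = json.loads("""
{
  "name": "partner_x_android_events",
  "template": "api_upload",
  "params": {
    "partner_id": "partner_x",
    "platform": "android",
    "unhash_fields": ["email"],
    "endpoint": "https://partner-x.example.com/events"
  }
}
""")

def api_upload_template(params):
    """One of the ~6 shared templates: select the relevant subset from the
    precomputed table, un-hash the configured fields, and send the data."""
    ...

TEMPLATES = {"api_upload": api_upload_template}

# The runner picks the template and passes the parameters through
TEMPLATES[script_config["template"]](script_config["params"])
```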

And that, folks, is how you solve a problem. But we still have more:-

Part 3: The sending overload problem

Now that we are done processing the data on our end, the last issue that remains is sending the data to these external partners without any failures or duplication from our side, while also not overloading them with too many parallel requests.

The S3-type scripts were sorted, so let's move to the challenging part: the API-type scripts. These scripts required handling 3–5 million API requests daily, managing parallelism, and adhering to rate limits at a partner level. Additionally, we needed to implement monitoring for failed requests and ensure that duplicate API requests were avoided. So we set out to explore a solution that could fulfil all of these requirements.

Soon enough, we found that a central queueing technology would be ideal for our use case.

We designed all the scripts to send their complete data to a central queue. The queue would then handle the gradual sending of these API requests to external partners. This way, we no longer needed to keep the big querying compute running the whole time, which saved us resources as well. Brilliant!

So we explored various queueing technologies and finally decided to go ahead with Kafka, specifically AWS MSK for our use case.

There are 3 main parts:-

  1. Producer: The entity that writes data to Kafka. In our case, it is responsible for sending all the URLs.
  2. Consumer: The entity that reads data from the queue. It is responsible for actually sending the data to external partners.
  3. Kafka: Essentially the store where all the data lives; it coordinates all the producers and consumers.

Kafka had a lot of features that would help us solve our use case in a resilient manner.

  1. Concept of topics: Topics can be understood as subjects or groups that help segregate data; producers send data to a particular topic, and consumers read only from specific topics. This was advantageous for us, as we could create topics at the partner level, allowing us to segregate the data sent to each partner effectively.
  2. Transaction-based producers: Writing data in the form of transactions allowed us to roll back the entire transaction in the event of a producer failure. This prevented data duplication while writing to the queue.
  3. Efficiency: Kafka demonstrated high message throughput, enabling us to write 1 million messages in 2–3 minutes, which was highly suitable for our use case.
  4. Resilient consumers: Kafka stores consumer offsets within the cluster itself, ensuring consumer resilience. If a consumer fails, its offset is preserved, and when a new consumer takes over, it seamlessly continues reading from the previous offset (see the consumer sketch after this list).
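A minimal consumer sketch along these lines, using the confluent-kafka client. The broker address, topic and group names, rate limit, and the assumption that each message is a URL to be called are illustrative, not the exact production setup.

```python
import time
import requests
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "<msk-broker-list>",
    "group.id": "partner-x-senders",       # one consumer group per partner topic
    "enable.auto.commit": False,           # commit only after a successful send
    "auto.offset.reset": "earliest",
    "isolation.level": "read_committed",   # only see fully committed transactions
})
consumer.subscribe(["partner_x_events"])   # topics are created at the partner level

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue                         # log / alert on real errors
        url = msg.value().decode("utf-8")
        resp = requests.get(url, timeout=10)  # fire the partner API request
        if resp.ok:
            consumer.commit(message=msg)     # offsets live in Kafka, so a
                                             # replacement consumer resumes here
        time.sleep(0.05)                     # crude partner-level rate limiting
finally:
    consumer.close()
```

The read_committed setting pairs with the transaction-based producer described above, so consumers only ever see batches that were fully written to the queue.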

Moving forward, we introduced a generalised approach for data upload. Each individual script was responsible for uploading a file to a designated S3 path. These files contained all the URLs to be sent to the queue, and the upload took just a few seconds thanks to the high efficiency of S3.

We did not write data directly from the scripts to Kafka, because a high number of parallel producers could overwhelm both the Kafka cluster and the script cluster. Instead, we uploaded the data to S3, from where we could send it in planned, small chunks.

For each uploaded file, we implemented a producer script to read and send the individual events to the queue in a record-by-record manner. Leveraging the efficiency of Kafka, this process was completed within a few minutes.

Once all the events from a specific file are successfully sent to the queue, we mark the transaction as complete. At this point, the messages (URLs) become visible to consumers, which initiates the process of reading these URLs and sending the corresponding data to external partners.
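A sketch of that per-file producer, assuming boto3 for S3 and the confluent-kafka transactional producer; the bucket, key, topic, and transactional id are illustrative.

```python
import boto3
from confluent_kafka import Producer, KafkaException

producer = Producer({
    "bootstrap.servers": "<msk-broker-list>",
    "transactional.id": "producer-partner_x-urls",  # unique per file/run
})
producer.init_transactions()

s3 = boto3.client("s3")
obj = s3.get_object(Bucket="partner-export-bucket", Key="partner_x/urls.txt")

producer.begin_transaction()
try:
    # Send the file record by record; nothing is visible to consumers yet
    for line in obj["Body"].iter_lines():
        producer.produce("partner_x_events", value=line)
    producer.commit_transaction()   # all-or-nothing: the URLs become readable now
except KafkaException:
    producer.abort_transaction()    # on failure, consumers never see partial data
    raise
```

Because the transaction is committed only after every record in the file is produced, a failed run can simply be retried from the start without consumers ever seeing partial data.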

Final Flow

By doing all this, we were able to decrease the overall runtime from 4 hours to ~1 hour for all the scripts. The current infra is also highly scalable, allowing us to control the number of consumers created at the partner level, thus enabling us to manage and adjust the throughput accordingly.

Overall, after all these changes, we now have an infra that uses resources efficiently, is easier to manage, and runs within acceptable ETAs.
