How our data scientists' petabytes of data is ingested into Hadoop (from Kafka)

Writing a trillion messages a day can create a lot of challenges (& files)

Akshesh Doshi
Agoda Engineering & Design
9 min read · Sep 10, 2021


At Agoda, data is heavily used to make informed decisions. This allows us to plan the best possible trip itineraries for our customers, whether by fetching the best deals on their hotels or by deciding the next feature that we’d like to launch.

The underlying system used to generate these insights is Hadoop, with Kafka being one of its major ingestion data sources.

Hence, we need a very reliable tool for this process.

Earlier, this ingestion was done using the Map-Reduce-based open-source tool Camus. But slowly, as our scale grew to ingest terabytes of data with around a trillion messages per day and our users asking for lower ingest latencies, we knew that it was time to create a new system with a focus on better manageability, reliability, scalability and low latency.

Hence, we embarked on a development journey to overcome the cracks we started to see in Camus. Now, after watching Hadin run successfully on production for over a year, we would like to share our design, learnings, and decisions from building our new system — Hadin (Hadoop Data Ingestion)!

Motivation: why not continue with Camus?

Camus served us well for many years. Simply put, it was a Hadoop Map-Reduce job scheduled to run periodically (i.e., at a fixed interval of time).

But as our use cases and data size grew, we started experiencing some shortcomings with this architecture:

  1. High latency in data ingestion: Here, we define “latency” as the time between when an application writes data to Kafka and when that data gets written to Hadoop. With a scheduled frequency of 10 minutes, plus some time for the job itself to run, the p99 of our data latency was 15 minutes in the “best-case scenario.” If one particular run failed, the latency would reach 30 minutes. When things went south, by the time a developer got paged and fixed the issue, the p99 would climb to 45 minutes or more! (We were trying to maintain an SLA of 30 minutes.)
  2. Technical debt: Agoda was widely adopting Spark for its ETL requirements due to its in-memory processing, and we also wanted to move away from MR’s disk-based processing to achieve better performance. At one point, our Hadoop data ingestion was the only job in our Hadoop cluster that was still based on Map-Reduce 😬. Moreover, Camus had been deprecated in favor of Gobblin, which meant that the burden of maintaining Camus was solely on us. We did not want to migrate to Gobblin because it was still MR-based. LinkedIn did recently share details about FastIngest, but by then we were already running Hadin in production.

Other than the above deficiencies, Camus had some other hiccups which were impacting us more often as our data grew:

  1. Camus manages Kafka offsets by itself: This meant that we would need something to monitor the offsets & accuracy of each partition if one particular partition failed. To avoid this, we decided to let the whole job fail if any Kafka partition had a problem. As a result, even if only one partition of a particular topic failed, say because of a corrupt message, it would impact the ingestion of all other unrelated & completely innocent topics’ partitions. With thousands of partitions being ingested from our Kafka clusters, the chances of this issue biting us were getting higher. A similar problem would arise if there were one bad node in the Hadoop cluster, because the failure of one mapper would also fail the whole job.
  2. No dynamic workload allocation: Camus’ workload distribution strategy was to assign an equal number of partitions to each mapper, without considering any sudden spike in a particular partition or the different incoming byte rates of different topics. This sometimes led to major skews among the mappers, significantly increasing the duration of a particular batch (while other mappers had already completed). Also, there was no auto-scaling feature out of the box that could throw more compute power at the system when the incoming load increased.
  3. Tight coupling with Hadoop: Other than the strong dependency on Hadoop MR+Yarn, Camus also used HDFS to persist some metadata (like Kafka offsets) and application configs. With the advent of architectures like object storage (to solve use-cases served by HDFS) and technologies like Kubernetes (an alternative to Yarn), we did not want to lock ourselves into the Hadoop ecosystem. Avoiding that lock-in would give us the flexibility to experiment with more modern storage & resource management layers in the future.
  4. Furthermore, relying on a scheduled batch job added another dependency on a scheduling system, which was Oozie in our case. Removing a dependency on an external system was an exciting idea for us anyway, and Oozie’s complicated execution/configs only motivated us further in that direction.

Consequently, we started developing Hadin to serve as our next-generation solution for the ingestion component of the data pipeline.

Hadin — The Architecture

The Hadin System

There are various components in this system. Let’s go through them one by one:

The data source & sink

In our case, the source is Avro data from (our internal flavor of) Apache Kafka, and the sink is Hadoop, where the data is written in Avro & Parquet format.

The auditing system

The part of our auditing system used by Hadin is the near-real-time information about the rate of data flow in each Kafka topic, in bytes/sec. We store our audits in Elasticsearch; how this works is briefly explained in this article. The controller uses this information to divide tasks amongst workers, as described later.

Hadin

Worker
The worker component of Hadin is responsible for reading the data from Kafka and writing it to Hadoop. A worker can be assigned multiple partitions to read from and write to different corresponding directories in Hadoop.

Controller
The controller component of Hadin is responsible for calculating which Kafka partition(s) should be assigned to which worker. It calculates this based on various factors like the number of partitions to divide, the rate of data flow through them, the number of workers alive, etc. — this will be explained later when we talk about the features.

Helix

It is the glue through which the controller talks to the workers & the workers report their liveness. Internally, it uses Zookeeper (ZK). The controller creates an ideal state & writes it to ZK; Helix then makes sure to signal the minimal transitions required to change the current state to the ideal one.
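To make the idea of "minimal transitions" concrete, here is a small sketch. It is not Helix's API (Helix drives this through its own state models); the function name `plan_transitions` and the `release`/`acquire` actions are hypothetical names chosen for illustration. It only shows that partitions whose owner does not change need no transition at all.

```python
from typing import Dict, List, Tuple

# Maps a partition name to the worker that owns it (illustrative type alias).
Assignment = Dict[str, str]


def plan_transitions(current: Assignment, ideal: Assignment) -> List[Tuple[str, str, str]]:
    """Return (action, partition, worker) steps that turn `current` into `ideal`.

    Partitions whose owner is unchanged produce no step, so the workers
    holding them keep consuming without interruption.
    """
    steps: List[Tuple[str, str, str]] = []
    # Release moved or removed partitions first, so that two workers
    # never own the same partition at the same time.
    for partition, worker in sorted(current.items()):
        if ideal.get(partition) != worker:
            steps.append(("release", partition, worker))
    # Then hand every moved or new partition to its new owner.
    for partition, worker in sorted(ideal.items()):
        if current.get(partition) != worker:
            steps.append(("acquire", partition, worker))
    return steps
```

In this sketch, moving one partition between two workers yields exactly one release and one acquire, however many other partitions exist.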

Container orchestrating system (currently in Docker Swarm)

It manages the pool of workers: it is responsible for distributing the workers evenly across the actual physical nodes and respawning workers if they fail (self-healing). Hadin is deployed via a Docker Swarm stack (yml) file, and both the controller & the workers run as Docker containers.

Hadin — Features

Stream-based ingestion (via long-running worker process)

Unlike its ancestor, Hadin is a long-running process. The workers subscribe to Kafka partitions via the Kafka consumer API, as per the controller’s plan.

This allows us to ingest the data in near real-time and reduce our latency significantly. Going from 45 minutes to near real-time was a huge win for us (although we intentionally keep the latency at ~10 minutes, as explained later).

Workload distribution amongst workers

The controller, as explained earlier, creates a workload distribution strategy that governs what partition would be consumed by which worker. The controller computes these ideal states based on a couple of things:

Data inflow byte rate
With tens of thousands of partitions in our Kafka clusters that need to be consumed by Hadin, we could not just bluntly divide the partitions equally among the workers.

Instead, the controller divides the partitions amongst workers based on their collective byte rates (bytes/s). This is illustrated in the image below:

Load distribution among workers is based on incoming data (instead of # of partitions)
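One simple way to approximate such a byte-rate-based split is a greedy heuristic: take the partitions from busiest to quietest and always give the next one to the worker with the lowest total so far. The sketch below is an assumption about how this could be done, not Hadin's actual algorithm; `assign_by_byte_rate` and the sample rates are made up for illustration.

```python
import heapq
from typing import Dict, List


def assign_by_byte_rate(rates: Dict[str, float], workers: List[str]) -> Dict[str, List[str]]:
    """Greedily spread partitions so that workers carry similar bytes/s."""
    if not workers:
        raise ValueError("at least one worker is required")
    # Min-heap of (total bytes/s assigned so far, worker name).
    heap = [(0.0, worker) for worker in sorted(workers)]
    heapq.heapify(heap)
    plan: Dict[str, List[str]] = {worker: [] for worker in workers}
    # Place the busiest partitions first; ties are broken by name for determinism.
    for partition, rate in sorted(rates.items(), key=lambda kv: (-kv[1], kv[0])):
        load, worker = heapq.heappop(heap)
        plan[worker].append(partition)
        heapq.heappush(heap, (load + rate, worker))
    return plan


if __name__ == "__main__":
    # Hypothetical byte rates: one hot partition and five quiet ones.
    rates = {"a-0": 90.0, "b-0": 10.0, "b-1": 10.0, "c-0": 30.0, "c-1": 30.0, "c-2": 10.0}
    print(assign_by_byte_rate(rates, ["w1", "w2"]))
```

With these sample rates, one worker ends up with the single hot partition and the other with the five quiet ones, so both carry 90 bytes/s. An equal split by partition count would give each worker three partitions and could leave one of them with far more traffic.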

Consumer lag
The load distribution based on the incoming data rate works well for us in most scenarios. But in some cases, like when there is a momentary spike in the incoming data of a particular partition, the real-time incoming rate might not give an accurate picture, and the partition’s lag might grow significantly.
Another scenario in which the lag of a particular partition might increase substantially is when a corrupt message creeps into a Kafka partition and the corresponding Hadin worker cannot deserialize/process it until someone manually resets the offsets to skip that message.

For such scenarios, the controller also considers the Kafka consumer lag while planning the ideal state and dedicates a whole worker for that particular partition until its lag comes back to normal.

A worker is dedicated to a particular partition if its consumer lag crosses a certain (configurable) threshold.
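The rule above can be sketched as follows. This is a simplified illustration rather than Hadin's controller code: the function name, the round-robin placement of the remaining partitions (a stand-in for the byte-rate-based split) and the choice to always keep one shared worker are assumptions.

```python
from typing import Dict, List


def plan_with_dedicated_workers(
    lags: Dict[str, int], workers: List[str], threshold: int
) -> Dict[str, List[str]]:
    """Give each badly lagging partition its own worker; share the rest."""
    if not workers:
        raise ValueError("at least one worker is required")
    # Partitions above the (configurable) threshold, worst lag first.
    lagging = sorted(
        (p for p, lag in lags.items() if lag > threshold),
        key=lambda p: (-lags[p], p),
    )
    # Assumption: always keep one worker for the non-dedicated partitions.
    dedicated = lagging[: len(workers) - 1]
    shared_workers = workers[len(dedicated):]

    plan: Dict[str, List[str]] = {worker: [] for worker in workers}
    for worker, partition in zip(workers, dedicated):
        plan[worker] = [partition]
    # Everything else is spread round-robin over the remaining workers.
    rest = sorted(p for p in lags if p not in dedicated)
    for i, partition in enumerate(rest):
        plan[shared_workers[i % len(shared_workers)]].append(partition)
    return plan
```

Once the lag of the dedicated partition drops back below the threshold, the next planning round would simply treat it like any other partition again.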

Dockerized components (easy scaling with container orchestrators)

We can easily scale up or down in real-time with just a click of a button with Docker Swarm, although this can be done with any container orchestrator like Kubernetes or Yarn.

Our deployment also got rid of one of the largest Hadoop jobs, giving resources back to real processing work. It also lets us plan capacity for ingestion independently, which fits better with the responsibilities of the data pipeline team than with those of the Hadoop team.

Additionally, with more fine-grained resource control and much more optimized code, we now use fewer resources than Camus did and still catch up on lag much faster.

Write flush (latency) configuration

Hadin creates a new file in Hadoop every time it commits to Kafka to avoid data loss. This means how frequently we commit to Kafka also decides how many messages are written to each file, which affects the size of the file.

So to save our Hadoop cluster from the small files problem, we consume messages from Kafka and buffer them in memory before flushing them to Hadoop.

This is achieved with a configurable time interval during which the workers keep buffering Kafka data in memory before writing it to Hadoop. We have set this value to 5 minutes, as this latency is acceptable for our use cases.

To avoid unnecessary flush latencies for large topics, we also flush data based on the size of the data already collected in memory. So if there is already enough data to create a file bigger than a Hadoop block size, the workers would flush to Hadoop immediately instead of waiting for 5 minutes.

This way, partitions with a high volume of incoming data do not get penalized for Hadoop’s small files problem and see a relatively smaller latency.
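The combined time-based and size-based flush rule can be sketched like this. The class below is a hypothetical illustration, not Hadin's implementation; `FlushBuffer`, its parameters and the injectable `clock` are assumed names. For the size limit, one might pass the Hadoop block size (for example 128 MiB, a common HDFS default, though the actual value used by Hadin is not stated here).

```python
import time
from typing import Callable, List, Optional


class FlushBuffer:
    """Buffers messages in memory until they are old enough or large enough."""

    def __init__(
        self,
        max_age_seconds: float,
        max_bytes: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_age_seconds = max_age_seconds
        self.max_bytes = max_bytes
        self._clock = clock
        self._messages: List[bytes] = []
        self._bytes = 0
        self._opened_at: Optional[float] = None  # time of the first buffered message

    def add(self, message: bytes) -> None:
        if self._opened_at is None:
            self._opened_at = self._clock()
        self._messages.append(message)
        self._bytes += len(message)

    def should_flush(self) -> bool:
        if not self._messages:
            return False
        too_big = self._bytes >= self.max_bytes
        too_old = self._clock() - self._opened_at >= self.max_age_seconds
        return too_big or too_old

    def drain(self) -> List[bytes]:
        """Hand over the buffered batch and start a fresh, empty buffer."""
        batch, self._messages = self._messages, []
        self._bytes = 0
        self._opened_at = None
        return batch
```

In a worker loop, the drained batch would be written to Hadoop as one file, and only after that write succeeds would the corresponding offsets be committed to Kafka, which matches the one-file-per-commit behaviour described above.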

Write file format

Although most of the use-cases served by our Hadoop cluster leverage a columnar storage format like Parquet, some teams require the data to be stored in Avro. Consequently, Hadin workers support a configuration that specifies the file format in which the data should be written to Hadoop.

On the read side, all our data in Kafka (consumed by Hadin) is in Avro, and the workers interact with the Schema Registry to deserialize this data for writing into the output file.

Our schema registry is a *heavily* modified version of Confluent’s Schema Registry and deserves a complete article of its own.
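For readers unfamiliar with how a consumer finds the right schema, here is a sketch based on the wire format of the stock Confluent Schema Registry serializers: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload. Since our registry is heavily modified, treat this purely as an assumption about the general technique; the framing Hadin actually reads may differ, and `split_confluent_frame` is a hypothetical helper name.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of the stock Confluent framing


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a Kafka message value into (schema_id, avro_payload)."""
    if len(message) < 5:
        raise ValueError("message too short to carry a schema ID header")
    # ">bI" = big-endian, one signed byte followed by one unsigned 32-bit int.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[5:]
```

A worker would then look up the writer schema for that ID in the registry (typically with a local cache) and use it to decode the payload before writing the record to the Avro or Parquet output file.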

Future enhancements

  1. Hadin currently uses Zookeeper for communication between the controller and workers via Helix. Fortunately, we already had Zookeeper clusters for our Kafka clusters, so we could re-use them without adding any operational costs. But with KIP-500 now merged into Kafka, we would like to get rid of Zookeeper altogether, which would require us to change Hadin’s controller-worker communication.
  2. We want to use the time-based lag estimations instead of basing them on the number of messages for our lag-based workload distribution strategy.
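To illustrate why the second point matters, here is one possible way to estimate lag in time rather than in messages. It is only a sketch under our own assumptions (the function name and its inputs are hypothetical, and it is not necessarily the method described in the recommended reading below): if a partition still has unconsumed messages, the time lag is approximated by the age of the last message the worker consumed.

```python
def time_lag_seconds(
    committed_offset: int,
    end_offset: int,
    last_consumed_timestamp_ms: int,
    now_ms: int,
) -> float:
    """Approximate how far behind a consumer is, in seconds."""
    # Fully caught up: nothing left to read, so there is no lag.
    if committed_offset >= end_offset:
        return 0.0
    # Otherwise use the age of the last consumed message as the estimate.
    return max(0.0, (now_ms - last_consumed_timestamp_ms) / 1000.0)
```

With such an estimate, a quiet partition that is 100 messages but one hour behind would rank above a busy partition that is a million messages but only a few seconds behind, which a message-count threshold would get the other way round.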

Conclusion

After more than a year of running Hadin in production, we are happy with the results. The time for messages to reach our Hadoop data warehouse has decreased significantly, and the reliability of ingestion has improved.

Furthermore, we are no longer tied to Hadoop’s ecosystem for our new ingestion system to work.

Hadin was built in Agoda as an initiative by the Messaging team. If this sounded interesting to you and you would like to help us build & improve our data platform, we would love to talk to you via this opening.

Acknowledgements

A big thanks goes to many other open-source tools (most notably Uber’s uReplicator) for sharing their experiences & architectures of similar solutions, which helped us make the right decisions for Hadin from the beginning.

Thanks to Johan Lundahl for helping me out with this article.

Join the team

careersatagoda.com

Recommended reading:

👉🏻 Adding Time Lag to Monitor Kafka Consumer
👉🏻 How Agoda manages 1.5 Trillion Events per day on Kafka

A podcast on what the whole platform, which Hadin is a part of, looks like: https://www.firebolt.io/blog/how-did-agoda-scale-its-data-platform-to-support-1-5t-events-per-day
