Scaling Spark Streaming for Logging Event Ingestion
How Airbnb scaled Spark streaming with a novel balanced Kafka reader that can ingest massive amounts of logging events from Kafka in near real-time
Logging Event Ingestion at Airbnb
Logging events are emitted from clients (such as mobile apps and web browsers) and online services with key information and context about the actions or operations. Each event carries a specific piece of information. For example, when a guest searches for a beach house in Malibu on Airbnb.com, a search event containing the location, check-in and check-out dates, etc. would be generated (and anonymized for privacy protection).
At Airbnb, event logging is crucial for us to understand guests and hosts and then provide them with a better experience. It informs decision-making in business and drives product development in engineering functions such as Search, Experimentation, Payments, etc. As an example, logging events are a major source for training machine learning models for search ranking of listings.
Logging events are ingested into the data warehouse in near real-time and serve as a source for many ETL and analytics jobs. Events are published to Kafka from clients and services. A Spark streaming job (built on top of Airstream, Airbnb’s streaming processing framework) continuously reads from Kafka and writes the events to HBase for deduplication. Finally, events are dumped from HBase into a Hive table hourly. Since the logging events are input to many pipelines and power numerous dashboards across the entire company, it is critical that they land in the data warehouse in a timely fashion and meet the SLAs.
The volume of events generated is massive and rapidly increasing. That poses serious challenges to existing ingestion infrastructure, particularly the Spark streaming job that ingests events from Kafka to HBase. In this article, we discuss the challenges in scaling the infrastructure and a solution that can support higher throughput by an order of magnitude and with better efficiency.
1. Spark Parallelism is Determined by the Number of Kafka Partitions
In the current Spark Kafka connector, there is a one-to-one correspondence between Kafka partitions and Spark tasks. Basically, one Spark task is instantiated to read from one Kafka partition to ensure the ordering of events when they are processed in Spark. However, with this design, we cannot simply scale the Spark streaming job by increasing the parallelism and allocating more resources.
To increase the Spark parallelism and throughput, one has to assign more Kafka partitions to topics with large events or high QPS events. Unfortunately, that is a rather manual process and is not scalable when there are a large number of topics (which keeps increasing).
Another problem is that assigning more partitions to a topic in Kafka does not retroactively apply to events that are already in Kafka. The additional partitions are only available to new events. It is impractical for us to anticipate spikes in events and assign more partitions beforehand to the affected Kafka topic. A spike could come at any time and could be due to various reasons, such as new product features or holidays.
When event volume reaches a critical level, large Kafka topics often cannot be ingested into the data warehouse fast enough. The problem is exacerbated by data skew in the events, which we will discuss next.
2. Skew in Event Volume and Size
Different event types are logged with significant variations in their volume and size. Some are really sparse and some may have a QPS that’s several orders of magnitude higher. The sizes of event types could range from hundreds of bytes to hundreds of kilobytes. The box plot below shows the large variation of average event size for the Kafka topics (note that the Y axis is in log scale). Although we try to assign more partitions for larger events, there is still serious skew in the Kafka partitions.
Skew is a serious problem in general for data applications. In this case, some Spark tasks would take much longer to finish than others. This leaves many executors idle and wastes resources, since a Spark job moves on to the next stage only when all tasks in a stage finish. Kafka partitions with the largest events would take an unreasonably long time to read if the topic does not have enough partitions. This results in lag in the Spark streaming job since batches are processed sequentially.
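To make the cost of skew concrete, here is a minimal sketch with made-up task durations (illustrative numbers, not measurements from this job). It assumes one task per executor slot and that a stage ends when its slowest task ends, so the stage duration is the maximum task duration and everything else is idle time.

```python
def stage_stats(task_seconds):
    """Return (stage duration, fraction of executor time spent idle).

    Assumes one task per executor slot and that the stage finishes
    only when its slowest task finishes.
    """
    stage_duration = max(task_seconds)
    busy = sum(task_seconds)
    capacity = stage_duration * len(task_seconds)
    return stage_duration, 1 - busy / capacity


# Hypothetical durations in seconds: three small partitions and one huge one.
skewed = [10, 12, 11, 120]
# The same total amount of work (153 seconds), spread evenly.
balanced = [38, 38, 38, 39]

skewed_duration, skewed_idle = stage_stats(skewed)
balanced_duration, balanced_idle = stage_stats(balanced)
print(f"skewed:   {skewed_duration}s stage, {skewed_idle:.0%} idle")
print(f"balanced: {balanced_duration}s stage, {balanced_idle:.0%} idle")
```

With the same total work, the skewed stage takes about three times as long and leaves most of the executor time unused.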
3. Near Real-time Ingestion and Headroom for Catch Up
Due to the challenges above, there is little headroom in the throughput of the Spark streaming job. Once the job is delayed due to various problems (like bad data nodes or a Hive Metastore outage), it takes a really long time to catch up.
For example, let’s assume a job with a 2-minute batch interval processes a batch in 1 minute on average. The job works through 2 minutes of events for every minute of wall-clock time while 1 minute of new events arrives, so the backlog shrinks by only 1 minute per minute. If the job is lagging behind by 4 hours, it would take another 4 hours to catch up. If we want it to catch up in 1 hour, the job must process 5 hours of events in that hour, which requires 4X headroom (i.e., processing each batch in 24s). Apart from recovering from incidents, large headroom is also necessary to handle seasonal spikes. Therefore, for near real-time ingestion, it is crucial to have extra headroom in throughput.
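The catch-up arithmetic above can be checked with a short sketch. The function names are hypothetical, and the model assumes a constant event arrival rate and a constant per-batch processing time.

```python
def catch_up_hours(lag_hours, batch_interval_s, processing_time_s):
    """Hours needed to clear a backlog while new events keep arriving."""
    # Hours of event data processed per hour of wall-clock time.
    throughput = batch_interval_s / processing_time_s
    if throughput <= 1:
        raise ValueError("without headroom the job can never catch up")
    # New data arrives at 1 hour per hour; only the surplus reduces the lag.
    return lag_hours / (throughput - 1)


def required_processing_time_s(lag_hours, target_hours, batch_interval_s):
    """Longest per-batch processing time that clears the lag in target_hours."""
    # The backlog plus the data arriving during the catch-up window must be
    # processed within target_hours.
    required_throughput = 1 + lag_hours / target_hours
    return batch_interval_s / required_throughput


# 2-minute batches processed in 1 minute, with a 4-hour lag.
print(catch_up_hours(4, 120, 60))            # 4.0 hours
# Catching up in 1 hour needs 5X throughput, i.e. 24 seconds per batch.
print(required_processing_time_s(4, 1, 120)) # 24.0 seconds
```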
In an ideal system, we would like to be able to horizontally scale out the Spark streaming jobs (i.e., achieving higher throughput by increasing parallelism and allocating more resources). We would also like these jobs to be load balanced so each task takes a roughly equal amount of time to read from Kafka.
To achieve these two goals, we at the Airbnb Data Platform team developed a balanced Spark Kafka reader.
Balanced Spark Kafka Reader
For streaming ingestion, the ordering of events is not a requirement since ingested events are processed minimally and then stored in HBase individually. This allows us to re-think the model and look for novel ways to address the scaling issues. As a result, we created a new balanced Kafka reader for Spark that 1) allows an arbitrary number of splits, so the parallelism can be increased to provide higher throughput; 2) calculates the splits based on event volume and size.
At a high level, the balanced Kafka reader works as follows:
- It pre-computes the mean event size in each topic and saves it in a CSV file.
- When the Spark streaming job instantiates the balanced Kafka reader, it passes an additional parameter numberOfSplits to specify the desired parallelism.
- For each Kafka partition, it computes the offset range to read (from current offset to latest offset) and applies the maxRatePerPartition constraint if set.
- It uses the balanced partitioning algorithm (described in the next section) to assign offset range subsets to splits evenly.
- Each Spark task reads one or more offset ranges from Kafka according to its split.
Below is a simple example with 2 Kafka topics and 3 splits. Events in topic A have a higher QPS but smaller size than topic B. The balanced Kafka reader would group subsets of these events together so that each split reads 1/3 of the data from Kafka. One split (split 2) would include 4 events from topic A and 1 event from topic B so that the total size is 8 KB for each split.
Note that Step 1 may be improved in the future by computing the mean event size dynamically so new topics and topics with frequent changes in event size are better accounted for.
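The offset range computation in the third step can be sketched as follows. This is an illustration, not the reader's actual code: the `OffsetRange` class and `compute_offset_range` function are hypothetical names, and the sketch assumes maxRatePerPartition has the same meaning as Spark's `spark.streaming.kafka.maxRatePerPartition` setting (maximum records per second read from each partition).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OffsetRange:
    topic: str
    partition: int
    from_offset: int   # inclusive
    until_offset: int  # exclusive

    @property
    def count(self) -> int:
        """Number of events in the range."""
        return self.until_offset - self.from_offset


def compute_offset_range(topic: str, partition: int,
                         current_offset: int, latest_offset: int,
                         batch_interval_s: int,
                         max_rate_per_partition: Optional[int] = None
                         ) -> OffsetRange:
    """Offset range to read in one batch, capped by an optional rate limit."""
    until = latest_offset
    if max_rate_per_partition is not None:
        # Assumption: the limit is expressed in events per second per partition.
        max_events = max_rate_per_partition * batch_interval_s
        until = min(latest_offset, current_offset + max_events)
    return OffsetRange(topic, partition, current_offset, until)


# Hypothetical partition that has fallen far behind the latest offset.
capped = compute_offset_range("topic_a", 0, 1_000, 500_000,
                              batch_interval_s=120,
                              max_rate_per_partition=1_000)
uncapped = compute_offset_range("topic_a", 0, 1_000, 500_000,
                                batch_interval_s=120)
print(capped.count, uncapped.count)
```

With the cap, a lagging partition contributes at most 120,000 events to a 2-minute batch instead of its entire backlog.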
Balanced Partitioning Algorithm
The problem of assigning offset ranges to splits evenly is very similar to the NP-hard bin packing problem. Sophisticated algorithms that find optimal solutions and fast heuristics that find non-optimal solutions both exist, with non-linear computational complexity. However, they cannot be used directly because our problem is somewhat different in that 1) the number of splits (or bins) is fixed; 2) an offset range (or item) can be split into smaller pieces.
Instead of adapting a complex existing algorithm, we developed a simple yet effective algorithm that is illustrated below.
- Compute the ideal weight-per-split: the total weight of all offset ranges (number of events multiplied by mean event size) divided by the number of splits. For new event types that are not in the pre-computed list, use the average size of all event types instead.
- Starting from split 0, for each offset range:
  - Assign it to the current split if the split’s total weight stays within weight-per-split.
  - If it doesn’t fit, break it apart and assign the subset of the offset range that fits.
  - Once the current split’s weight reaches weight-per-split, move to the next split.
This algorithm is very fast, with a complexity of O(number of splits + number of Kafka partitions): it just goes through the splits and Kafka partitions once, sequentially. The result is that weights for most splits are extremely balanced, except the last split, which may have much less weight (this is fine because we are wasting resources of at most one task). In one test, the estimated weight-per-split was 489,541,767 with 20k splits. Weights for the smallest and largest splits were 278,068,116 and 489,725,277, respectively. The second smallest split had a weight of 489,541,772. Excluding the smallest split, the difference between the second smallest and the largest splits was 183,505 (only 0.04% of the largest weight).
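A minimal sketch of the algorithm as described above; it is an illustration under stated assumptions, not the production implementation. The function name, the tuple layout, and the sample topics are hypothetical. It assumes the mean-size table is non-empty with positive sizes, rounds the number of events that fit up to a whole event (so a split may slightly exceed weight-per-split), and lets the last split absorb whatever remains.

```python
import math


def balanced_splits(offset_ranges, mean_event_size, number_of_splits):
    """Assign offset ranges to splits of roughly equal weight.

    offset_ranges: list of (topic, partition, from_offset, until_offset),
        with until_offset exclusive.
    mean_event_size: dict of topic -> mean event size in bytes.
    Returns (splits, weights): per-split offset-range lists and total weights.
    """
    # Fallback size for topics missing from the pre-computed list.
    default_size = sum(mean_event_size.values()) / len(mean_event_size)

    def size_of(topic):
        return mean_event_size.get(topic, default_size)

    total_weight = sum((until - start) * size_of(topic)
                       for topic, _, start, until in offset_ranges)
    weight_per_split = total_weight / number_of_splits

    splits = [[] for _ in range(number_of_splits)]
    weights = [0.0] * number_of_splits
    current = 0
    last = number_of_splits - 1
    for topic, partition, start, until in offset_ranges:
        size = size_of(topic)
        while start < until:
            if current == last:
                take = until - start  # the last split absorbs the remainder
            else:
                room = weight_per_split - weights[current]
                # Events that still fit, rounded up to at least one event.
                take = min(until - start, max(1, math.ceil(room / size)))
            splits[current].append((topic, partition, start, start + take))
            weights[current] += take * size
            start += take
            if weights[current] >= weight_per_split and current < last:
                current += 1
    return splits, weights


# Hypothetical input: 12 small events in topic_a, 3 large events in topic_b.
sizes = {"topic_a": 1000, "topic_b": 4000}
ranges = [("topic_a", 0, 0, 12), ("topic_b", 0, 0, 3)]
splits, weights = balanced_splits(ranges, sizes, 3)
for index, (split, weight) in enumerate(zip(splits, weights)):
    print(index, split, weight)
```

With these made-up sizes, the middle split receives 4 events from topic_a and 1 event from topic_b, and every split weighs 8,000 bytes. Each pass through the inner loop normally either finishes an offset range or fills a split, which is why the work grows only linearly with the number of splits and partitions.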
The balanced partitioning algorithm performed well in both test and production. Spark task running times (as shown in the graph below) are much more evenly distributed than with the original Spark Kafka reader. Most of the tasks finished within 2 minutes, and a small portion of them took 2 to 3 minutes. Compared with the wide ranges of event QPS and size, the small variance in task running time demonstrates the effectiveness of the balanced partitioning algorithm. By taking event size and volume into account, it ensures the ingestion workload is evenly distributed across executors.
Improvements in Upstream and Downstream Systems
The balanced Kafka reader is a crucial piece of scaling the streaming ingestion of logging events. It is also important to make sure there is no other bottleneck in the upstream and downstream systems. In this case, we improved Kafka and HBase to enhance their throughput and reliability. For Kafka, the brokers were migrated to VPC, which provides 4X the throughput. A streaming job was set up to monitor the QPS per Kafka partition so that when event volumes increase, more partitions can be added in a timely manner. For the downstream HBase, the number of regions for the HBase table was increased from 200 to 1000 so that bulk-loading the events to HBase can have higher parallelism (which is determined by the number of regions).
For the Spark streaming job, speculative execution was enabled to better handle reliability issues in the underlying infrastructure. For example, one Spark task could be stuck due to reading from a bad data node with faulty disks. With speculative execution, the job is much less likely to be affected by those kinds of issues.
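As a rough illustration, speculative execution is controlled by standard Spark properties such as `spark.speculation`, `spark.speculation.multiplier`, and `spark.speculation.quantile`. The values below are placeholders rather than the settings used in this job, and the `to_submit_args` helper is a hypothetical convenience for rendering them as `spark-submit` arguments.

```python
# Standard Spark properties; the values are illustrative placeholders.
speculation_conf = {
    # Re-launch copies of tasks that run much slower than their peers.
    "spark.speculation": "true",
    # How many times slower than the median a task must be to be speculated.
    "spark.speculation.multiplier": "2",
    # Fraction of tasks that must finish before speculation kicks in.
    "spark.speculation.quantile": "0.9",
}


def to_submit_args(conf):
    """Render a config dict as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(speculation_conf)))
```

Because a speculative copy can be scheduled on a different node, a task stuck on a faulty disk no longer has to hold up the whole batch.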
Thanks to the balanced Kafka reader, Spark applications consuming from Kafka are now horizontally scalable with arbitrary parallelism. The balanced partitioning algorithm is simple and has proven to be extremely effective. Because of these improvements, the Spark streaming job for ingesting logging events can handle an order of magnitude more events than the previous one. Stability of the system has improved so much that we have not seen any significant lag since the changes were deployed.
For future event traffic growth and spikes, the Spark streaming job for logging event ingestion will be able to handle them smoothly and efficiently. There is no more worry about skew in the events. If the job happens to be lagging due to issues in the underlying infrastructure, it will be able to catch up rapidly.
The problems we solved here are not uncommon in large-scale Spark applications and data applications in general. It is important to carefully understand the data itself and how it is processed in each step, which could reveal potential bottlenecks, skew in the data, and opportunities for optimization. For example, Spark provides a nice UI showing the DAG for each job. From that, one can understand how a job is executed and whether it may be tuned for better performance via caching, repartitioning, etc.
Scaling streaming ingestion of logging events involves many upstream and downstream systems. The project was a collective effort of four teams across Airbnb Data Platform and Production Infrastructure. It would not have been possible without the immense contributions from Cong Zhu, Pala Muthiah, Jinyang Li, Ronnie Zhu and Gabe Lyons. We are grateful for Xu Zhang’s enthusiastic help with Kafka. We would like to thank Guang Yang, Jonathan Parks, Gurer Kiratli, Xinyao Hu and Aaron Siegel for their unparalleled support on this effort.
We are indebted to Gurer Kiratli and Xiaohan Zeng for their help in proofreading this blog post.
Airbnb’s Data Platform team is always looking for good engineers with relevant skills! If you are enthusiastic about building out data infrastructure like this one and interested in joining the team, please check out our open positions and send your application!