Expedia Group Technology — Data

Herding the Elephants

Managing Hadoop clusters for petabyte scale distributed data processing

Abhimanyu Gupta
Expedia Group Technology


A woman takes a picture of an elephant in the distance from a car
Photo by Wade Lambert on Unsplash

Team: Abhimanyu Gupta, Andreea Paduraru, Jay Green-Stevens, Patrick Duin, Jose Nuñez-Izu

Note: Please don’t herd any actual elephants!

We, the Expedia Group™️ Data Lake Team, are responsible for running and maintaining the Expedia Group’s data lake. The data lake has multiple components responsible for handling data federation, replication, access controls, metadata, orphaned data deletion and more.

One of the core components of the data platform is Cloverleaf. Cloverleaf is our internal, Hive-based, data reformatting and repartitioning tool. It reads data from source Hive table partitions, merges them with the existing partitions from a target Hive table, converts the merged data to Parquet format, and finally writes it back into the partition structure requested by the customer. Cloverleaf executes Hive jobs on a fleet of auto-scaling Amazon EMR clusters.

Cloverleaf runs more than 12,000 jobs every day, processing 3.5 PB of data every month. Along with the data lake, Cloverleaf has seen rapid adoption over the past 4 years. We have scaled it from processing 10 tables in November 2019 to 1,300+ tables today, and the number is still growing. Suffice it to say, managing the Cloverleaf data processing platform is a full-time job (many full-time jobs, in fact)!

When Cloverleaf got its first few customers as a data platform tool, all jobs ran on a single EMR cluster with self-managed scaling rules. As adoption grew, we started hitting scalability issues:

  • YARN resource manager instability because of heavy load.
  • YARN resource manager instability used to cause running jobs to become orphans.
  • Resource contention on the cluster.
  • Limits on how many jobs could run in parallel on a single large EMR cluster. For instance, only 20 Java applications could run in parallel on the master node using the now-deprecated AWS Data Pipeline/Task Runner architecture.
  • Repeated HDFS corruption.
  • Large jobs used to take forever to finish because of recurrent container failures.
  • In the worst case, termination of the cluster because it was so overloaded that it could not communicate with the EMR service.

We worked through these issues by making several optimisations on the Hadoop/EMR side as well as on the application architecture side. In this blog post, I want to share the lessons and best practices for Hadoop fleet management that we have gathered over the past few years.

Load pattern

For optimal results from your tuning exercise, it is important to first identify the pattern of your job workload on the Hadoop clusters. This then drives the scaling rules and the Hadoop/Hive configurations. Cloverleaf has an hourly burst load pattern. That means new partitions land in the source tables hourly, and 300+ jobs are launched to process them. These jobs generally need to finish within the hour so that the processing for the next hourly partition isn’t delayed.

Service level agreement (SLA)

Cloverleaf jobs run on shared EMR clusters. Data volumes for these jobs can fluctuate depending on the traffic, or when restatements are performed, or when backfills are done. Therefore, it is not possible to promise a fixed processing time for all jobs. That’s why Cloverleaf’s SLA is based on data volume as follows:

[Table: maximum expected processing time (in minutes) for a job by its size (in GB), with the percentage of jobs falling into each category.]

Job distribution

Because of the burst load pattern, small and large jobs all need to run at the same time. When we started, all jobs ran on a single EMR cluster. We observed that, because of the way the default YARN scheduling algorithm works, the large jobs would take up almost all the resources on the cluster while the small jobs waited a long time before they could even start. In addition, certain high-priority large tables need their data processed as soon as possible.

To handle this load so that the large jobs don’t interfere with the small jobs and Cloverleaf meets its SLA, we divided the jobs into four categories.

1. Small jobs — producing less than 100 GB of data per run. More than 90% of jobs fall in this category.

2. Small jobs with high priority.

3. Large jobs — producing more than 100 GB of data per run.

4. Large jobs with high priority. Less than 1% of jobs fall in this category.

We refer to these categories as domains. Each domain is an independent fleet of EMR clusters that can be scaled out or in without impacting the other domains. It also helps us limit the impact when an issue happens in the production environment.
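The four categories above can be sketched as a small routing function. This is only an illustration: the `Job` class, the domain names and the handling of a job at exactly 100 GB are assumptions, not Cloverleaf's actual implementation.

```python
from dataclasses import dataclass

# Threshold from the text: small jobs produce less than 100 GB per run.
LARGE_JOB_THRESHOLD_GB = 100


@dataclass(frozen=True)
class Job:
    table: str                   # hypothetical identifier of the target table
    output_gb: float             # data produced per run, in GB
    high_priority: bool = False


def assign_domain(job: Job) -> str:
    """Return the (hypothetical) name of the domain a job should run in."""
    # Assumption: a job producing exactly 100 GB is treated as large.
    size = "large" if job.output_gb >= LARGE_JOB_THRESHOLD_GB else "small"
    return f"{size}-high-priority" if job.high_priority else size
```

Because each domain maps to its own fleet of clusters, a routing step like this is what keeps a 2 TB backfill from starving a 5 GB hourly job.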

Cluster scaling

Core nodes

In an EMR cluster, you can scale either the task nodes or the core nodes. We use S3 to store the final output, so we only need cluster storage for intermediate data. We observed that relying on core node scale-out for storing intermediate data always ended in job failures for medium and large jobs. Errors such as the following were common:

java.lang.RuntimeException: Error caching map.xml: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/hive/hadoop/f1650a56-a7a7-40dd-86e1-6e8ffcb86212/hive_2020-02-21_08-10-05_830_6655714004322293283-1/-mr-10003/43bc5ddc-08b7-4fce-8656-f5d519fbc65d/map.xml could only be replicated to 0 nodes instead of minReplication (=1). There are 30 datanode(s) running and no node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1735)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:265)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2575)

Jobs produce data faster than new core nodes can be added, even with scale-out triggered at 50% HDFS utilisation. Too much data would also land on a single node, and those nodes would get decommissioned, causing further job delays. Therefore, we have fixed the number of core nodes for each domain. We aim to keep HDFS utilisation under 50%, even at peak load.

Storage for intermediate data

We started with SSD-backed EBS volumes for storage but soon realised that instance storage gave us much better performance, so we switched to it.

HDFS replication factor

All the data in our clusters is intermediate (it is moved to S3 when the job finishes). We experimented with the `dfs.replication` factor but realised that a higher value just made HDFS fill up faster, and we don’t really need a backup of intermediate data. So, we have set `dfs.replication` to 1 to reduce the replication workload on the core nodes.

HDFS cleanup

We have observed that when there are multiple jobs running on the cluster and it is constantly scaling up and down, Hive will leave lots of orphaned temporary data on HDFS. If not cleaned up, this will cause your core nodes to run out of disk space, impacting all jobs running on the cluster. We wrote a little cron job to clean up this data after a day, as we don’t expect any jobs to run for that long.

hdfs dfs -rm -r -skipTrash /tmp/hive/cloverleaf/${file_path}
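As a minimal sketch of how such a cron job might choose what to delete, the function below parses the output of `hdfs dfs -ls` and returns the entries older than one day. The function name and the sample paths are hypothetical, and it assumes the listing timestamps and `now` are in the same time zone.

```python
from datetime import datetime, timedelta


def stale_paths(ls_output: str, now: datetime,
                max_age: timedelta = timedelta(days=1)) -> list[str]:
    """Return paths from `hdfs dfs -ls` output last modified more than max_age ago."""
    stale = []
    for line in ls_output.splitlines():
        # An entry has 8 fields: permissions, replication, owner, group,
        # size, date, time, path. This skips the "Found N items" header.
        fields = line.strip().split(None, 7)
        if len(fields) < 8:
            continue
        modified = datetime.strptime(f"{fields[5]} {fields[6]}", "%Y-%m-%d %H:%M")
        if now - modified > max_age:
            stale.append(fields[7])
    return stale


listing = """Found 2 items
drwx------   - hadoop hadoop          0 2020-02-21 08:10 /tmp/hive/cloverleaf/old-session
drwx------   - hadoop hadoop          0 2020-02-23 07:55 /tmp/hive/cloverleaf/new-session
"""
to_delete = stale_paths(listing, now=datetime(2020, 2, 23, 9, 0))
```

Each returned path would then be passed to the `hdfs dfs -rm -r -skipTrash` command shown above.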

Scaling rules

Initially, Cloverleaf EMR clusters had custom scaling rules based primarily on the container pending ratio. Because of the burst load pattern, we needed the clusters to scale out quickly at the beginning of the hour and scale back in quickly once the load subsided. With custom scaling rules, scaling out to the maximum number of task nodes took at least 30 minutes. This was too slow for us because jobs just sat queued in the resource manager until enough resources became available.

Since October/November 2021, the EMR clusters have been running with EMR-managed scaling. It has improved Cloverleaf job execution times because of faster scale-outs. The downside is increased cost. We have observed that the managed scaling algorithm uses the maximum number of nodes that a cluster can scale out to as a factor in its calculations. This means that if you set the maximum to a higher value to deal with load spikes at certain times of the day or after an outage, the cluster will also scale out more than necessary when handling the usual load. We have been working with the AWS EMR team to understand this behaviour better.

February 2023 update: We observed the above behaviour while we were on EMR version 5.33.1. We migrated to EMR version 5.36.0 at the beginning of 2023 and since then the cluster scaling has improved and is along the expected lines.
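For illustration, a managed scaling policy that keeps the core node count fixed and lets only task nodes scale could be built as follows. This is a sketch based on our reading of the EMR `PutManagedScalingPolicy` API, not our production setup; the helper name and the node counts are hypothetical.

```python
def managed_scaling_policy(core_nodes: int, max_nodes: int) -> dict:
    """Build a ManagedScalingPolicy payload with a fixed number of core nodes."""
    if max_nodes < core_nodes:
        raise ValueError("max_nodes must be at least core_nodes")
    return {
        "ComputeLimits": {
            "UnitType": "Instances",
            # Assumption: setting the minimum equal to the core limit pins the
            # core node count, so only task nodes are added or removed.
            "MinimumCapacityUnits": core_nodes,
            "MaximumCoreCapacityUnits": core_nodes,
            "MaximumCapacityUnits": max_nodes,
        }
    }


# Hypothetical sizing for one domain.
policy = managed_scaling_policy(core_nodes=30, max_nodes=300)
```

The resulting dictionary can be passed as the `ManagedScalingPolicy` argument of boto3's `put_managed_scaling_policy` call on the EMR client. Given the behaviour described above, `max_nodes` is worth choosing conservatively.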

Cluster tuning

To increase throughput on the EMR clusters and improve the stability of the resource manager, we have tuned a number of configurations for YARN, the NameNode and the DataNodes. I will talk about some of them below:

Heap memory

To handle an intensive job load, the heap size for the YARN resource manager, node manager and proxy server should be increased. We have set them as follows:

YARN_RESOURCEMANAGER_HEAPSIZE = 24 GB
YARN_NODEMANAGER_HEAPSIZE = 8 GB
YARN_PROXYSERVER_HEAPSIZE = 8 GB
HADOOP_HEAPSIZE = 6 GB (in hive-env.sh)
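On EMR, environment variables like these are typically supplied through configuration classifications with a nested `export` block. The sketch below generates that JSON for the values above. It assumes the heap size variables are expressed in MB; the helper name is hypothetical.

```python
import json

MB_PER_GB = 1024


def export_block(classification: str, env_vars_gb: dict) -> dict:
    """Wrap environment variables in EMR's nested `export` classification."""
    return {
        "Classification": classification,
        "Properties": {},
        "Configurations": [
            {
                "Classification": "export",
                # Assumption: heap sizes are given in MB, so GB values are converted.
                "Properties": {
                    name: str(gb * MB_PER_GB) for name, gb in env_vars_gb.items()
                },
            }
        ],
    }


heap_configurations = [
    export_block("yarn-env", {
        "YARN_RESOURCEMANAGER_HEAPSIZE": 24,
        "YARN_NODEMANAGER_HEAPSIZE": 8,
        "YARN_PROXYSERVER_HEAPSIZE": 8,
    }),
    export_block("hive-env", {"HADOOP_HEAPSIZE": 6}),
]

print(json.dumps(heap_configurations, indent=2))
```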

yarn.scheduler.capacity.maximum-am-resource-percent

This setting directly impacts the number of concurrent active applications that can run on the cluster. We increased it to 0.4 (the default is 0.1), which means up to 40% of the resources on the cluster can be used to run application masters.
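To get a feel for the effect, here is a rough back-of-the-envelope calculation. It is a simplification that considers memory only and ignores per-queue limits and vcores; the cluster size is hypothetical, and the 2048 MB application master size is the `tez.am.resource.memory.mb` default listed later in this post.

```python
def max_concurrent_application_masters(cluster_memory_mb: int,
                                       am_memory_mb: int,
                                       max_am_resource_percent: float) -> int:
    """Estimate how many application masters fit into the AM memory budget."""
    am_budget_mb = cluster_memory_mb * max_am_resource_percent
    return int(am_budget_mb // am_memory_mb)


CLUSTER_MEMORY_MB = 1_000_000  # hypothetical total YARN memory
AM_MEMORY_MB = 2048            # tez.am.resource.memory.mb

with_default = max_concurrent_application_masters(CLUSTER_MEMORY_MB, AM_MEMORY_MB, 0.1)
with_tuning = max_concurrent_application_masters(CLUSTER_MEMORY_MB, AM_MEMORY_MB, 0.4)
print(with_default, with_tuning)
```

In this example the cluster can host roughly 48 concurrent applications with the default and 195 with the tuned value, at the price of leaving less memory for the task containers themselves.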

yarn.resourcemanager.recovery.enabled

Even with more heap memory and a larger instance type for the master node of the EMR cluster, there will be situations where the resource manager (RM) simply gets overwhelmed and is restarted by the EMR health check. When that happens, all the running jobs lose their state and get stuck. With this configuration set to true, the resource manager periodically saves its state on HDFS and can restore it after a restart. This has substantially improved the resiliency of Cloverleaf jobs and reduced our platform recovery time after RM failures. When RM recovery is enabled, it is highly recommended to set the following properties to improve RM performance:

yarn.resourcemanager.fs.state-store.num-retries=2
yarn.resourcemanager.fail-fast=true

yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.enable-batch

Cloverleaf EMR clusters are long-running. After a while, we saw the RM running into a deadlock and found that a number of threads were waiting to communicate with the timeline server. Enabling this option improves the performance of the RM system metrics publisher by pushing events to the timeline server in batches.

HDFS site configurations

dfs.namenode.handler.count=2048
dfs.namenode.replication.work.multiplier.per.iteration=128
dfs.datanode.balance.bandwidthPerSec=5368709120
dfs.datanode.max.transfer.threads=8192
dfs.datanode.handler.count=2048

Hive query tuning

Just like the cluster, jobs should also be tuned to improve throughput on the clusters. Cloverleaf uses Tez (tez) and MapReduce (mr) as execution engines. Tez is the default; mr is used only as a last resort once all configuration tweaks have been exhausted. Over the years, we have found many Hive settings that improve job performance on shared EMR clusters. I will briefly talk about some of them below.

Compress intermediate data

Compressing intermediate data reduces the data volume on the cluster by around 50% and makes the jobs around 30–40% faster.

hive.exec.compress.intermediate=true

Delay start of reducers

By default, MapReduce starts the reducers once 5% of the mappers have finished. In most large jobs, it takes a while before the rest of the mappers finish, so the reducers that have already started just sit waiting. These containers are better utilised by other, smaller jobs. Therefore, we have increased the threshold to 0.99 and see improved cluster utilisation.

mapreduce.job.reduce.slowstart.completedmaps=0.99
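The effect of this threshold is easy to quantify. The sketch below assumes that MapReduce rounds the required number of completed mappers up; the mapper count is hypothetical.

```python
import math


def maps_before_reducers_start(total_maps: int, slowstart: float) -> int:
    """Number of mappers that must complete before reducers are scheduled."""
    # Assumption: the fractional threshold is rounded up to a whole mapper.
    return math.ceil(slowstart * total_maps)


TOTAL_MAPS = 1010  # hypothetical large job

default_threshold = maps_before_reducers_start(TOTAL_MAPS, 0.05)
tuned_threshold = maps_before_reducers_start(TOTAL_MAPS, 0.99)
print(default_threshold, tuned_threshold)
```

For this job, reducers could start holding containers after only 51 mappers finish with the default, but only after 1,000 of the 1,010 mappers finish with the tuned value.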

Retrying S3 requests

While running thousands of jobs, you might sometimes get throttling errors:

AmazonS3Exception: Slow Down (Service: Amazon S3; Status Code: 503; Error Code: 503 Slow Down;

To handle this, we increased the number of S3 retries to 20 (from the default of 15).

fs.s3.maxRetries=20

Container size

Container size has a direct impact on how many jobs can fit onto an EMR cluster. The smaller the container size, the more jobs can be fitted. Cloverleaf jobs have a default container size of 4 GB, which works for most small- and medium-sized jobs.

For most of the large jobs, this has been overridden to 8 GB or even 12 GB.

Default Hive job settings

The following are the default settings we arrived at after our tuning exercise. Please note that we override these settings for individual datasets, based on their data volumes, when necessary.

fs.s3.maxRetries: '20'
hive.execution.engine: tez
mapreduce.map.java.opts: '-Xmx3687m'
mapreduce.map.memory.mb: '4608'
mapreduce.reduce.java.opts: '-Xmx7374m -XX:NewRatio=4'
mapreduce.reduce.memory.mb: '9216'
tez.am.resource.memory.mb: '2048'
hive.tez.java.opts: '-Xmx3278m -XX:NewRatio=4'
hive.tez.container.size: '4096'
mapreduce.job.reduce.slowstart.completedmaps: '0.99'
hive.optimize.sort.dynamic.partition: false
hive.blobstore.optimizations.enabled: false
hive.map.aggr: false
hive.exec.compress.intermediate: true
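A minimal sketch of how per-dataset overrides might be layered on top of such defaults and rendered as Hive `SET` statements follows. The function name is hypothetical, only a subset of the defaults is shown, and the override values for a large job (including the `-Xmx` figure) are illustrative assumptions.

```python
from typing import Optional

# Subset of the defaults listed above.
DEFAULT_SETTINGS = {
    "hive.execution.engine": "tez",
    "hive.tez.container.size": "4096",
    "hive.tez.java.opts": "-Xmx3278m -XX:NewRatio=4",
    "hive.exec.compress.intermediate": "true",
}


def render_set_statements(defaults: dict, overrides: Optional[dict] = None) -> list[str]:
    """Merge overrides into the defaults and render Hive SET statements."""
    merged = {**defaults, **(overrides or {})}
    return [f"SET {key}={value};" for key, value in sorted(merged.items())]


# Hypothetical override for a large job that needs 8 GB containers.
large_job_overrides = {
    "hive.tez.container.size": "8192",
    "hive.tez.java.opts": "-Xmx6554m -XX:NewRatio=4",  # illustrative value only
}

statements = render_set_statements(DEFAULT_SETTINGS, large_job_overrides)
print("\n".join(statements))
```

Keeping the defaults in one place and expressing each dataset as a small set of differences makes it easier to see which tables deviate from the tuned baseline and why.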

Conclusion

Tuning a data processing platform is tricky, especially when it also involves a Hadoop cluster. As Cloverleaf adoption increased and our scale of operation increased, the issues that we faced grew in complexity. Optimizing the cluster performance and jobs should be treated as an ongoing process rather than a one-time effort. That’s why some of these configurations are quite fluid and have changed many times in nearly four years as our understanding of Hadoop / Hive got better.

With these optimisations over the years, the platform currently supports:

  1. 10,000+ jobs per day.
  2. 3.5 petabytes of data processed per month.
  3. 1,300+ onboarded tables.
  4. A fleet of 8 EMR clusters with a combined maximum capacity of 2,000+ nodes.

In addition, the recent upgrade to EMR version 5.36.0 has given us much better scaling performance and, consequently, ~30% cost savings.

I want to especially thank my teammate, Patrick Duin, who has been my partner on this voyage. I hope this post will provide some useful tips on how to scale a data processing platform. Please let us know in the comments if you tried some new optimisation ideas. We would love to hear from you!
