Large scale Batch Processing using AWS EMR on EC2

Shunmuga Sundar
Epsilon Engineering Blog
8 min read · Sep 26, 2023


Introduction

In today’s data-driven world, the ability to process vast amounts of information efficiently and effectively is paramount. Across industries, large scale systems are often used to process Big Data where the data contains greater variety, arriving in increasing volumes and with higher velocity. There are several challenges associated with big data and large-scale systems, including data storage, processing and security. Data processing plays a crucial role in extracting valuable insights, making informed decisions, and driving innovation.

Batch processing is one of the most used methods in large scale Big Data platforms. It is an incredibly cost-effective way to process huge amounts of data in a short period. It is preferred over real-time processing when accuracy is more important than speed. Batch processing jobs are commonly written using Apache Spark and run using centralized Amazon EMR clusters. Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Spark and Apache Hadoop, on AWS to process and analyze vast amounts of data. It lets you transform and move large amounts of data into and out of other AWS data stores and databases such as S3, DynamoDB, Redshift, etc.

The emergence of geographically distributed teams and an emphasis on modularity have led us to build batch processing jobs that are smaller, modular and serve specific purposes. At the same time, using centralized EMR clusters is a cost-effective and easily maintainable way of running the batch jobs. Like most great things, batch processing has its own Achilles’ heel too. In this article, we will discuss the challenges and the approaches taken to overcome them.

System overview

Let me showcase a large-scale batch processing system that comprises multiple modular batch jobs written in Apache Spark and run on centralized EMR clusters (EMR on EC2 configuration). The file ingestion workflow (Figure 1: File Ingestion Workflow) takes a data file as input, then transforms and processes it and loads the result into a database or data warehouse. The good old workflow orchestration controls which of these jobs run sequentially and which run in parallel inside the file ingestion workflow.

Each batch job in the file ingestion workflow is developed separately as a modular stage that integrates with the common EMR management layer (Figure 2: EMR Management Layer and Queuing). The EMR management layer schedules execution on AWS EMR clusters based on the input file size and manages the EMR queuing. It also decides the cluster type and configuration from the input file size, as listed in the table (Figure 2: EMR Management Layer and Queuing), so that the chosen cluster type is the right one in terms of processing time and cost effectiveness.
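The size-based selection described above can be sketched as a simple lookup. This is only an illustration: the size bands, instance counts and step limits below are hypothetical placeholders, not the values from the table in Figure 2, and `ClusterConfig` and `select_cluster_config` are names made up for this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusterConfig:
    instance_type: str
    core_instance_count: int
    step_limit: int


KB, MB, GB = 1024, 1024 ** 2, 1024 ** 3

# Hypothetical size bands: (inclusive upper bound in bytes, configuration).
# The real ranges and cluster types live in the table shown in Figure 2.
SIZE_BANDS = [
    (100 * MB, ClusterConfig("r5.xlarge", 2, 20)),
    (10 * GB, ClusterConfig("r5.4xlarge", 4, 20)),
    (float("inf"), ClusterConfig("r5.12xlarge", 8, 10)),
]


def select_cluster_config(file_size_bytes: int) -> ClusterConfig:
    """Return the configuration of the first band that fits the file size."""
    if file_size_bytes < 0:
        raise ValueError("file size must be non-negative")
    for upper_bound, config in SIZE_BANDS:
        if file_size_bytes <= upper_bound:
            return config
    raise AssertionError("unreachable: the last band is unbounded")
```

Under these assumed bands, a 500KB file maps to the smallest cluster type and a 50 GB file to the largest, which is the intent of the table: small files should not occupy large clusters.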

Figure 1: File Ingestion Workflow

When a job is submitted, the EMR management layer fetches the required cluster configuration and checks criteria such as the current step count, IP availability and instance availability. Based on those checks, it either creates a new EMR cluster or adds the job as a step to an existing cluster.
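A minimal sketch of that placement decision follows. It is not the actual management layer: the `Cluster` class and `place_job` function are hypothetical, and IP and instance availability are reduced to a single assumed `max_clusters` cap.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Cluster:
    cluster_id: str
    instance_type: str
    step_limit: int
    queued_steps: int = 0

    def has_capacity(self) -> bool:
        return self.queued_steps < self.step_limit


def place_job(
    clusters: List[Cluster],
    instance_type: str,
    step_limit: int,
    max_clusters: int,
) -> Optional[Cluster]:
    """Add the job as a step to a matching cluster, or create a new cluster.

    Returns None when no cluster has a free step slot and the (assumed)
    cluster cap is reached, so the caller keeps the job queued.
    """
    # 1. Prefer an existing cluster of the right type with a free step slot.
    for cluster in clusters:
        if cluster.instance_type == instance_type and cluster.has_capacity():
            cluster.queued_steps += 1
            return cluster
    # 2. Otherwise create a new cluster if the cap allows it.
    if len(clusters) < max_clusters:
        new_cluster = Cluster(
            cluster_id=f"cluster-{len(clusters) + 1}",
            instance_type=instance_type,
            step_limit=step_limit,
            queued_steps=1,
        )
        clusters.append(new_cluster)
        return new_cluster
    # 3. Nothing available right now.
    return None
```

With a step limit of 2 and a cap of 2 clusters, the first two jobs share one cluster, the third triggers a second cluster, and a fifth job has to wait.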

Figure 2: EMR Management Layer and Queuing

In our file ingestion workflow, ingesting a single file requires 22 batch jobs and therefore 22 EMR invocations in total. The EMR management layer decides whether these jobs run on the same cluster or on different clusters based on parameters such as arrival time, cluster limit and resource availability.

Problem scenario and challenges

Let’s talk about the Achilles’ heel: a scenario in which many files arrive at the same time and are expected to be processed with high concurrency. We will take the example of 6 files arriving at the same time, each with a different file size (Figure 3: Problem Scenario). The EMR management layer initially creates two clusters based on the file size-based configuration (Figure 2: EMR Management Layer and Queuing). As the number of requests grows and step limits are reached, additional clusters are created automatically.

Let’s talk about the three important parameters that determine the overall execution time.

1. Queue Waiting Time:

In the first cluster shown in the diagram, the 20th job in the queue must wait for all 19 jobs ahead of it to finish, even though the actual execution time for one job is just 2 minutes. That is how the waiting time in the queue becomes an important parameter in deciding the end-to-end performance.

Time until the job at the 20th slot completes = 19 * 2 (waiting) + 2 (execution) = 40 minutes
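The arithmetic above generalizes to any queue position. The sketch below assumes, as the example does, that steps on a cluster run one at a time and that every step takes the same time; the function names are illustrative.

```python
def waiting_minutes(position: int, step_minutes: float) -> float:
    """Minutes a job at a 1-based queue position waits before it starts."""
    if position < 1:
        raise ValueError("position is 1-based")
    return (position - 1) * step_minutes


def completion_minutes(position: int, step_minutes: float) -> float:
    """Minutes until the job at that position has finished running."""
    return waiting_minutes(position, step_minutes) + step_minutes


# The example from the text: 20th slot, 2 minutes per job.
print(waiting_minutes(20, 2))     # 38
print(completion_minutes(20, 2))  # 40
```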

Figure 3: Problem Scenario

We have already seen that each file ingestion requires 22 EMR invocations. With 6 files, this gets more challenging: there are around 130 EMR requests (6 * 22 = 132) in a common timeframe, for just 6 files. Too many EMR requests slow down processing in the EMR queue and increase the overall execution time for a file to be ingested. Overall execution time worsens as more concurrent files arrive, and the file ingestion SLA is heavily affected. The challenge is how to minimize the waiting time and achieve the best overall execution time.

2. Cluster Start Time:

Since we are using EMR on EC2, there is a cold start period of 5 to 8 minutes for every cluster creation, including adding the step to it. While jobs are in the queue, the cluster is kept warm. But keeping the cluster warm when no jobs are running is not recommended because of the cost implications. The challenge is how to minimize the number of cluster starts and achieve the best overall execution time.

3. Infrastructure Cost:

Minimizing or controlling the infrastructure cost is a very important parameter for any system. There are two sub-parts to it.

  • A higher number of clusters running in parallel means higher cost, so limiting the maximum number of clusters running in parallel is very important. Since EMR is configured on EC2, we can set the EC2 dedicated hosts limit (R5 instance quota limit is set as 2 in our example) and control the cluster limit (Reference: Configuration (amazonaws.us)). The challenge here is how to optimize the overall execution time within the maximum cluster limit.
  • For a file size of 500KB, if an r5.xlarge cluster type is good enough, running it on r5.12xlarge would add unnecessary cost, like using a sledgehammer to crack a nut. The cluster type should be matched to the workload to control the cost.

Figure 4: Impacting Parameters

The challenge is to find the sweet spot, the right balance between infrastructure cost, cluster start time and queue waiting time, that gives the best overall execution time.

We can think of different methodologies to address this.

  • Method 1: Infrastructure fine tuning
  • Method 2: Optimizing EMR queuing algorithm
  • Method 3: Design refactoring

Method 1: Infrastructure fine tuning

By tuning various EMR parameters, including step limits, file size ranges, cluster type and instance counts, we were able to achieve a reasonable overall execution time for the scenario explained above. However, as we delved deeper, we found that this approach would always hit a wall, constrained by the number of EMR requests and the max quota limit, if the number of files and volumes increased. With a growing number of smaller, more concurrent files arriving in the system, it is challenging to keep up the end-to-end processing time.

Method 2: Optimizing EMR queuing algorithm

Here we can think of multiple options depending on the business needs.

  • File size-based queuing (described above), which ensures the best cluster type utilization but has a higher queue waiting time and a moderate number of cluster starts.
  • A round robin algorithm that distributes all the jobs equally across all available clusters. This might have a lower queue waiting time and a moderate to low number of cluster starts, but it would do poorly on cluster type utilization and cost.
  • A dynamic load balancing algorithm that identifies the best configuration, such as step limit, cluster type and file size range, based on the queue size and workload. This should have a moderate queue waiting time and a moderate to low number of cluster starts. It should also do reasonably well on cluster type utilization and cost.
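The trade-off between the first two options can be illustrated with a toy model. Everything here is an assumption made for the illustration: two clusters (one sized for small files, one for large), 5 small files and 1 large file, no step limits, and the longest queue used as a rough proxy for waiting time. Only the 22 jobs per file comes from the workflow described earlier.

```python
from collections import Counter
from itertools import cycle

JOBS_PER_FILE = 22  # from the file ingestion workflow


def build_jobs(file_size_classes):
    """One (file_index, size_class) entry per batch job."""
    return [
        (index, size_class)
        for index, size_class in enumerate(file_size_classes)
        for _ in range(JOBS_PER_FILE)
    ]


def size_based(jobs, clusters):
    """Send each job to the cluster configured for its size class."""
    cluster_for = {size_class: name for name, size_class in clusters.items()}
    return [cluster_for[size_class] for _, size_class in jobs]


def round_robin(jobs, clusters):
    """Spread jobs evenly over all clusters, ignoring size class."""
    names = cycle(clusters)
    return [next(names) for _ in jobs]


def summarize(jobs, assignment, clusters):
    """Return (longest queue, jobs running on a mismatched cluster type)."""
    queue_lengths = Counter(assignment)
    mismatched = sum(
        1
        for (_, size_class), name in zip(jobs, assignment)
        if clusters[name] != size_class
    )
    return max(queue_lengths.values()), mismatched


# Hypothetical setup: cluster name -> size class it is configured for.
clusters = {"cluster-small": "small", "cluster-large": "large"}
jobs = build_jobs(["small"] * 5 + ["large"])

print(summarize(jobs, size_based(jobs, clusters), clusters))   # (110, 0)
print(summarize(jobs, round_robin(jobs, clusters), clusters))  # (66, 66)
```

In this toy setup, size-based queuing never runs a job on the wrong cluster type but builds a queue of 110 jobs on one cluster, while round robin caps the longest queue at 66 at the price of 66 jobs running on a mismatched cluster type. A dynamic load balancing algorithm would aim somewhere between the two.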

Method 3: Design refactoring

The other option is to structurally refactor the design of the data flows so that it scales. The main change is to combine EMR related functionality (using Spark UDFs, i.e. User Defined Functions) so that each file needs the minimum number of EMR invocations. The key point is that although we combine the functionality into a single EMR job, we keep the flows modular and handle the orchestration internally.

Figure 5: Optimized File Ingestion workflow

With the design refactoring approach, we observed around 60% to 80% performance improvement in various scenarios by reducing EMR invocation requests from 22 to 4 per input file for the file ingestion workflow. This approach also needs fewer File I/O operations, as we do not have to write and read at every processing/transformation step. Having fewer EMR requests also decreases queue waiting time and the number of cluster starts, hence achieving a better overall execution time without adding any extra infrastructure cost.
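The idea of keeping stages modular while running them inside a single job can be sketched in plain Python. This is a stand-in for illustration only, not the Spark UDF implementation used in the workflow; the stage names and the record layout are hypothetical.

```python
from functools import reduce
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]
Stage = Callable[[List[Record]], List[Record]]


def drop_incomplete(records: List[Record]) -> List[Record]:
    """Hypothetical stage: discard records without an id."""
    return [r for r in records if r.get("id") is not None]


def normalize_email(records: List[Record]) -> List[Record]:
    """Hypothetical stage: trim and lowercase the email field."""
    return [
        {**r, "email": str(r.get("email", "")).strip().lower()}
        for r in records
    ]


def tag_source(records: List[Record]) -> List[Record]:
    """Hypothetical stage: mark where the record came from."""
    return [{**r, "source": "file_ingestion"} for r in records]


def combine(stages: Iterable[Stage]) -> Stage:
    """Chain modular stages into one job that passes data in memory.

    Each stage stays a separate, individually testable function, but the
    chain is submitted once instead of once per stage, and no intermediate
    result is written out and read back between stages.
    """
    ordered = tuple(stages)

    def run(records: List[Record]) -> List[Record]:
        return reduce(lambda data, stage: stage(data), ordered, records)

    return run


ingest = combine([drop_incomplete, normalize_email, tag_source])
```

Calling `ingest(records)` runs all three stages in one invocation; in the refactored workflow the same principle brings the 22 separate invocations per file down to 4.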

Conclusion

After weighing all options, we can conclude that the design refactoring approach provided the greatest performance improvement (60% to 80%) with the existing EMR infrastructure when a high number of small (~500KB) to moderate-size (~10 GB) files are dropped concurrently. For example, the overall execution time to ingest a file with 10 million records was reduced to just 45 minutes compared to 3 hours 36 minutes with the non-optimized flow.
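As a quick check of these figures, the reduction can be worked out directly. The sketch uses only numbers quoted in this article (45 minutes versus 3 hours 36 minutes, and 22 versus 4 EMR invocations per file for 6 files); the helper name is illustrative.

```python
def percent_reduction(before: float, after: float) -> float:
    """Relative reduction from `before` to `after`, in percent."""
    return (before - after) / before * 100


before_minutes = 3 * 60 + 36  # 3 hours 36 minutes = 216 minutes
after_minutes = 45

# Execution time for the 10 million record file.
print(round(percent_reduction(before_minutes, after_minutes), 1))  # 79.2

# EMR requests for 6 concurrent files: 22 vs 4 invocations per file.
print(6 * 22, 6 * 4)                                # 132 24
print(round(percent_reduction(6 * 22, 6 * 4), 1))   # 81.8
```

The roughly 79% reduction in execution time for this file sits at the upper end of the 60% to 80% range reported above.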

Key Takeaways

  • The results of the design refactoring approach show that grouping logically related services into a larger building block provides better infrastructure utilization, cost, and overall execution time in the above scenario.
  • File I/O is one of the major culprits in terms of execution time with the Spark and EMR combination. Keeping File I/O operations to a minimum gives a large performance gain.
  • Depending on business needs, methods like optimizing the EMR queuing algorithm and EMR fine tuning can be used in addition to design refactoring.
  • Due to cost implications, infrastructure boosting should be a last resort in the optimization journey.
  • Though this blog addresses the challenges of using EMR on EC2, the majority of the concepts, such as optimized queuing and cluster utilization challenges, apply to other distributed processing configuration types as well.
