Building cheaper and more performant EMR pipelines
--
A story about 60% EMR saving costs with a 500% processing time improvement
Thank you to Robin Mizreh, who also made this article possible.
The goal of this article is not to explain how EMR (Elastic Map Reduce) works nor to say what approaches or frameworks are better to take now and then. We wanted to share the knowledge we acquired by using EMR and Spark, including cluster sizing, spark configurations, and other tips and tuning that were useful to us at some point. Spoiler alert: we have applied these ideas in many of our data pipelines, as a result, we have reduced our EMR costs by 60%, improved processing time up to 500%, and achieved a more consistent datalake.
Most of data architectures nowadays have a batch component in the data process. Business Insights (BI) pipelines sometimes need to reprocess a good old batch of data to update some tables. Machine Learning (ML) pipelines may need a backfill of historical data to setup a new feature accordingly. Even more general use-case data pipelines that move data from a database A to a database B are often in need of a reliable batch data process. Because streaming is nice and good, but streaming alone very rarely cover a full use-case.
At Voodoo we do have these batch-like cases very often. We transform JSON data into Parquet data. We clean and prepare datasets in order to build ML features for games and users that other processes can exploit. Or, we simply daily copy and prepare data from a table in an applicative database to send it to our data-warehouse. All these use-cases are mainly set with batch processes based on EMR, which is AWS fully manage service with Apache Spark (but not only). Also, for those who wonder, our batch processes are scheduled using Apache Airflow, with the good and the bad of this choice, but this is another different topic.
EMR
We skip the basic “setup EMR in 2 clicks” to focus more on “how to set up EMR” in a more open configuration.
Our batch processes are scheduled, yes. But further than that, the creation of our EMR clusters are part of this scheduled process. We do not have a static EMR long-living running 24h/7. Instead, we create ephemeral clusters whenever we need and we destroy them whenever we are done. Why? This allows us to fine-tune the cluster settings and sizes to the needs of the application and the data ingested. Also, it is much cheaper, you only pay for what you need, exactly for the time you need. This is especially interesting in EMR since the service is billed per-second basis ;)
During this scheduled process we set up the configuration of EMR clusters before they run. The main blocks to check in this configuration are:
- ReleaseLabel
- Instances
- Applications
- Configurations
- Tags
The AWS documentation for EMR explains other settings and tuning that can be interesting as well
Release Label
This parameter marks the version of the EMR to deploy. Newer versions introduce:
- EMR managed service improvements. Starting with label
emr-5.24.0
major improvements concerning Apache Spark runtime have been added. Spark EMR is around 32 times faster than previous more vanilla releases, and all these improvements come with no extra cost. These changes are enabled by default by simply using the correct label, there is no other configuration or manipulation to perform. - Frameworks updates. Apache Hadoop 2.8.5 is available since
emr-5.19.0
, whereas Apache Spark 2.4.4 is available sinceemr-5.27.0
. This is also important to know to match versions with your application's dependencies. - New frameworks. For instance, from version
emr-5.28.0
Apache Hudi is available.
Pick the EMR version you need according to the frameworks you will use in your application. Preferably, use versions of emr-5.29.0
and above because they include Spark runtime optimisations and the bootstrap time of these EMR versions have also been reduced. This means, better performance and less runtime, what comes in handy because for billing.
Remember to assure the compatibility of your frameworks as well. It is always worth to take a look at this nice release version documentation
Instances
You really want to take care of this block to control the cost of your EMR.
Behind the scenes, this service is simply creating a well managed Hadoop ecosystem, hence a Hadoop architecture. These aspects are interesting to be analysed in a master-worker architecture. In here we can choose the type of hardware we want for our cluster:
- Instance type
- Instance count
- Market
- EBS storage
Master
The master will be in charge of distributing the tasks and having the big picture of your application runtime. It is not meant to do computing, but sometimes it happens to do so. However, if the master is down or has failed, the application is not working anymore. Because of that, we want our master to use general-purpose machines (for instance, m4.large
or m4.xlarge
). Below there is an example of master configuration:
{
'InstanceRole': 'MASTER',
'InstanceType': 'm4.xlarge',
'InstanceCount': 1,
'Market': 'ON_DEMAND',
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'SizeInGB': 400,
'VolumeType': 'gp2'
},
'VolumesPerInstance': 1
}
],
'EbsOptimized': True
}
}
Pay attention to the ON_DEMAND
market. On-demand instances are more expensive, but you probably do not want AWS to get this instance back while running your application, otherwise, your whole application will disappear with it. AWS is less and less often taking instances back (we will show this in a bit), hence choosing SPOT
market instances may not be an issue if you are willing to take the risk, especially in jobs with short runtimes. Also, there is no special need to have more than one replication of this instance because clusters are easy to reproduce in case of global failure.
Workers
The core instances in EMR are the workers. They will be in charge of actually executing operations over data, and hence memory and IO are interesting hardware specifications to care about. In fact, these workers are part of Hadoop Datanode components to host HDFS (Hadoop Distributed File System) which is the data storage layer. Below there is an example of a core (worker) configuration:
{
'InstanceRole': 'CORE',
'InstanceType': 'i3.xlarge',
'InstanceCount': '3',
'Market': 'SPOT',
'BidPrice': '2.00'
}
Notice there is no EBS in this configuration. At Voodoo, we use S3 object storage as our main data storage and HDFS as an intermediate temporary “fast” memory (very much like a massive data cache). This means that our Spark processes write results into HDFS, and we then once the final data is ready we dump it as a whole by using s3-dist-cp
(this improves a lot the process but do not worry, we will explain it later). Therefore, we use the i3
instance family because they have better IO specifications for these use cases. This HDFS memory-intensive usage and copy from HDFS to S3 matches with the fact that all our EMR are ephemeral (data in HDFS will disappear with HDFS after termination). A little warning on the family choice and the storage: if other families are used and have EBS bound, or there is a big need in having EBS bound with the i3
, take into account that the performance in IO towards EBS will be directly bound by the EBS size.
The choice of the size of this instance and the number of instances to create is made looking back at the data. The main question we ask at Voodoo is: What is the size of data I am going to deal with? Hence if our batch process is light, for instance dealing with 30GB or 40GB, having 3 i3.xlarge
instances is more than enough. This is the strict minimum number of instances to assure replication and proper parallelisation. Under even lighter batches, the number of executors becomes smaller and processes are poorly parallelised. Also, are you sure to want to process small data batches using big data tools? Other frameworks or paradigms may be more adapted for those use-cases, for example, Python Pandas, Python Pandas Parallel, or Python Dask make it easier to use multiple cores.
To be more accurate, at Voodoo we have actually included a “calculator” in our scheduling process to get to know the kind of EMR cluster we need depending on the size of the data we will deal with. We compare this data according to a set of thresholds and simple rules. For instance, we do want all data to fit in RAM. And finally, this calculator gives us the more-less optimal instance type and the instance count. It might not be perfect, but it is good in most cases. Also, this calculator gives us more detailed information about how to setup up our spark configuration, but we will see that later. We got inspired by this famous spreadsheet
Finally and maybe the most important for the core configuration is the SPOT
market. Spot instances allow us to save up to 90% of EC2 infrastructure costs. Also, core instances are replicated, hence we can afford to lose one instance and wait for a new one to appear. Spark will automatically rebalance the workload. Notice as well, as mentioned before, that AWS instance re-claim is less and less frequent. This chart shows the evolution of spot instances reclaim in the past years.
In short, using spot instances and a proper instance type for your cluster can reduce very much the cost of your cluster runtime.
Applications
We focus on Spark applications, so just make sure to have Hadoop and Spark enabled in this case. But there are other applications which can be very interesting either for runtime or debugging.
Ganglia
Ganglia is particularly interesting to look at. My mother used to say “do not judge people by appearances”. This is also true when taking a look at the interface of Ganglia, especially if you are running your application for the first time and you need to debug. Ganglia might not be very beautiful or fancy, but it does one thing and it does it well: monitoring of scalable distributed systems.
Enable this application before running your EMR cluster and you will have access to Ganglia
This is what it looks like.
The square on the left is already very informative. In one glance we can see the average load distribution among all instances. Rule of thumb: you want reddish colors. Overloading a distributing system a tiny little bit is usually not a problem and you can avoid an extra instance.
Ganglia gives you the information of all the cluster, or of one single instance. It shows CPU, Memory, Network, and globally all metrics that you could possibly want to investigate related to performance and issues within your cluster.
Clearly useful to understand how is your cluster reacting to your application and data processing. Ganglia allows you to see very clearly and quickly if your application is well parallelised or not and makes it simple to identify data skew and balance between executors. Also, memory spikes and network spikes issued from reads or writes are very easy to spot. Shuffle operations will also create spikes in these beautiful charts. And you can always check if your application is more CPU demanding or RAM demanding in order to change the EMR Core configuration or the size of the cluster.
Spark
Application (EMR Step) configuration
We have talked about the “calculator” which helped us to size our EMR cluster. Another nice feature of this calculator is to help us to tune-up the spark configuration before-hand. The following configuration contains 2 steps: an application job step called my_job
and a data movement step called move_data_hdfs_to_s3
. The former is the application itself and writes data results into HDFS. The later moves this data from HDFS to S3.
[
{
"Name": "my_job",
"Type": "CUSTOM_JAR",
"ActionOnFailure": "CANCEL_AND_WAIT",
"Jar": "command-runner.jar",
"Properties": "",
"Args": [
"spark-submit",
"--class",
"io.voodoo.Application",
"--master",
"yarn",
"--deploy-mode",
"cluster",
"--conf",
"spark.driver.memoryOverhead=4g",
"--conf",
"spark.executor.memoryOverhead=4g",
"--conf",
"spark.driver.maxResultSize=4g",
"--conf",
"spark.default.parallelism=72",
"--conf",
"spark.executor.memory=27g",
"--conf",
"spark.executor.cores=5",
"--conf",
"spark.executor.instances=18",
"--conf",
"spark.driver.cores=2",
"--conf",
"spark.driver.memory=27g",
"s3://my_application.jar",
"--exec-date=2020-05-19"
]
},
{
"Name": "move_data_hdfs_to_s3",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"s3-dist-cp",
"--srcPattern=.*parquet",
"--src=hdfs:///my_app/my_folder",
"--dest=s3://my_destination_bucket"
]
}
}
]
The calculator takes into account these tips:
- Keep always at least 1 core for the system
- Optimise the core to use: between 3 and 5 per executor
- Optimise the ram to use
- Optimise the parallelism:
parallelism ~= num_executor_instances * (num_executor_cores - 1)
And, you may wonder: what is this s3-dist-cp
in the data movement step shown at the end of your application? Let's call it the cherry on top of the cake. We explain that in our next and last tip for EMR
S3-Dist-Cp (EMR Step) configuration
HDFS is a block storage system, however, S3 is object storage. There are many temporary operations that must occur when your Spark application tries to write directly to S3. In fact, if you do it, you will notice that there are temporary keys in your destination s3 bucket. This is not alarming, and somehow it is quite useful when you want to replay the very same job with the very same input and having the same destination (idempotency). But of course, it has its drawbacks: it is painfully slow.
Instead of making the application write to S3, we write to HDFS first. This is what we called, using HDFS as a “fast” memory cache. Now, how can I pass my data from HDFS to S3? Here is where the s3-dist-cp
comes to play.
The s3-dist-cp
is only another application running within your cluster, also called "step" in EMR. It allows us to perform data movement operations within S3, or from/to S3-HDFS. And it makes it really fast.
{
"Name": "move_data_hdfs_to_s3",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"s3-dist-cp",
"--srcPattern=.*parquet",
"--src=hdfs:///my_app/my_folder",
"--dest=s3://my_destination_bucket"
]
}
}
For instance, we have had jobs that were directly writing to S3 and having a total runtime (read, computing, and S3 writing) of around 20 minutes. This very same job with the same input and output had decreased the total runtime to 8 minutes (6 minutes reading and computing, and 2 minutes writing). And the only thing we had to do was to make Spark write to HDFS and to add a second step for data movement.
Indeed, the more data you need to handle the more suited this step is for you. It is difficult to estimate the order of magnitude in the gain of using this technique. We have estimated that it reduces the writing time of our jobs of around 80%. For instance, what would take 10–12 minutes making Spark to write into S3, it takes less than 2 minutes using this technique. And again this is very important because EMR is billed per second.
Conclusions
With this article, we wanted to share our experiences with the EMR service and the little ideas we had to optimise its usage not only for performance but also to control the bill at the end of the month.
- Scheduling + Ephemeral EMR: Adapting your cluster to your application is key to performance and cost. In this article, we have taken into account data volume ingested and data processing to setup the EMR cluster configurations in every job run.
- Simple and general tips: The hardware choices and EMR configurations may not apply to all applications and needs, but it can work with a vast majority of use-cases. This may have a great impact on most of the jobs you are running on a daily basis and help to improve the performance and cost of these jobs. Also, these tips can be used as a base to create other rules more suited to other use-cases.
- Play your self: you never know until you really try =]
More to come.
Links
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/images/emr-releases-5x.png
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html
https://aws.amazon.com/blogs/aws/now-available-i3-instances-for-demanding-io-intensive-applications/
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html