Learnings about AWS Elastic Map Reduce and Spark
tl;dr
- leverage S3 with EMRFS,
- save yourself money: hunt on the spot market,
- optimal file size: 128MB is the sweet spot,
- bzip2 and lzo for smart compression; gzip if you don't care,
- for the optimal cluster size, trial and error is the way to go,
- monitoring, or the art of making sure you get what you paid for.
Over the past year (2017), I worked extensively with the cluster computing framework Spark on AWS EMR clusters. For me it was the discovery of a new way of doing data engineering: one that relies on transient workloads, immutable data storage, finer control of cost and, maybe more importantly, Separation of Compute and Storage. But I am getting ahead of myself already, let's dive in!
EMRFS, aka the EMR File System
If you've already used Spark, you know that it is built on top of Hadoop technology; you might also have heard about HDFS, the go-to file storage for Hadoop. HDFS provides the best performance for Hadoop and Spark; however, if you use AWS EMR you should definitely consider the Elastic Map Reduce File System, i.e. EMRFS.
EMRFS is an implementation of HDFS that lets you store and retrieve data on S3, the object-based storage from AWS. In terms of performance it's hard to beat HDFS: using EMRFS comes with a slight penalty, especially for the initial data retrieval and the final storage; there is no overhead for the intermediate map/reduce steps though.
One might then ask: what are the benefits of EMRFS? The most important one is the ability to permanently save the output of a job to S3. That means you can run a cluster for a specific task, unload its output to S3 and finally shut the cluster down. You are then using transient EMR clusters; by the way, since October 2017 EC2 instances are billed by the second, which makes transient clusters even more relevant.
In the following sections I might use S3 and EMRFS interchangeably.
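The transient-cluster pattern described above can be sketched as the request one would hand to boto3's EMR `run_job_flow` call. This is only a minimal sketch under assumptions: the cluster name, bucket, script path, release label, instance type and default EMR role names are placeholders, not values from this post.

```python
def transient_cluster_request(name, script_uri, log_uri, node_count=3):
    """Build keyword arguments for boto3's emr.run_job_flow (sketch only)."""
    return {
        "Name": name,
        "ReleaseLabel": "emr-5.11.0",  # assumption: use the release you actually run
        "LogUri": log_uri,
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "MasterInstanceType": "m4.xlarge",  # placeholder instance type
            "SlaveInstanceType": "m4.xlarge",
            "InstanceCount": node_count,
            # Transient cluster: terminate as soon as the last step is done.
            "KeepJobFlowAliveWhenNoSteps": False,
        },
        "Steps": [{
            "Name": "spark-job",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", "--deploy-mode", "cluster", script_uri],
            },
        }],
        # Assumption: the default EMR roles exist in the account.
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }


# Hypothetical names, for illustration only.
request = transient_cluster_request(
    "nightly-report",
    "s3://my-bucket/jobs/report.py",
    "s3://my-bucket/emr-logs/",
)
```

With AWS credentials configured, `boto3.client("emr").run_job_flow(**request)` would launch the cluster; the job is expected to write its output to S3 before the cluster goes away.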
Going serverless
Yeah, I am totally comfortable with the buzzword :-) Well ok, what I really mean is a data paradigm called Separation of Compute and Storage. The concept is simple, yet its full coverage deserves several posts. In short, consider the situation where compute and storage are bound together; it is a very common one, for example a Redshift cluster, or bare-metal Hadoop on top of HDFS. In that situation you always pay for storage and for computing power, whether you are running computations or not.
The idea behind the Separation of Compute and Storage approach is that data sits immutable while compute resources are spawned on demand and shut down once the task is complete. Which brings us to the major benefit of EMRFS: the ability to run analytic workloads on demand on data stored in S3.
S3 plays really well with EMR, but not only with EMR: it works hand in hand with other AWS products such as Athena and Redshift Spectrum. The title of this section is actually very relevant, as S3, in combination with columnar file formats (Parquet, ORC, …), is bringing the serverless paradigm to data engineering, classic data warehousing, business intelligence and analytics in general.
To close this section on EMRFS, let me share some code showing how to interact with EMRFS using PySpark:
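A minimal sketch of such an interaction, assuming a hypothetical bucket `my-bucket` holding JSON input: on an EMR cluster, paths with the `s3://` scheme are handled by EMRFS, so S3 is read and written like any other Hadoop path. The function and path names below are illustrative.

```python
def s3_path(bucket, prefix):
    """Build an s3:// URI; on EMR this scheme is served by EMRFS."""
    return "s3://{}/{}".format(bucket, prefix.strip("/"))


def copy_as_parquet(spark, source, destination):
    """Read JSON from S3 and write it back to S3 as Parquet."""
    # Initial retrieval: data is pulled from S3 through EMRFS.
    events = spark.read.json(source)
    # Final storage: the output lands on S3 and survives cluster termination.
    events.write.mode("overwrite").parquet(destination)


def main():
    # Imported here because pyspark is only expected on the cluster nodes.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("emrfs-example").getOrCreate()
    copy_as_parquet(
        spark,
        s3_path("my-bucket", "raw/events/"),    # hypothetical input location
        s3_path("my-bucket", "clean/events/"),  # hypothetical output location
    )
    spark.stop()
```

Calling `main()` at the end of a script submitted with `spark-submit` would run the copy on the cluster.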
Spot instances, EMR node types and roles
Usually, analytics workloads are not business critical; they can be designed so that it is enough to restart them after a failure, a property called idempotence. Being able to survive occasional failures opens up opportunities for cost savings, because you can then leverage the spot market and save up to 80% on cost. On the other hand, if your workloads are critical, you can still partially rely on the spot market; you just need to be clear about the different EMR node types and the risk associated with losing any of them.
An EMR cluster is a collection of Elastic Compute Cloud (EC2) instances. Each instance in the cluster is called a node and can have 1 of 3 roles:
- Master node (exactly 1): the master node manages the cluster and coordinates the distribution of the map/reduce executable and subsets of data to the core and task instances. If your workload is critical, spot instances are a no-go here.
- Core node (0 or many): core nodes run tasks and, if needed, store data locally using HDFS; keep that point in mind when you size your cluster. Choosing spot instances here comes with the risk of data loss, though in practice the job is terminated.
- Task node (0 or many): the perfect spot instance candidate. Task nodes only run tasks and are used to speed up workloads. When a task node dies there is no data loss; the master node will simply notice and re-assign the job to a different node (task or core).
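The three roles above map onto the `InstanceGroups` structure accepted by boto3's EMR `run_job_flow` call. As a rough sketch (the helper name, group names and instance counts are made up for illustration), a critical workload could keep master and core nodes on demand and send only task nodes to the spot market:

```python
def instance_groups(instance_type, core_count, task_count, critical=True):
    """Build an InstanceGroups list: spot for task nodes, and for
    master/core nodes only when the workload is not critical."""
    safe_market = "ON_DEMAND" if critical else "SPOT"
    groups = [{
        "Name": "master",
        "InstanceRole": "MASTER",
        "Market": safe_market,
        "InstanceType": instance_type,
        "InstanceCount": 1,  # exactly one master node
    }]
    if core_count > 0:
        groups.append({
            "Name": "core",
            "InstanceRole": "CORE",
            "Market": safe_market,  # core nodes may hold HDFS data
            "InstanceType": instance_type,
            "InstanceCount": core_count,
        })
    if task_count > 0:
        groups.append({
            "Name": "task",
            "InstanceRole": "TASK",
            "Market": "SPOT",  # losing a task node loses no data
            "InstanceType": instance_type,
            "InstanceCount": task_count,
        })
    return groups


groups = instance_groups("m4.xlarge", core_count=2, task_count=4)
```

No bid price is set in this sketch; check the EMR documentation for how the maximum spot price is chosen in that case. The resulting list would go under `Instances={"InstanceGroups": groups, ...}` in the `run_job_flow` request.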
Not too small the files please!
Very small files can significantly increase the workload time on EMR. Without going into details: loading each file requires EMR to spawn a JVM process, which incurs CPU time overhead; and since you have small files, you have a LOT of them, right? Otherwise you wouldn't be using an EMR cluster, huh? Also, from the EMRFS perspective, the more files you have, the more network calls are performed. In short, small files are a bad idea.
But then what do I mean by "small"? And what size is optimal? The magic number here is 128MB. Well ok, it's not really magic: 128MB is the Hadoop block size set up for EMR, and it is the sweet spot you should aim for; you don't want to be an order of magnitude below that number.
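To make the 128MB rule of thumb concrete, here is a small sketch (the helper names are illustrative) that flags files an order of magnitude below the block size and computes how many output files a dataset should be written as:

```python
import math

BLOCK_SIZE = 128 * 1024 ** 2  # 128MB, the block size discussed above


def too_small(file_bytes, block_size=BLOCK_SIZE):
    """True when a file is an order of magnitude below the block size."""
    return file_bytes * 10 < block_size


def target_file_count(total_bytes, block_size=BLOCK_SIZE):
    """Number of output files so that each is about one block in size."""
    return max(1, math.ceil(total_bytes / block_size))


print(too_small(1 * 1024 ** 2))             # a 1MB file -> True
print(target_file_count(10 * 1024 ** 3))    # 10GB of data -> 80 files
```

In a Spark job, such a count could be passed to `df.repartition(n)` before writing; keep in mind that compressed output will be smaller than the in-memory size, so this is only a starting estimate.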
If you have no control over the file size, a tool like S3DistCp can help; it basically allows you to recombine small files into bigger ones using HDFS (have a look at this post).
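As an illustration, the arguments of an S3DistCp step that merges small files could be assembled as below. This is a sketch under assumptions: the source and destination paths and the grouping regular expression are hypothetical, and `--targetSize` is expressed in MiB.

```python
def s3distcp_args(src, dest, group_by, target_size_mib=128, output_codec=None):
    """Build the argument list for an s3-dist-cp step (sketch only)."""
    args = [
        "s3-dist-cp",
        "--src=" + src,
        "--dest=" + dest,
        # Files whose captured group matches are concatenated together.
        "--groupBy=" + group_by,
        # Aim for output files of roughly this size, in MiB.
        "--targetSize=" + str(target_size_mib),
    ]
    if output_codec is not None:
        # Optionally re-compress the output with another codec.
        args.append("--outputCodec=" + output_codec)
    return args


# Hypothetical layout: one folder per day, merged per day into HDFS.
args = s3distcp_args(
    "s3://my-bucket/raw/logs/",
    "hdfs:///logs-merged/",
    r".*/(\d{4}-\d{2}-\d{2})/.*",
)
```

The list can be used as the `Args` of an EMR step running `command-runner.jar`.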
When compression matters
In the last section we looked into the case of many small files; now let's talk about the case of few but big files. In that case you need to pay attention to the compression of the files, or more specifically to a feature of compression algorithms called splittability. Quite self-explanatory: a file compressed with a splittable algorithm can be split and recombined without any decompression step. When EMR processes such a file, if it is big enough, the file will be split into many partitions, each one assigned to a mapper task; all nodes are working in parallel, you get what you pay for, nice!
Now let's look at the extreme case of one big file, compressed using a non-splittable algorithm. In that case the whole file is assigned to a single mapper task, meaning roughly one core on one node. Consequently the workload will be slower overall; moreover, only one node is working out of the full cluster you paid for. Not good! Go splittable as much as you can.
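The difference between the two cases can be put in numbers with a back-of-the-envelope sketch. It assumes one mapper task per 128MB split and one core per task, which is a simplification: the real split size depends on the input format and the cluster configuration.

```python
import math

BLOCK_SIZE = 128 * 1024 ** 2  # assumed split size: one 128MB block


def mapper_tasks(file_bytes, splittable, block_size=BLOCK_SIZE):
    """Estimate how many mapper tasks a single file is spread over."""
    if not splittable:
        return 1  # the whole file goes to a single mapper
    return max(1, math.ceil(file_bytes / block_size))


def busy_fraction(file_bytes, splittable, total_cores):
    """Fraction of the cluster's cores that get work from this file."""
    tasks = mapper_tasks(file_bytes, splittable)
    return min(tasks, total_cores) / total_cores


ten_gb = 10 * 1024 ** 3
print(mapper_tasks(ten_gb, True))        # 80
print(busy_fraction(ten_gb, True, 32))   # 1.0
print(busy_fraction(ten_gb, False, 32))  # 0.03125
```

With a hypothetical 32-core cluster, the non-splittable file keeps about 3% of the cores busy.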
Find in the table below whether your favorite compression algorithm is splittable:
Algorithm | Splittable
gzip | no
bzip2 | yes
LZO | yes (*)
(*) We ran some tests with LZO compression, and found that it is actually not splittable — at least on our dataset. Apparently the solution is to use Twitter’s Hadoop-LZO in combination with a hack in Spark; we did not test this approach though.
Finally, note that the S3DistCp tool, mentioned earlier, can be used to change file compression (cf. this AWS blog).
How to choose the right cluster size?
Well, it depends! Don't you love that answer? If you know a systematic approach please do reach out, thanks! I know only one way: trial and error. The idea is to run several tests on your own datasets, trying to optimize for both cost and compute time, while monitoring the cluster at the same time.
The variables you should consider for this optimization problem are:
- EC2 instance type
- number of core nodes
- Hadoop/YARN configurations (cf. AWS doc)
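One way to keep such trial-and-error runs comparable is to record each run and rank the runs by cost under a time budget. The sketch below uses placeholder prices and runtimes, not measurements; `hourly_price` stands for whatever all-in per-node price you assume.

```python
from collections import namedtuple

# One test run of the same workload on a given cluster shape.
Trial = namedtuple("Trial", "instance_type nodes hourly_price runtime_minutes")


def cost(trial):
    """Rough cost of a run: nodes x price per hour x hours."""
    return trial.nodes * trial.hourly_price * trial.runtime_minutes / 60.0


def rank(trials, max_minutes):
    """Cheapest first, among the runs that meet the time budget."""
    fast_enough = [t for t in trials if t.runtime_minutes <= max_minutes]
    return sorted(fast_enough, key=cost)


# Placeholder values for illustration only.
trials = [
    Trial("m4.xlarge", 4, 0.20, 60),
    Trial("m4.xlarge", 8, 0.20, 35),
    Trial("m4.2xlarge", 4, 0.40, 33),
]
best = rank(trials, max_minutes=45)[0]
```

Here the 60-minute run is the cheapest overall but misses the 45-minute budget, so `best` is the m4.2xlarge run; changing the budget changes the winner, which is the point of running the experiment.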
For monitoring I recommend enabling the Ganglia web interface when launching the cluster. Monitoring is key; that's how we found out about the problem of idle nodes caused by compressed files. If you use EMRFS, it is also worth paying attention to network throughput. Generally, to extract maximum value from your cluster, you don't want to be bottlenecked on memory, CPU or disk I/O.
Regarding the node type, the general purpose EC2 m4 family is usually a good starting point (e.g. m4.xlarge).
For the configurations, we were able to cut the workload time by 50% simply by switching off this configuration: spark.dynamicAllocation.enabled. It basically allowed the reuse of Spark executors in a multi-step workload; more in the documentation.
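For reference, one way to switch that setting off cluster-wide is an EMR configuration object with the `spark-defaults` classification. The sketch below only builds and prints that object; the helper name is illustrative.

```python
import json


def spark_defaults(dynamic_allocation):
    """EMR configuration toggling Spark dynamic allocation."""
    return [{
        "Classification": "spark-defaults",
        "Properties": {
            # EMR expects property values as strings.
            "spark.dynamicAllocation.enabled":
                "true" if dynamic_allocation else "false",
        },
    }]


config = spark_defaults(dynamic_allocation=False)
print(json.dumps(config, indent=2))
```

The same object can be passed as `Configurations=` when creating the cluster; for a single job, `spark-submit --conf spark.dynamicAllocation.enabled=false` has the same effect. With dynamic allocation off, the number of executors usually has to be set explicitly (for example through `spark.executor.instances`).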
If you are on the AWS platform it's really easy to spawn a small cluster and test your assumptions; don't hesitate to run thoughtful experiments, the payoff can be huge.
Wrapping up!
This post eventually became way longer than originally expected; there is much to be said and still a lot to learn. Anyway, I do hope you found something useful here; and in case you have any feedback, I would love to learn from you!