Tuning Spark Applications to Efficiently Utilize Dataproc Cluster
Have you recently migrated your Spark application from the on-prem Yarn cluster to Dataproc? Then this blog post might help you to tune your Spark applications to efficiently utilize the GCP Dataproc and save cost.
Recently our team moved several big data workloads to the Google Dataproc cluster as it offers an ephemeral job scoped cluster. This helps to ensure every job gets a dedicated cluster and slowness in one job does not impact other jobs.
There is one difference to be cautious about on the on-prem Yarn cluster vs. the Dataproc Yarn cluster.
In the case of an on-prem Yarn cluster, it is a shared cluster and multiple jobs run parallel by different teams. So whatever resources that you have requested for your job, you would get that allocated and other jobs would be using the rest of the resources of the cluster. So the resources of the cluster are maximum-utilized based on the number of jobs running in parallel.
But in the case of the Dataproc cluster, we often create an ephemeral cluster for single job execution. This means we have to ensure that our job uses all available resources of the cluster. If not, then there are no other jobs running in the cluster and the resources are not fully utilized, but you get billed for the total compute engine used to create the Dataproc cluster.
Let’s understand the issue better by taking an example. Consider a 41 node Dataproc cluster that we are creating with the following config:
Dataproc Cluster
|
|--- 1 Master Node (n1-standard-8) = 1 * (8vCpus and 30GB Mem)
|
|--- 40 Worker Nodes (n1-standard-16) = 40 * (16vCpus and 60GB Mem)
Total vCPUs and Memory (Workers)40 * 16 vCpus = 640 vCpus
40 * 60GB Memory = 2400GB Memory
Yarn cluster would be running several Yarn-specific daemons in worker nodes in the background. So we need to make sure that we leave aside enough cores and memory for these daemons and OS to run smoothly.
Total Available vCPUs and Memory on Worker Nodes40 * 15vCpus = 600 vCpus (leaving 1vCPU per Worker node for deamons & OS)40 * 48GB Memory = 1920GB Memory (leaving 12GB per Worker node for deamons & OS)
Each of the worker nodes will only get 48GB of memory and 15 vCpus for data processing/computing. This is because by default Dataproc sets yarn.nodemanager.resource.memory-mb
as 80% of the compute engine machine’s memory (80% of 60GB = 48GB ~ 49152MB).
Based on the compute engine machine flavor that you are using for the cluster creation, default allocation varies between 75% — 80%. The current configured value of yarn.nodemanager.resource.memory-mb
for your cluster can be validated in the Yarn RM UI by navigating to Tools → Configuration.
The property yarn.nodemanager.resource.memory-mb
can also be overridden with your custom value while starting the Dataproc cluster.
Configuring Spark Executors
Now, we need to divide the available resources of a worker node into executors. One thing to be careful about here is that we should not underutilize or overutilize the available resources, which would result in the wrong configuration and the end result would be underutilization of the overall cluster while running the jobs.
If we plan to have three Executors per worker node, then each executor will have the following configurations:
This configuration for Spark Executor is ideal for our case. However, we should understand that memory requested to yarn per executor = spark.exeutor.memory + spark.executor.memoryOverhead.
So we have to further distribute 16GB against executor.memory
and executor.memoryOverhead.
After all, for the above calculations we have the below config for our Spark job (the memory distribution can vary based on your job).
spark.executor.cores=5
spark.executor.memory=14g
spark.executor.memoryOverhead=2gspark.driver.core=5
spark.driver.memory=14g
spark.driver.memoryOverhead=2g
We have 40 worker nodes and each worker node has 3 executors and each executor has 5 cores, i.e. in total 40 * 3 * 5 = 600 parallel cores.
We can observe that the cluster is capable of executing 595 parallel tasks (600–5 driver cores) and the cluster is 99.2% utilizing the resources.
Cost of MisConfigurations
With the above calculations, the properties are now set for our Spark job. What if we do not need to do all of these calculations and configure these properties? Let us start a Dataproc cluster with a different configuration (changed spark.executor.memory from 14g to 15g).
spark.executor.cores=5
spark.executor.memory=15g
spark.executor.memoryOverhead=2g
Below is the screenshot of the job running with this configuration:
Now, despite the fact that we used the same number of compute instances for the Dataproc cluster creation, we do not get the same compute capacity in Dataproc as we did previously. Parallel running tasks are only 400 and the cluster is only utilizing 70.9% of the resources.
This is due to the default resource calculator of Dataproc, which is org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
which only uses memory information for allocating containers and not CPU scheduling. And due to this, in every worker node, only two executors are getting initialized, as the third executor would not have the required memory. This means we are wasting 5 vCpu and 14GB of memory in each worker node.
Total Un-Used Resources 40 * 5 vCpu = Total 200 vCpu is un-used but Billed
40 * 14GB memory = Total 560GB of memory is un-used but Billed
Conclusion
In the on-prem Yarn cluster, we don’t have to know much about Yarn and we can make changes to our Spark job’s executor properties (especially cores and memory properties) anytime to re-configure our job based on our need. And there won’t be much impact on the Yarn cluster as other jobs executors are also placed in the same worker nodes.
But in Dataproc, every time we make any changes to executor properties, we have to re-assess to ensure we fully utilize the available resource as the cluster is ephemeral and intended to run only our job.
When we use Dataproc to run Spark jobs we must have basic knowledge about Yarn. Only then we can fully utilize and truly say the pricing of our cluster is based on pay-as-you-use.