Demystifying Dataproc Spark Job Executions
Dataproc and Spark are among the most sought-after technologies for data integration use cases.
One of the most common job deployment strategies in Dataproc is to run jobs on ephemeral clusters. This blog focuses on optimizing job concurrency so that Dataproc can process Spark jobs faster and more efficiently.
Let's understand the use case at hand: we have 10 PySpark jobs (all similar) to execute as a batch on an ephemeral Dataproc cluster.
For this, we use Cloud Composer as the orchestration tool: it creates the ephemeral cluster, submits the 10 PySpark jobs using the Dataproc job submit operator, and finally deletes the cluster once execution is done.
The Composer DAG looks as below.
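A minimal sketch of such a DAG, using the Dataproc operators from the Airflow Google provider. The project, region, bucket, and cluster names are placeholders, not the ones used in this setup:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "my-project"        # placeholder
REGION = "us-central1"           # placeholder
CLUSTER_NAME = "ephemeral-spark-cluster"

with DAG(
    dag_id="ephemeral_dataproc_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    max_active_tasks=4,  # launch at most 4 Dataproc submissions at a time
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        # 2 x n1-standard-4 primary workers; autoscaling policy attached separately
        cluster_config={
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
        },
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule="all_done",  # tear the cluster down even if a job fails
    )

    for i in range(10):
        submit = DataprocSubmitJobOperator(
            task_id=f"pyspark_job_{i}",
            project_id=PROJECT_ID,
            region=REGION,
            job={
                "placement": {"cluster_name": CLUSTER_NAME},
                "pyspark_job": {
                    "main_python_file_uri": "gs://my-bucket/job.py",  # placeholder
                    "properties": {"spark.submit.deployMode": "cluster"},
                },
            },
        )
        create_cluster >> submit >> delete_cluster
```

Note the `trigger_rule="all_done"` on the delete task: an ephemeral cluster should be torn down even when one of the jobs fails.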
The cluster configuration for this DAG uses a basic worker configuration of 2 n1-standard-4 workers (4 cores and 15 GB of memory per machine) and the autoscaling policy below.
Also, at the DAG level, we have defined "max_active_tasks" as 4 to launch only 4 Dataproc jobs at a time.
The Spark jobs are configured to run in cluster mode, which causes the driver to be created in a container on one of the worker nodes.
spark.submit.deployMode=cluster
To understand more about cluster mode vs. client mode, please refer to the reference links.
The autoscaling policy above only allows scaling with secondary workers, as the weight for primary workers is set to 0.
The policy is defined for aggressive scaling: it provisions enough workers to satisfy all of YARN's pending memory.
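As a rough illustration, such a policy could look like the following. The field names follow the Dataproc autoscaling-policy API (in Python client snake_case); the concrete values are assumptions, not the exact policy used here:

```python
# Illustrative Dataproc autoscaling policy: primary workers pinned (weight 0),
# all scaling done with secondary workers, aggressive scale-up.
autoscaling_policy = {
    "id": "aggressive-secondary-scaling",
    # Primary workers never scale: min == max, weight 0
    "worker_config": {"min_instances": 2, "max_instances": 2, "weight": 0},
    # All elasticity comes from secondary (preemptible) workers
    "secondary_worker_config": {"min_instances": 0, "max_instances": 50, "weight": 1},
    "basic_algorithm": {
        "cooldown_period": "120s",
        "yarn_config": {
            "scale_up_factor": 1.0,    # request 100% of YARN pending memory at once
            "scale_down_factor": 1.0,
            "graceful_decommission_timeout": "0s",
        },
    },
}

assert autoscaling_policy["worker_config"]["weight"] == 0
```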
For more information on the autoscaling parameters, please refer to the reference links below.
Now, with this basic setup, the DAG execution shows the Spark job timings below.
One interesting fact to notice is that even though all the PySpark jobs are exactly the same, the job timings differ, ranging from 2 minutes to a peak of 5 minutes 19 seconds.
Let's also check the Spark history server to see the job timings.
The Spark history server shows an average Spark job execution time of about 1 minute.
Now the question is: if a Spark job takes only about 1 minute to execute, why are the Dataproc jobs taking 2–5 minutes? This overhead is very costly compared to the Spark execution time.
Let's dig in further and analyse this.
Let's take the job that took 5 minutes 19 seconds and try to understand where the time was spent.
Let's check the Dataproc logs for the job.
From the job logs, we can see the Dataproc job submission to YARN took nearly 10 seconds (which is fair).
Let's check the YARN console for the execution timings of the job above.
Looking at the sample highlighted job, we can see the Spark job was submitted at 16:14:36 (UTC) but took nearly 3 minutes 31 seconds to get launched.
So the job timing (5 min 19 s) breaks down as below:
- Submission of the job to YARN from Dataproc — 30 s
- YARN application from ACCEPTED to RUNNING state — 3 min 31 s
- Spark execution time — 1 min 18 s
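The arithmetic of that breakdown, just to make the overhead explicit:

```python
# Reconstructing the 5 min 19 s wall-clock time from its components.
submission_to_yarn = 30             # s: Dataproc hands the job to YARN
accepted_to_running = 3 * 60 + 31   # s: waiting in ACCEPTED for resources
spark_execution = 1 * 60 + 18       # s: actual Spark work

total = submission_to_yarn + accepted_to_running + spark_execution
print(total)  # 319 s, i.e. 5 min 19 s
assert total == 5 * 60 + 19

# Share of the wall clock spent just waiting for YARN resources
print(round(accepted_to_running / total * 100))  # 66 (%)
```

Roughly two-thirds of the wall-clock time is pure waiting, not computation.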
This shows the jobs are waiting in the ACCEPTED state for a long time before getting the resources required for execution.
Let's dig in further and understand how many resources a job actually needs.
Taking a sample job, let's see how many resources it requested.
We can see the job requested 7 cores and 23 GB of memory.
With the current cluster configuration, we have 8 cores and ~24 GB of memory across the 2 n1-standard-4 machines.
How did YARN arrive at 7 cores and 23 GB of memory? Let's check the cluster properties.
We can see 1 driver (1 core, 3 GB) and 3 executors (6 cores, 17 GB in total) were requested, which roughly matches the request above.
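A quick sanity check of those numbers. The per-executor split below is inferred from the totals; YARN's rounding of each container up to its minimum allocation increment accounts for the gap between this sum and the 23 GB shown as allocated:

```python
# Rough per-job resource request (numbers as read from the YARN UI).
driver = {"cores": 1, "mem_gb": 3}
executors = {"count": 3, "cores_each": 2, "mem_gb_total": 17}  # 2 cores/executor inferred

total_cores = driver["cores"] + executors["count"] * executors["cores_each"]
total_mem = driver["mem_gb"] + executors["mem_gb_total"]

print(total_cores, total_mem)  # 7 20 -> 7 cores, ~20 GB before YARN container rounding
assert total_cores == 7
```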
Spark performs dynamic allocation of resources for the job (dynamic allocation is enabled by default on Dataproc).
From the above, the total resources available vs. allocated are:
Total — <memory: 24.66 GB, vCores: 8>
Allocated — <memory: 23 GB, vCores: 7>
Based on the above, there is almost no memory left to start another YARN application, so the Dataproc cluster has to scale first. This leaves the queued YARN applications in the ACCEPTED state for a long time.
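The admission math makes the queueing obvious: the headroom left after the first job is admitted is nowhere near a second job's request.

```python
# Why the next application queues: YARN headroom after the first job is admitted.
total = {"mem_gb": 24.66, "cores": 8}
allocated = {"mem_gb": 23.0, "cores": 7}

headroom_mem = total["mem_gb"] - allocated["mem_gb"]   # ~1.66 GB left
headroom_cores = total["cores"] - allocated["cores"]   # 1 core left

next_request = {"mem_gb": 23.0, "cores": 7}            # each job asks for the same
fits = (headroom_mem >= next_request["mem_gb"]
        and headroom_cores >= next_request["cores"])
print(fits)  # False -> the application sits in ACCEPTED until autoscaling adds workers
```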
So what solution will help in this case?
1. Increase the number of machines
2. Increase the size of the machines
Let's check the CPU utilization of the Dataproc cluster to determine whether the cluster is sized adequately for the jobs.
We can see utilization is roughly 50–60%, which is good.
Let's have a look at the YARN available memory and pending memory.
We can see that for the duration of the job, the YARN pending memory indicated a need for at least 50 GB of additional memory.
Autoscaling does kick in, but it takes time to provision the secondary workers, which adds to the waiting time.
Let's try increasing the initial worker count from 2 to 4 to handle the 4 concurrent job executions configured in the DAG.
Now the YARN total resources have increased to 16 cores and ~48 GB of memory.
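The back-of-envelope sizing, assuming YARN gets roughly 12 GB of each n1-standard-4's 15 GB (the exact NodeManager allocation depends on the image version):

```python
# Back-of-envelope for the resized cluster.
nodes = 4
cores_per_node = 4           # n1-standard-4
yarn_mem_per_node_gb = 12    # assumed YARN share of the 15 GB machine

total_cores = nodes * cores_per_node
total_mem = nodes * yarn_mem_per_node_gb
print(total_cores, total_mem)  # 16 48

# 4 concurrent jobs at ~23 GB each (~92 GB) still exceed 48 GB, but more
# applications now start immediately while autoscaling covers the remainder.
```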
With this configuration, the job timings look as below.
The YARN waiting time has drastically reduced (and is less erratic), and the cluster handles the concurrency of 4 workloads, with autoscaling covering the remaining memory requirements.
Let's also try a slightly larger machine type, n2-highmem-4 (4 cores, 32 GB of memory), compared to n1-standard-4 (4 cores, 15 GB), and see if it makes any difference.
Now a total of 110 GB of memory and 16 cores is available (~40–50 GB more memory than the previous configuration).
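The same calculation for n2-highmem-4 workers, assuming YARN receives roughly 27.5 GB per 32 GB node:

```python
# Sizing with n2-highmem-4 workers.
nodes = 4
cores_per_node = 4
yarn_mem_per_node_gb = 27.5  # assumed YARN share of the 32 GB machine

total_cores = nodes * cores_per_node
total_mem = nodes * yarn_mem_per_node_gb
print(total_cores, total_mem)  # 16 110.0

# Four concurrent ~23 GB applications (~92 GB) now fit up front,
# without waiting on autoscaling at all.
assert total_mem >= 4 * 23
```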
Looking at the Dataproc job timings now, we can see most jobs complete in the expected range of 1–2 minutes, with minimal YARN waiting time.
The CPU utilization is fairly low, at 40–50% on average.
The conclusion of this article: monitor YARN application needs, and provision correctly sized workers and the correct number of initial workers to meet the concurrency requirements.
It is important to monitor the jobs and compare the time taken by the YARN application with the Spark execution itself, so that any waiting time can be identified and mitigated.
Autoscaling best practices should be followed to ensure proper resource availability on the cluster.
Hope this article helps you understand the nuances of job execution on Dataproc clusters.
Please connect with me on https://www.linkedin.com/in/murli-krishnan-a1319842/ for any queries.
Happy learning !!