Demystifying Dataproc Spark job executions

Murli Krishnan
Google Cloud - Community
6 min read · Apr 29, 2024


Dataproc and Spark are among the most sought-after technologies for data integration use cases.

One of the most common job deployment strategies in Dataproc is to leverage ephemeral clusters. This blog focuses on optimizing job concurrency so that Dataproc can process Spark jobs faster and more efficiently.

Let’s understand the use case at hand: we have 10 PySpark jobs (all similar) to execute on a batch basis on an ephemeral Dataproc cluster.

For this, we have used Cloud Composer as the orchestration tool: it creates the ephemeral cluster, submits the 10 PySpark jobs using the Dataproc submit job operator, and finally deletes the cluster once execution is done.

The Composer DAG looks as below.

Demo DAG with concurrent spark jobs

The Spark cluster configuration for this DAG uses a basic worker configuration of 2 n1-standard-4 workers (4 cores and 15 GB of memory per machine) and the autoscaling policy shown below.

Also, at the DAG level, we have set “max_active_tasks” to 4 so that only 4 Dataproc jobs are launched at a time.
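For reference, a minimal sketch of such a DAG is shown below. The project, region, bucket, cluster and autoscaling policy names are placeholders rather than the actual setup used for this demo, and the job spec is trimmed to its essentials.

# Minimal sketch of the Composer DAG (placeholder names throughout).
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "my-project"       # placeholder
REGION = "us-central1"          # placeholder
CLUSTER_NAME = "ephemeral-spark-cluster"

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
    # Attach the autoscaling policy discussed below.
    "autoscaling_config": {
        "policy_uri": (
            f"projects/{PROJECT_ID}/regions/{REGION}"
            "/autoscalingPolicies/aggressive-scaling"
        )
    },
}

with DAG(
    dag_id="dataproc_concurrent_spark_jobs",
    start_date=datetime(2024, 4, 1),
    schedule=None,
    max_active_tasks=4,  # at most 4 tasks (job submissions) run concurrently
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config=CLUSTER_CONFIG,
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule="all_done",  # tear the cluster down even if some jobs fail
    )

    for i in range(10):
        submit_job = DataprocSubmitJobOperator(
            task_id=f"pyspark_job_{i}",
            project_id=PROJECT_ID,
            region=REGION,
            job={
                "reference": {"project_id": PROJECT_ID},
                "placement": {"cluster_name": CLUSTER_NAME},
                "pyspark_job": {"main_python_file_uri": "gs://my-bucket/jobs/sample_job.py"},
            },
        )
        create_cluster >> submit_job >> delete_cluster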

The Spark jobs are configured to run in cluster mode, which causes the driver to be created in a container on one of the worker nodes.

spark.submit.deployMode=cluster

To understand more about cluster mode vs. client mode, please refer to the reference links.
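In the job spec passed to the submit operator, the deploy mode goes under the job properties. A small sketch, again with placeholder names, is below.

# Sketch of a PySpark job spec for DataprocSubmitJobOperator with cluster deploy mode.
PYSPARK_JOB = {
    "reference": {"project_id": "my-project"},                 # placeholder
    "placement": {"cluster_name": "ephemeral-spark-cluster"},  # placeholder
    "pyspark_job": {
        "main_python_file_uri": "gs://my-bucket/jobs/sample_job.py",  # placeholder
        "properties": {
            # Run the driver inside a YARN container on the worker nodes.
            "spark.submit.deployMode": "cluster",
        },
    },
}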

Autoscaling Policy

The above autoscaling policy only allows scaling via secondary workers, as the weight for primary workers is set to 0.

The autoscaling policy defined here scales aggressively to satisfy any YARN pending memory.

For more information on the autoscaling parameters, please refer to the reference links below.
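For illustration, a policy of this shape can also be created with the Dataproc Python client. The sketch below mirrors the behaviour described above (primary workers weighted 0 so only secondary workers scale, and an aggressive scale-up factor), but the policy id, instance bounds, cooldown and decommission timeout are assumptions rather than the exact values used here.

# Sketch of an aggressive autoscaling policy via the Dataproc client library.
# Instance bounds, factors and timeouts below are illustrative assumptions.
from google.cloud import dataproc_v1

PROJECT_ID = "my-project"  # placeholder
REGION = "us-central1"     # placeholder

client = dataproc_v1.AutoscalingPolicyServiceClient(
    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)

policy = {
    "id": "aggressive-scaling",
    "worker_config": {
        "min_instances": 2,
        "max_instances": 2,
        "weight": 0,  # primary workers do not participate in scaling
    },
    "secondary_worker_config": {
        "min_instances": 0,
        "max_instances": 10,
        "weight": 1,  # all scaling happens through secondary workers
    },
    "basic_algorithm": {
        "cooldown_period": {"seconds": 120},
        "yarn_config": {
            "scale_up_factor": 1.0,  # request enough workers for all pending YARN memory
            "scale_down_factor": 1.0,
            "graceful_decommission_timeout": {"seconds": 0},
        },
    },
}

client.create_autoscaling_policy(
    parent=f"projects/{PROJECT_ID}/regions/{REGION}", policy=policy
)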

Now, with this basic setup, the DAG execution shows the Spark job execution timings as below.

Airflow job execution timings
Dataproc Job Timings

One interesting fact to notice: even though all the PySpark jobs are exactly the same, the job timings differ, ranging from 2 minutes to a peak of 5 minutes and 19 seconds.

Let’s also check the Spark history server to see the job timings.

Spark history server

The Spark history server shows an average Spark job execution time of about 1 minute.

Now the question is: if the Spark job takes only 1 minute to execute, why do the Dataproc jobs take 2–5 minutes on average? This difference is very costly compared to the Spark execution time.

Let’s dig in further and analyse this.

Let’s take one job that took 5 minutes and 19 seconds of execution time and try to understand where the time is being spent.

Dataproc Job

Let’s check the Dataproc logs for the job.

Dataproc job logs (UTC time)

From the job logs, we can see that the Dataproc job submission to YARN took nearly 10 seconds (which is fair).

Let’s check the YARN console for the execution timings of the job above.

Yarn Job timings (IST time)

Looking at the highlighted sample job, we can see the Spark job started at 16:14:36 (UTC) but took nearly 3 minutes and 31 seconds to get launched.

So the job timing (5 minutes 19 seconds) can be broken down as below:

  1. Submission of the job from Dataproc to YARN: 30s
  2. YARN application moving from ACCEPTED to RUNNING state: 3m 31s
  3. Spark execution time: 1m 18s

This shows the jobs are waiting a long time in the accepted state (to get the required resources) before execution starts.

Let’s dig in further and try to understand how much resource the job needs.

Let’s take a sample job and see how much resource was requested.

Requested Resources

We can see the job has requested 7 cores and 23 GB of memory.

With the current cluster configuration, we have 8 cores and ~24 GB of memory, considering the 2 n1-standard-4 machines.

Total Yarn memory and cores

How did YARN arrive at 7 cores and 23 GB of memory? Let’s check the cluster properties.

Cluster Properties

We can see 1 driver (1 core, 3 GB) and 3 executors (6 cores and 17 GB in total) were requested, which roughly matches the request above.

Spark performs dynamic allocation of resources for the job, so the number of executors is determined at runtime.
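To see how YARN arrives at such numbers: each Spark container asks for its heap plus a memory overhead (by default max(384 MB, 10% of the heap)), and YARN rounds the request up to a multiple of yarn.scheduler.minimum-allocation-mb. Below is a rough sketch of that arithmetic using assumed driver and executor memory values, not the exact properties of this cluster.

# Rough sketch of how YARN container requests are sized for a Spark job.
# The heap sizes and the YARN minimum allocation below are assumptions.
import math

YARN_MIN_ALLOCATION_MB = 512  # yarn.scheduler.minimum-allocation-mb (assumed)

def container_mb(heap_mb: int) -> int:
    """Heap + Spark memory overhead, rounded up to the YARN allocation increment."""
    overhead_mb = max(384, int(0.10 * heap_mb))  # default spark.{driver,executor}.memoryOverhead
    requested_mb = heap_mb + overhead_mb
    return math.ceil(requested_mb / YARN_MIN_ALLOCATION_MB) * YARN_MIN_ALLOCATION_MB

driver_mb = container_mb(2560)    # assumed spark.driver.memory of ~2.5g -> 3 GB container
executor_mb = container_mb(5000)  # assumed spark.executor.memory of ~5g -> ~5.5 GB container

total_mb = driver_mb + 3 * executor_mb  # 1 driver + 3 executors
total_cores = 1 + 3 * 2                 # 1 driver core + 3 executors x 2 cores each

print(f"driver container:   {driver_mb} MB, 1 core")
print(f"executor container: {executor_mb} MB, 2 cores")
print(f"total request:      ~{total_mb / 1024:.1f} GB, {total_cores} cores")

With assumptions like these the request works out to 7 cores and roughly 20 GB, in the same ballpark as the 7 cores and 23 GB shown above; the exact figures depend on the Dataproc-computed spark.driver.memory, spark.executor.memory and memory overhead settings.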

From the above, we can see the total memory and cores available and requested are as below:

Total — <memory:24.66 GB, vCores:8>
Allocated — <memory:23 GB, vCores:7>

Based on the above, there is no memory available to start another YARN application, and the Dataproc cluster has to scale first. This causes the queued YARN applications to stay in the waiting state for a longer time.

So what solution will help in this case?
1. Increase the number of machines
2. Increase the size of the machines

Let’s check the CPU utilization of the Dataproc cluster to determine whether the cluster sizing is good enough for the job.

CPU utilization

We can see the utilization is roughly 50–60%, which is good.

Let’s have a look at the YARN available memory and pending memory.

We can see that, for the duration of the job, the YARN pending memory indicated at least 50 GB of additional memory was required.
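Besides the console charts, the same signal can be pulled programmatically from Cloud Monitoring via the dataproc.googleapis.com/cluster/yarn/pending_memory_size metric. A minimal sketch with the Python monitoring client is below; the project id, cluster name and one-hour lookback window are placeholders.

# Sketch: query YARN pending memory for a Dataproc cluster from Cloud Monitoring.
import time

from google.cloud import monitoring_v3

PROJECT_ID = "my-project"  # placeholder

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {"end_time": {"seconds": now}, "start_time": {"seconds": now - 3600}}
)

results = client.list_time_series(
    request={
        "name": f"projects/{PROJECT_ID}",
        "filter": (
            'metric.type = "dataproc.googleapis.com/cluster/yarn/pending_memory_size" '
            'AND resource.labels.cluster_name = "ephemeral-spark-cluster"'
        ),
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)

for series in results:
    for point in series.points:
        # Each point is the pending YARN memory (in bytes) at that timestamp.
        print(point.interval.end_time, point.value)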

The autoscaling works, but it takes time to provision the secondary workers, which adds to the waiting time.

Autoscaling

Let’s try increasing the starting worker machines from 2 to 4 to handle the 4 concurrent job executions configured in the DAG.

Now we can see the total YARN resources have increased to 16 cores and ~48 GB of memory.
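In terms of the cluster config sketched earlier, this is only a change to the primary worker count. Each n1-standard-4 contributes roughly 12 GB of usable YARN memory (consistent with the ~24.66 GB total seen for 2 workers), so 4 workers gives ~48 GB and 16 cores.

# Variant of the earlier hypothetical worker config: start with 4 primary workers.
WORKER_CONFIG = {
    "num_instances": 4,                   # was 2
    "machine_type_uri": "n1-standard-4",  # 4 vCPUs, 15 GB RAM (~12 GB usable by YARN)
}
# 4 workers x 4 vCPUs = 16 cores; 4 x ~12 GB gives roughly 48 GB of YARN memory.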

With this configuration, the job timings are as below.

Dataproc job timings

We can see the YARN waiting time has reduced drastically (and is less erratic), and the cluster is able to handle the concurrency of 4 workloads, with autoscaling covering the additional memory requirements.

Let’s also try a slightly larger machine (n2-highmem-4), which has 4 cores and 32 GB of memory, compared to n1-standard-4, which has 4 cores and 15 GB, and see if it makes any difference.

So now a total of ~110 GB of memory and 16 cores is available (~40–50 GB more memory than the previous configuration).
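Again in terms of the earlier sketch, only the machine type changes; with roughly 27 GB of usable YARN memory per n2-highmem-4, four workers lands close to the 110 GB total observed here.

# Variant with higher-memory workers, keeping 4 primary workers.
WORKER_CONFIG = {
    "num_instances": 4,
    "machine_type_uri": "n2-highmem-4",  # 4 vCPUs, 32 GB RAM (~27 GB usable by YARN)
}
# 4 workers x 4 vCPUs = 16 cores; 4 x ~27 GB gives roughly 110 GB of YARN memory.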

Now, looking at the Dataproc job timings, we can see most of the jobs are in the range of 1–2 minutes, as expected, with minimal YARN waiting time.

Dataproc job timings

The CPU utilization is somewhat lower, at 40–50% on average.

The conclusion of this article: monitor the YARN application needs, and provision correctly sized workers and the correct number of initial workers to meet the concurrency requirements.

It is important to monitor the jobs and compare the time taken by the YARN application with the Spark execution time, so that any waiting time can be spotted and mitigated.

Autoscaling best practices should be followed to ensure proper resource availability for the cluster.

Hope this article helps you understand the nuances of job execution on a Dataproc cluster.

Please connect with me on https://www.linkedin.com/in/murli-krishnan-a1319842/ for any queries.

Happy learning!
