How to Submit Spark Serverless Jobs, Manage Quota and Capture Errors

Samet Karadag
Google Cloud - Community
4 min read · Apr 20, 2023


Dataproc Serverless integrates with the wider ecosystem

Today, Dataproc Serverless is the most modern way to run your Spark jobs on GCP. It frees you from cluster boundaries and gives you the best of the cloud: auto-scaling, pay-as-you-go billing, and integration with the wider ecosystem, including BigQuery, Vertex AI Notebooks, Hive Metastore, Dataplex, and the Persistent History Server.

In this blog post you will find some of the constraints you need to consider when moving your Spark workloads from a cluster environment to serverless.

1 - Compute CPU Quota Limits

There is a CPU quota per GCP project. From the user's perspective, the main advantage of this quota is that it caps the number of CPUs and guards against unexpected billing bursts. The issue is that if you hit the limit, your job fails with a "Quota CPUS exceeded" error. The reason is that Dataproc Serverless does not have YARN; it relies on the cloud infrastructure and its constraints, so there is no queueing as of writing this post.
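As a quick reference, you can inspect a region's quota limits and current usage with gcloud (europe-west4 here is just an example region):

# Prints all quotas for the region, including the CPUS metric with its limit and usage
gcloud compute regions describe europe-west4 --format="json(quotas)"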

To mitigate CPU quota exceeded errors: first, you can use separate GCP projects per application so that applications do not impact each other.

Second, you can use one or more of the approaches below to manage the CPU quota within the project. The goal is to manage the quota or the job submission dynamically so that no job fails due to the quota limit.

Increase quota based on usage
Monitor quota usage and alert on a threshold such as 40%. When the alert fires, look at the underlying demand and either increase the quota or optimise your most expensive jobs (a minimal monitoring sketch follows this list).

Pros:
Jobs will not get an error due to quota

Cons:
Spikes will still cause errors
Needs an incident management process to act on alerts
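As a rough sketch of the monitoring approach (not a full Cloud Monitoring setup), a cron-friendly shell check along these lines could flag when regional CPU usage crosses the threshold. The region, threshold, and the final echo are placeholders; in practice you would replace the echo with your own notification mechanism:

#!/bin/bash
# Hypothetical quota watcher: warn when CPU usage in a region exceeds 40% of its limit
REGION=europe-west4
ALERT_PCT=40

# The CPUS column of `gcloud compute regions list` is reported as usage/limit
read USAGE LIMIT < <(gcloud compute regions list | grep ${REGION} | awk '{split($2,a,"/"); print a[1], a[2]}')

PCT=$(( 100 * USAGE / LIMIT ))
if [ ${PCT} -ge ${ALERT_PCT} ]; then
  # Placeholder: plug in your own alerting here (email, chat webhook, Cloud Monitoring, ...)
  echo "WARNING: ${REGION} CPU quota usage at ${PCT}% (${USAGE}/${LIMIT})"
fi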

Setting an arbitrarily large quota
Set far more CPU quota than you will ever need. For example, set 600 while your concurrently running jobs are expected to consume at most 100.

Constraints:
A limit above 600 will require approval from GCP support

Pros:
Jobs are not expected to fail, thanks to the arbitrarily large quota

Cons:
No guardrail on billing, e.g. a bug causing multiple submissions and a billing spike.

Queue jobs based on quota availability
This option is nothing new; it simply provides basic YARN-like behaviour that queues jobs, or makes them wait, if there isn't enough quota. You can achieve this by submitting jobs through a wrapper script (sh) that checks the current quota utilisation.

Pros:
Jobs will not get an error due to quota

Cons:
Jobs will wait “unnecessarily”. If this happens frequently it is best to raise the quota. It is the same cost for 100 cores used for 10 hours vs 1000 cores used for 1 hour.
Wrapper code needs to be used for job submission, which means additional development and maintenance, and it is hard to enforce if there are many teams and apps.

The gcloud command below prints the CPU quota usage in europe-west3 (the CPUS column is reported as usage/limit, e.g. 23/600):

gcloud compute regions list | grep europe-west3 | awk '{print $2}'

Below is an example shell script that checks the available quota and submits the job only when at least THRESHOLD vCPUs are free:

#!/bin/bash

THRESHOLD=50 # the vCPU count required by the job
QUOTA=$(gcloud compute regions list | grep europe-west4 | awk '{split($2,a,"/"); print a[2]}')

while true; do
  USAGE=$(gcloud compute regions list | grep europe-west4 | awk '{split($2,a,"/"); print a[1]}')

  if [ $(($QUOTA - $USAGE)) -lt $THRESHOLD ]; then
    echo "Waiting for CPU quota to become available..."
    sleep 60 # wait for 60 seconds before checking again
  else
    echo "CPU quota is available. Running the next command..."
    # run spark submit here
    gcloud dataproc batches submit pyspark test.py --version=2.0 --region=europe-west4 --deps-bucket=serverless-test-samet --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.28.0.jar
    break
  fi
done

2 - Spark Serverless Job Submission and Orchestration

One of the main differences between a Dataproc Serverless submit and a normal spark-submit is that it returns a zero exit code even if the job fails. Submission is an asynchronous, cloud-native process: the submit command issues an API call, and if the submission itself is successful it returns 0.

So if you are submitting your jobs from an orchestrator or an ETL tool that uses shell scripts, the "$?" return code of the submit command will be 0.

You can use a wrapper script that submits the Dataproc Serverless job, waits for it to finish, and returns a non-zero code if the job has failed.

Below is such an example script: it submits the batch asynchronously, waits for it to complete, uses describe to get the final state, and exits with a non-zero code if the job fails:

#!/bin/bash

# Define variables
PROJECT_ID=project-id
REGION=region
DEPS_BUCKET_NAME=deps-bucket
JOB_NAME=test-samet
JAR_FILE=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.28.0.jar
PYSPARK_FILE=$1
VERSION=2.0
TIMESTAMP=$(date +"%Y-%m-%d-%H-%M-%S")
JOB_ID=$(echo ${PYSPARK_FILE}-${TIMESTAMP} | sed 's/\.py//')
APPNAME=$(echo ${PYSPARK_FILE}| sed 's/\.py//')

# Submit the job and capture the job ID
echo "Submitting job..."
gcloud dataproc batches submit pyspark ${PYSPARK_FILE} --async \
  --project=${PROJECT_ID} \
  --region=${REGION} \
  --jars=${JAR_FILE} \
  --version=${VERSION} \
  --deps-bucket=${DEPS_BUCKET_NAME} \
  --batch=${JOB_ID} \
  --labels=appname=${APPNAME}

# Wait for the job to complete
echo "waiting for the job to complete..."
gcloud dataproc batches wait ${JOB_ID} --project=${PROJECT_ID} --region=${REGION}

STATE=$(gcloud dataproc batches describe ${JOB_ID} --project=${PROJECT_ID} --region=${REGION} --format='value(state)')
echo "Job State : ${STATE}"

if [[ ${STATE} == "FAILED" ]]; then
  echo "Job ${JOB_ID} failed." >> ${JOB_NAME}.log
  exit 1
else
  echo "Job ${JOB_ID} succeeded." >> ${JOB_NAME}.log
  exit 0
fi
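
Assuming you save the wrapper as submit_batch.sh (the file name is arbitrary), your orchestrator or ETL tool can call it like any other command and rely on its exit code:

chmod +x submit_batch.sh

# Exits with 0 if the batch succeeds and 1 if it ends in the FAILED state
./submit_batch.sh test.py
echo "exit code: $?"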

Hope you find this useful. Please let me know in the comments if you have questions, other approaches, or issues.
