Distributed computing on Spark with Dataproc
Getting acquainted with GCP Dataproc and distributed computing on Spark.
Spark and Hadoop are widely recognized as leading technologies for big data processing. In this article, we will explore how to quickly get started using Spark in your development cycle without having to worry about infrastructure support. Specifically, we will cover how to create a Spark cluster, run jobs, and schedule those jobs to run automatically. It is assumed that you already understand the need for Spark and are looking for practical guidance on how to use it in your workflow.
The Google Cloud Platform (GCP) offers Dataproc, a service that includes Spark and Hadoop as well as a range of other open-source tools such as Hive, Jupyter Notebook, and Presto. I have used Dataproc in data lake and data warehouse development and have found it to be a convenient option due to its integration with GCP. One major advantage is that we do not have to handle the operational support for Spark and Hadoop, as all configurations and jobs are written in code and stored in Git. It is also worth mentioning the relatively fast cluster creation time of about 1.5 minutes.
To provide hands-on experience, code examples are available in a repository linked at the end of this article. These examples are referenced throughout the article, with additional explanation of the repository layout at the end. I'm using Dataproc on Compute Engine, which runs on VM-based clusters.
1. Dataproc cluster creation
Before setting up a Dataproc cluster, it is important to carefully consider the cluster size, as it affects both performance and cost. The cost of a cluster is calculated from the number of vCPUs and the duration of usage, at a fixed rate of $0.01 per vCPU per hour, charged on top of the underlying VM resources. For example, a cluster with one 8-vCPU master and two 8-vCPU workers running for 10 hours adds 24 × 10 × $0.01 = $2.40 in Dataproc fees.
One way to optimize cost is to use the auto-scaling feature, which automatically adjusts the cluster size based on workload. The auto-scaling feature uses YARN metrics, such as the amount of pending memory, to determine when to scale the cluster. When configuring a Dataproc cluster, you will need to consider the following factors:
- Auto-scaling — as mentioned above, it allows the cluster to scale up and down depending on the current load; you can also define how aggressively the scaling should happen.
- Node types — all standard machine types are supported, but in practice you may need a custom machine type. For example, we use a master node with 8 vCPUs and 50 GiB of memory to start all of our jobs smoothly.
- Spark and Dataproc properties — you will need these to fine-tune the cluster. We use Spark properties to define the number of cores and executors.
- Initialization actions — when you need all new nodes to reach a certain state after scaling operations, you can use this option to execute scripts on nodes as they are created. For example, we add the BigQuery connector, which is not part of the default Dataproc setup.
- Python packages — additional packages can be installed on every node, for example via the pip-install initialization action and the PIP_PACKAGES metadata shown in the command below.
- Auto-deletion — the cluster can delete itself after a period of idleness (see --max-idle below), so you do not pay for idle resources.
The example shell command:
gcloud dataproc clusters create "${CLUSTER_NAME}" \
--project="${GCP_PROJECT}" \
--bucket="${CLUSTER_BUCKET}" \
--region="${GCP_REGION}" \
--autoscaling-policy="${CLUSTER_AUTOSCALE_NAME}" \
--master-machine-type="${CLUSTER_MACHINE_TYPE_MASTER}" \
--master-boot-disk-size=500 \
--num-workers=2 \
--num-secondary-workers=0 \
--worker-machine-type="${CLUSTER_MACHINE_TYPE_WORKER}" \
--worker-boot-disk-size=500 \
--image-version="2.0-debian10" \
--enable-component-gateway \
--optional-components="JUPYTER" \
--scopes="https://www.googleapis.com/auth/cloud-platform" \
--labels="${CLUSTER_LABELS}" \
--properties="dataproc:dataproc.logging.stackdriver.job.driver.enable=true,dataproc:dataproc.logging.stackdriver.job.yarn.container.enable=true,spark:spark.executor.cores=5,spark:spark.executor.instances=2" \
--initialization-actions="gs://goog-dataproc-initialization-actions-${GCP_REGION}/connectors/connectors.sh,gs://goog-dataproc-initialization-actions-${GCP_REGION}/python/pip-install.sh" \
--metadata="bigquery-connector-version=1.2.0,spark-bigquery-connector-version=0.21.0" \
--metadata="PIP_PACKAGES=psycopg2" \
--service-account="${CLUSTER_SERVICE_ACCOUNT}" \
--max-idle=3600s \
--quiet
Full snippet: https://github.com/xtrmstep/dataproc-distrib-comp/blob/main/src/1_create_cluster/run.sh
This will create a cluster with one master node and two worker nodes (--num-workers=2), with auto-scaling enabled, the BigQuery connector installed, and additional logging turned on.
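If you prefer to verify the result from Python rather than from the console, the google-cloud-dataproc client library exposes the same operations. The sketch below is only an illustration; the project, region, and cluster name are placeholders:

# Hedged sketch: check that the cluster is up using the Python client library.
# Assumes `pip install google-cloud-dataproc` and application-default credentials.
from google.cloud import dataproc_v1

project = "my-project"        # placeholder
region = "europe-west1"       # placeholder
cluster_name = "my-cluster"   # placeholder

client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)
cluster = client.get_cluster(project_id=project, region=region, cluster_name=cluster_name)
print(cluster.status.state.name, cluster.config.worker_config.num_instances)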
2. Submit a job
There are several ways to submit jobs to Dataproc, including using Cloud Functions, the gcloud command-line interface (CLI), and workflows. While it is possible to use Cloud Functions for this purpose, it is generally not recommended as it adds complexity to the system and introduces an additional potential point of failure. Instead, the recommended approach for creating scheduled jobs in Dataproc is to use workflows. I will talk about workflows later in the article, in the section about workflows and schedulers.
For situations where automation is not yet needed, the gcloud CLI can be a convenient option for submitting jobs. The gcloud CLI allows users to submit jobs directly from the command line by specifying the required parameters such as the cluster to use and the job file to submit.
The example shell command:
gcloud dataproc jobs submit pyspark "${MAIN_PYTHON_FILE}" \
--project="${GCP_PROJECT}" \
--region="${GCP_REGION}" \
--cluster-labels="sdce-type=testing" \
--bucket="${CLUSTER_BUCKET}" \
--properties-file="${SPARK_PROPERTIES_FILE}" \
--async
Full snippet: https://github.com/xtrmstep/dataproc-distrib-comp/blob/main/src/2_run_simple_job/run.sh
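For completeness, the same submission can also be done programmatically, which is roughly what a Cloud Function would do under the hood. This is only a hedged sketch with placeholder names, using the google-cloud-dataproc client:

# Hedged sketch: submit a PySpark job from Python (placeholder project/cluster/file).
from google.cloud import dataproc_v1

project = "my-project"   # placeholder
region = "europe-west1"  # placeholder

client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)
job = {
    "placement": {"cluster_name": "my-cluster"},                         # placeholder cluster
    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/main.py"},   # placeholder file
}
submitted = client.submit_job(project_id=project, region=region, job=job)
print(submitted.reference.job_id)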
Distributed computing
When a job is submitted to a Dataproc cluster, it is initially received by the Dataproc agent. The agent checks for available resources and determines whether there is a cluster that can take on the job. If a suitable cluster is found, the job is passed on to Spark for processing. The cluster can be matched by label or by name.
In Spark, a driver process is created on the master node and the actual processing tasks are assigned to executors and workers. The level of parallelism depends on the number of cores and executors available. Cores correspond to CPU resources and are limited by the number of CPUs on a machine. As a best practice, it is recommended to reserve one CPU for system needs and configure Spark to use the remaining cores. Executors are processes that run on worker nodes and are responsible for executing Spark tasks. They are created with a fixed number of cores and a fixed amount of memory. Adjusting the number of cores and executors can help optimize the performance of Spark jobs, in addition to other factors such as data shuffling.
To ensure that tasks are executed in a distributed manner, the Python code in a Spark job must go through the Spark API and RDDs (Resilient Distributed Datasets). The most commonly used operations for this are map() followed by collect(), and foreach(). If plain multithreading is used without RDDs, the work will only run on the master node; it is, however, possible to use multithreading inside RDD operations.
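A minimal sketch of the difference, using hypothetical data and assuming a SparkContext is available:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize(range(10), numSlices=4)    # a distributed dataset with 4 partitions

squares = rdd.map(lambda x: x * x).collect()    # transformation runs on executors, results return to the driver
rdd.foreach(lambda x: print(x))                 # side effect runs on executors; output appears in executor logs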
3. Run on executors
To illustrate how a job is executed on the cluster across different worker nodes, I've created a simple routine that takes an array, creates an RDD, and simulates processing of its items. During processing, it records the host name on which each item is executed. The code below is slightly modified to highlight the important parts.
Here is how the function process_func is submitted to the cluster for processing of the data in gcs_files (this function is a reference to another function, def empty_task(filename: str)):
gcs_files = [f"gs://{bucket_name}/{b.name}" for b in blobs]
spark_conf = spark.getConf()
sys_cores = int(spark_conf.get("spark.executor.cores"))
sys_executors = int(spark_conf.get("spark.executor.instances"))
effective_partitions = sys_cores * sys_executors
# create a distributed dataset (RDD) to submit to Spark cluster
files_rdd = spark.parallelize(gcs_files, numSlices=effective_partitions)
# executing the method on the cluster in parallel manner
result = files_rdd.map(process_func)\
.collect() # this collect triggers the actual work of the Spark
import random
import socket
from datetime import datetime
from time import sleep, monotonic

def empty_task(filename: str):
    started_at = datetime.utcnow().isoformat()
    start = monotonic()
    sleep(random.randrange(1, 5))  # simulate work by waiting a few seconds
    elapsed = monotonic() - start
    result = dict(
        filename=filename,
        host=socket.gethostname(),  # the worker node that executed this task
        started_at=started_at,
        elapsed_sec=elapsed
    )
    return result
The job can be submitted like this:
gcloud dataproc jobs submit pyspark "${MAIN_PYTHON_FILE}" \
--project="${GCP_PROJECT}" \
--region="${GCP_REGION}" \
--cluster-labels="sdce-type=testing" \
--bucket="${CLUSTER_BUCKET}" \
--properties-file="${SPARK_PROPERTIES_FILE}" \
-- "${DATA_BUCKET_SOURCE}" "${DATA_BUCKET_OUTPUT}" dry 100
After the final -- you can see the job arguments. The value dry among them is important for this example because it tells the job to only simulate the work; if you study the code, you will see that a non-dry run actually unzips the files. The output of the job shows how many items were processed on each host.
Full snippet: https://github.com/xtrmstep/dataproc-distrib-comp/blob/main/src/3_run_tasks_on_executors/run.sh
You can see that all of them were distributed equally between the two hosts of the cluster, because the default logic for distributing tasks on the cluster is round robin. The Spark property which determines this is spark.scheduler.mode, with the default value FAIR.
When a job is submitted to the Spark cluster, it is important to be aware of the following parameters:
- --num-executors — the number of executors on the cluster
- --executor-memory — the memory per executor
- --executor-cores — the number of CPUs per executor
The optimal configuration allows executors to fit on the worker nodes without competing for cores and memory; a rough sizing sketch follows.
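As an illustration only (the node sizes are hypothetical, and the rule of thumb is the one described above: reserve one core and a bit of memory for the system), a sizing calculation could look like this:

# Hypothetical sizing helper: split a worker node into non-overlapping executors.
def size_executors(node_vcpus: int, node_mem_gib: int, cores_per_executor: int = 5):
    usable_cores = node_vcpus - 1                                    # leave one core for system daemons
    executors_per_node = max(usable_cores // cores_per_executor, 1)
    mem_per_executor_gib = (node_mem_gib - 1) // executors_per_node  # leave ~1 GiB overhead
    return executors_per_node, mem_per_executor_gib

# e.g. a 16-vCPU / 64-GiB worker -> 3 executors, with 5 cores and 21 GiB each
print(size_executors(16, 64))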
4. Workflows & Schedulers
In real life we want Spark jobs to run automatically on a schedule. Dataproc itself cannot schedule jobs, so to achieve this behavior we need to combine several components.
Workflows allow you to build a DAG from multiple jobs and define the dependencies between them. You can also specify how a cluster is selected for the workflow, among other parameters. A workflow can be described in YAML; a very simplified version is shown below.
jobs:
- pysparkJob:
    args:
    - ARG1
    - ARG2
    - ARG3
    - ARG4
    mainPythonFileUri: MAIN_PY
  stepId: test_workflow-1
placement:
  clusterSelector:
    clusterLabels:
      sdce-type: testing
In the snippet I've prepared for this example, you can see how the final YAML might look (after some substitutions): https://github.com/xtrmstep/dataproc-distrib-comp/blob/main/src/4_workflows/run.sh
The following command will create a workflow:
gcloud dataproc workflow-templates import "${WORKFLOW_NAME}" \
--project="${GCP_PROJECT}" \
--region="${GCP_REGION}" \
--source="./${WORKFLOW_NAME}.yaml"
When instantiated, it will submit a single job to the Dataproc cluster using the label sdce-type: testing.
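To test the workflow without waiting for a schedule, it can also be instantiated programmatically. A hedged sketch with placeholder names, using the Python client library:

# Hedged sketch: instantiate a Dataproc workflow template from Python (placeholders).
from google.cloud import dataproc_v1

project = "my-project"         # placeholder
region = "europe-west1"        # placeholder
workflow_name = "my-workflow"  # placeholder

client = dataproc_v1.WorkflowTemplateServiceClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)
name = f"projects/{project}/regions/{region}/workflowTemplates/{workflow_name}"
operation = client.instantiate_workflow_template(name=name)
operation.result()  # blocks until the workflow and its jobs finish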
A workflow cannot define its own schedule, so some trigger has to instantiate it. This can be done through the GCP API: a scheduler calls the API endpoint, and GCP offers Cloud Scheduler for exactly this purpose.
The following command will create a scheduler:
gcloud scheduler jobs create http "${WORKFLOW_NAME}" \
--project="${GCP_PROJECT}" \
--location="${GCP_REGION}" \
--schedule="0 0 1 * *" \
--uri="https://dataproc.googleapis.com/v1/projects/${GCP_PROJECT}/regions/${GCP_REGION}/workflowTemplates/${WORKFLOW_NAME}:instantiate?alt=json" \
--http-method=POST \
--max-retry-attempts=1 \
--time-zone=Europe/Sofia \
--oauth-service-account-email="${CLUSTER_SERVICE_ACCOUNT}" \
--oauth-token-scope=https://www.googleapis.com/auth/cloud-platform
You may notice how the API is used to instantiate a Dataproc workflow. The parameter --uri contains the endpoint, while --oauth-service-account-email and --oauth-token-scope specify the service account used to call the API. The --schedule parameter defines a standard cron expression that determines when the API is called to instantiate the workflow and its associated jobs.
There is only one problem with this approach: it is not possible to specify how many parallel executions of the same workflow are allowed. If you want only one workflow instance running at a time, you have to implement that yourself. For example, we register the workflow at the start of the first job and unregister it at the end of the last job, which lets us check whether a given workflow is already running. One possible implementation of such a guard, using a GCS marker object, is sketched after the snippet link below.
Full snippet: https://github.com/xtrmstep/dataproc-distrib-comp/blob/main/src/4_workflows/run.sh
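The exact mechanism is up to you; the following is only an illustration (not the code from the repository) of a lock based on a GCS marker object: the first job creates it only if it does not yet exist, and the last job deletes it.

# Illustrative sketch of a GCS-based "only one workflow at a time" guard.
# Bucket and object names are placeholders; not part of the repository code.
from google.api_core.exceptions import PreconditionFailed
from google.cloud import storage

def try_acquire_lock(bucket_name: str, lock_name: str) -> bool:
    blob = storage.Client().bucket(bucket_name).blob(lock_name)
    try:
        # if_generation_match=0 makes the upload fail if the object already exists
        blob.upload_from_string("locked", if_generation_match=0)
        return True
    except PreconditionFailed:
        return False  # another workflow instance holds the lock

def release_lock(bucket_name: str, lock_name: str) -> None:
    storage.Client().bucket(bucket_name).blob(lock_name).delete()

# first job: abort early if the previous run is still in progress
if not try_acquire_lock("my-bucket", "locks/my-workflow"):
    raise SystemExit("workflow is already running")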
5. SparkSQL
My repository also contains an example of SparkSQL, which can be handy for teams where Python is less common. Basically, if you want your DBAs to keep their existing skill set while still using Spark, SparkSQL makes that possible. You can submit pure SQL or use SQL syntax inside PySpark, which can be useful for complex transformations. For example, the code below performs the same task written with the fluent API and with SQL.
Fluent API:
import pyspark.sql as pss
import pyspark.sql.functions as F

def execute_py(number_of_records, session):
    raw_df = session.read.csv(Settings.BUCKET_INPUT, header=True)\
        .filter(F.col("uri_path").endswith('.sgml')) \
        .withColumn("parth_parts", F.split('uri_path', '/')) \
        .withColumn("path_idx_1", F.col('parth_parts').getItem(4)) \
        .withColumn("path_idx_2", F.col('parth_parts').getItem(5)) \
        .withColumn("rn", F.row_number().over(pss.Window.partitionBy('path_idx_1').orderBy('path_idx_2'))) \
        .filter("rn == 1") \
        .select("_time", "uri_path") \
        .limit(number_of_records)
    return raw_df
SQL:
def execute_sql(number_of_records, session):
    session.read.csv(Settings.BUCKET_INPUT, header=True).createOrReplaceTempView("raw_df")
    session.sql(f"""
        CREATE OR REPLACE TEMPORARY VIEW transformed_df
        AS
        WITH
        raw_data AS (
            SELECT
                *,
                SPLIT(uri_path, '/')[4] AS path_idx_1,
                SPLIT(uri_path, '/')[5] AS path_idx_2
            FROM raw_df
            WHERE uri_path LIKE '%.sgml'
        )
        ,ordered AS (
            SELECT
                *,
                ROW_NUMBER() OVER(PARTITION BY path_idx_1 ORDER BY path_idx_2 DESC) AS rn
            FROM raw_data
        )
        SELECT _time, uri_path FROM ordered WHERE rn = 1 LIMIT {number_of_records}
    """)
    df = session.table("transformed_df")
    return df
You may notice that the SQL variant is cleaner and more readable.
Full snippet: https://github.com/xtrmstep/dataproc-distrib-comp/blob/main/src/5_sparksql/run.sh
Repository with code snippets
My repository contains all scripts plus additional information in readme.md files. The repository is organized into several "steps", each with a run.sh Bash script, but they were written with the assumption that everything runs in the same shell session. They will work correctly if you copy the contents of each run.sh into the same terminal and execute them one after another; this ensures that environment variables are defined and carry the correct values.
The final step, #6, contains cleanup commands that remove all objects created in the previous steps. Running through the examples may incur some cost on your GCP account, roughly $10 or a bit less.