Serverless Spark ETL Pipeline Orchestrated by Airflow on GCP

Ravi Manjunatha
Google Cloud - Community
5 min readJun 25, 2022

A Big Data Spark engineer spends on an average only 40% on actual data or ml pipeline development activity. Most of their time is often devoted to managing the clusters or optimizing the spark application variables.

As an alternative to this monolith set up of clusters is the cloud based serverless, no-ops platforms like BigQuery(SQL based) or Data Fusion(no code, UI based) on GCP. While, Engineers really like these Serverless Analytics solutions , they also prefer to keep their pipelines in pyspark or scala for easy portability if required.

Spark Engineer expecting a giant Spark Cluster from their new Q(spark quarter master)

As a golden mean, to free up the Spark engineers from non-analytics activities while being able to retain or continue to write spark native applications, is the Serverless Spark solution on GCP.

Q surprises Spark Engineer with Serverless Spark!! We really don’t go for cluster in the new world :-)

It is industry’s first autoscaling Serverless Spark which doesn't require any manual infrastructure provisioning. It is enabled on,

> Vertex AI notebooks for Data and ML engineers to launch Jupyter Notebooks and write spark code (now GA)

> BigQuery where Data Analysts can write and execute pyspark code along with BigQuery SQL (private preview)

>They can be orchestrated as full Data and ML pipelines using Airflow or Cloud Composer (now GA)

In this article, I will discuss how a Spark ETL pipeline can be executed in a completely serverless mode on GCP.

First let us run a simple Spark Pi Application in Serverless Mode.

Navigate to the Dataproc console in GCP, scroll down to Serverless Batches and click ‘Create’ ,

Enter the batch details as shown below, you can leave the other options as default ones and then click submit,

The batch job will start and move from Pending to Running status within 30 seconds, (setting up a Dataproc Cluster takes about 90 seconds and setting up a Hadoop cluster on-premise is anybody’s guess)

The output of the Spark job can be seen as below,

Executor status can be seen under the ‘Monitoring’ section

As can be be seen by now, there is no cluster to create or manage and all compute is created and deleted automatically.

Now that we have seen a ‘Hello World’ example of Serverless Spark, let us now proceed to build a ETL pipeline on Serverless Spark orchestrated by Cloud Composer (managed Apache Airflow).

The solution architecture of the ETL pipeline is as below,

For this pipeline, we can consider a simple Aggregation example. A text file with few records containing the name of the person and their age can be written and stored in GCS bucket. The pyspark code should calculate the average age by person and write the data to a BigQuery table.

A dag in Cloud Composer (managed Apache Airflow in GCP) will initiate a batch operator on Dataproc in serverless mode. The dag will find the average Age by person and store the results in the BigQuery dataset.

Raw data in GCS bucket :

name,age
Brooke,20
Denny,31
Jules,30
TD,35
Brooke,25
TD,45
Jules,69

Cloud Composer DAG:

import os
from airflow.models import Variable
from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (DataprocCreateBatchOperator,DataprocGetBatchOperator)
from datetime import datetime
from airflow.utils.dates import days_ago
import string
import random # define the random module
S = 10 # number of characters in the string.
# call random.choices() string module to find the string in Uppercase + numeric data.
ran = ''.join(random.choices(string.digits, k = S))
project_id = models.Variable.get("project_id")
region = models.Variable.get("region")
subnet=models.Variable.get("subnet")
phs_server=Variable.get("phs") ## create a Spark Persistent history server
code_bucket=Variable.get("code_bucket")
bq_dataset=Variable.get("bq_dataset")
name=<<your username>>dag_name= "serverless_spark_etl"
service_account_id= <<enter your service account>>
avg_age_script= "gs://"+code_bucket+"/00-scripts/avg_age.py"BATCH_ID = "avg-age-"+str(ran)BATCH_CONFIG1 = {
"pyspark_batch": {
"main_python_file_uri": avg_age_script,
"args": [
project_id,
bq_dataset,
code_bucket,
name
],
"jar_file_uris": [
"gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar"
]
},
"environment_config":{
"execution_config":{
"service_account": service_account_id,
"subnetwork_uri": subnet
},
"peripherals_config": {
"spark_history_server_config": {
"dataproc_cluster": f"projects/{project_id}/regions/{region}/clusters/{phs_server}"
}
},
},
}
with models.DAG(
dag_name,
schedule_interval=None,
start_date = days_ago(2),
catchup=False,
) as dag_serverless_batch:
# [START how_to_cloud_dataproc_create_batch_operator]
create_serverless_batch1 = DataprocCreateBatchOperator(
task_id="Avg_Age",
project_id=project_id,
region=region,
batch=BATCH_CONFIG1,
batch_id=BATCH_ID,
)
create_serverless_batch1

Pyspark code to calculate the avergae age of authors,

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
project_name =<<enter your project id>>
dataset_name='serverless_spark'
# Create a DataFrame using SparkSession
spark = SparkSession.builder.appName("ETL").config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar').getOrCreate()
input_data="gs://"+<<enter your bucket name>>+"01-datasets/Authors.csv"#Reading the Input Data
data_df = spark.read.format("csv").option("header", True).option("inferschema",True).load(input_data)
data_df.printSchema()
# Group the same names together, aggregate their ages, and compute an average
avg_df = data_df.groupBy("name").agg(avg("age"))
# Show the results of the final execution
avg_df.show()
# extract columns to create country table
avg_table = avg_df.selectExpr("name").dropDuplicates()
avg_table.write.format('bigquery') .mode("overwrite").option('table', project_name+':'+dataset_name+'._avgage') .save()

Once you place the dag file under the dags folder of the Airflow bucket in GCS, the dag serverless_spark_etl should show in the Airflow UI,

You can then trigger the dag,

Serverless Batch operator getting triggered from Composer and launching the Serverless batch execution in Dataproc,

Spark execution History :

You can access the Spark History Server from the Persistent History Server,

Data Output in BigQuery :

Billing : For the cost of all Serverless Spark runs, you can track the SKUs in the billing report as below,

In the next article, I will discuss how the Serverless Spark Jupyter Notebook can be launched and a ML Pipeline can be built and orchestrated by Cloud Composer.

--

--