Event-driven Data Pipeline with Cloud Workflows and Serverless Spark

Karim Wadie
Google Cloud - Community
9 min read · Jun 30, 2023

In some situations, we need to process individual files as soon as they arrive on our platform in order to minimise the time between ingestion and analytics. This is different from the more common batch approach, where all files received in a given interval are processed together on a schedule by an orchestration tool (e.g. Airflow).

In this article we will build a cost-effective, serverless data pipeline on GCP that processes files as soon as they land on Google Cloud Storage (GCS) and stores the results in BigQuery, along with a monitoring view of the actions taken on each file. The end-to-end sample pipeline is available on GitHub with reusable Terraform modules.

Pipeline Overview

Event-driven pipeline architecture

GCS: Files are uploaded/exported by upstream application(s) to a Google Cloud Storage bucket.

Pub/Sub notifications for GCS: For each object creation on the bucket, a notification containing the object metadata and the operation type is sent to a Pub/Sub topic (a trimmed-down sketch of this message is shown at the end of this overview). This is configured at the bucket level.

Pub/Sub Push Subscription: listens to the GCS notifications topic and immediately pushes each message to a Cloud Function for processing.

Cloud Function: triggers a serverless workflow on Cloud Workflows and passes it the required parameters (e.g. the GCS path of the file to be processed). For non-trivial data processing we use an orchestration tool such as Cloud Workflows to define and orchestrate the processing steps, rather than orchestrating them inside the Cloud Function, to avoid its limitations (e.g. timeouts).

Cloud Workflows: defines the required processing steps and the dependencies between them in a YAML file. It handles the communication with other APIs and retries on failures. In our sample pipeline, the processing steps are the following:

  • Run a WordCount PySpark application on Dataproc Serverless and store the results in a BigQuery table.
  • Aggregate the word count results into another table using a SQL operation on BigQuery.
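
For reference, below is a trimmed-down sketch of the Pub/Sub push message that the Cloud Function receives when a file lands on the bucket. Only the attributes used later in this article are shown, and all values are illustrative placeholders (written here as a Python dict for readability):

# Illustrative sketch only: shape of the push request body delivered to the
# Cloud Function for an OBJECT_FINALIZE notification. Values are placeholders.
gcs_notification_push_body = {
    "message": {
        "messageId": "1234567890",                    # used later as the "tracker"
        "attributes": {
            "eventType": "OBJECT_FINALIZE",
            "eventTime": "2023-06-30T10:00:00.000Z",  # file creation time
            "bucketId": "my-project-data-wordcount",  # landing bucket
            "objectId": "landing/rose_2.txt",         # object path within the bucket
            "payloadFormat": "JSON_API_V1"
        },
        "data": "<base64-encoded object metadata>"
    },
    "subscription": "projects/my-project/subscriptions/gcs-notifications-push"
}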

Building the Pipeline Components

PySpark Word Count

The PySpark job is straightforward and is used as a sample. It expects three parameters:

  • input_expression: the GCS path to read from. This is set to the file path we receive in the Pub/Sub notification from GCS
  • output_table: the BigQuery table to write the word count results to
  • temp_bucket: GCS bucket name to store temp files generated by the job
# https://github.com/kwadie/cloud-workflows-demo/blob/master/spark/wordcount/wordcount.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import argparse

spark = SparkSession \
    .builder \
    .appName('spark-bigquery-demo') \
    .getOrCreate()

parser = argparse.ArgumentParser()
parser.add_argument("--input_expression", help="GCS input path to read from. gs://bucket/path or gs://bucket/file ")
parser.add_argument("--output_table", help="Bigquery table to write word count results. Format project:dataset.table")
parser.add_argument("--temp_bucket", help="GCS path to store temp files. Format bucket/path/ without gs://")
args = parser.parse_args()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector in case of writeMethod=indirect
spark.conf.set('temporaryGcsBucket', args.temp_bucket)

sc = spark.sparkContext
lines = sc.textFile(args.input_expression)
words = lines.flatMap(lambda line: line.split())
word_count = words \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda count1, count2: count1 + count2)

output_columns = ["word", "word_count"]
word_count_df = word_count.toDF(output_columns)

# add the file path as a literal column
word_count_df = word_count_df.withColumn("input_file", lit(args.input_expression))

word_count_df.show()
word_count_df.printSchema()

# Saving the data to BigQuery
word_count_df.write.format('bigquery') \
    .option("table", args.output_table) \
    .option("writeMethod", "indirect") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .mode("append") \
    .save()
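
Before wiring the job into the workflow, you may want to smoke-test it on its own. One way to do that (a sketch, assuming the google-cloud-dataproc client library is installed and using placeholder project, region, bucket and table names) is to create a batch directly with the Python client, which mirrors the HTTP call the workflow will make later:

# Hypothetical standalone submission of the word count job to Dataproc Serverless.
# Project, region, buckets, table and batch_id are placeholders to replace.
from google.cloud import dataproc_v1

project = "my-project"
region = "europe-west1"

client = dataproc_v1.BatchControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

batch = dataproc_v1.Batch(
    pyspark_batch=dataproc_v1.PySparkBatch(
        main_python_file_uri="gs://my-resources-bucket/wordcount/wordcount.py",
        args=[
            "--input_expression=gs://my-project-data-wordcount/landing/rose_2.txt",
            "--output_table=my-project:sandbox.word_count",
            "--temp_bucket=my-project-temp-bucket/spark-tmp/",
        ],
        jar_file_uris=["gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.27.1.jar"],
    )
)

# create_batch returns a long-running operation; result() blocks until the batch finishes.
# Without an explicit environmentConfig the batch runs with the default network and service account.
operation = client.create_batch(
    parent=f"projects/{project}/locations/{region}",
    batch=batch,
    batch_id="wordcount-manual-test-001",
)
print(operation.result().state)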

Cloud Workflow

The workflow consists of two main steps:

  • run_spark_job: submits an HTTP POST request to the Dataproc API, passing parameters such as the service account to be used by the job, the job ID (i.e. batch_id) and the job parameters themselves, such as the input file path and the PySpark job file (stored on GCS). Since the batches API returns as soon as the batch is created, the workflow then polls the batch state (the check_job, check_job_done and wait steps) every 30 seconds until it reports SUCCEEDED or FAILED.
  • sql_aggregate_word_counts: calls the BigQuery jobs.query API to execute a SQL job that aggregates the counts. At the time of writing, Cloud Workflows doesn’t provide a templating mechanism to encapsulate long SQL code in separate files and parameterize it from the main YAML file. To avoid clutter in the YAML file, we encapsulate the SQL logic in a stored procedure and call it in a one-line SQL statement instead (a rough sketch of such a procedure is shown after the workflow definition below).
# https://github.com/kwadie/cloud-workflows-demo/blob/master/workflows/spark-serverless-wordcount/pipeline.yaml

main:
  params: [args]
  steps:
    - init:
        assign:
          - batch_id: ${args.spark_job_name_prefix + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}

    # https://cloud.google.com/workflows/docs/http-requests
    - run_spark_job:
        call: http.post
        # Spark Batch API doc --> https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches/create
        args:
          url: ${"https://dataproc.googleapis.com/v1/projects/" + args.project + "/locations/" + args.dataproc_region + "/batches"}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
          headers:
            Content-Type: "application/json"
          query:
            batchId: ${batch_id}
          body:
            labels:
              tracker: ${args.tracker}
            environmentConfig:
              executionConfig:
                serviceAccount: ${args.spark_service_account}
                # Currently we're using the default network. On a Customer environment the subnetwork should be set
                #subnetworkUri: " regions/<region>/subnetworks/<subnetwork>"
            pysparkBatch:
              jarFileUris: ${args.spark_dep_jars}
              args: ${args.spark_job_args}
              mainPythonFileUri: ${args.pyspark_file}
            runtimeConfig:
              version: ${args.dataproc_runtime_version}
              properties: { }
        result: response_message

    - check_job:
        call: http.get
        args:
          url: ${"https://dataproc.googleapis.com/v1/projects/" + args.project + "/locations/" + args.dataproc_region + "/batches/" + batch_id}
          auth:
            type: OAuth2
        result: jobStatus
    - check_job_done:
        switch:
          - condition: ${jobStatus.body.state == "SUCCEEDED"}
            next: sql_aggregate_word_counts
          - condition: ${jobStatus.body.state == "FAILED"}
            raise: "Spark Job Failed"
    - wait:
        call: sys.sleep
        args:
          seconds: 30
        next: check_job

    - sql_aggregate_word_counts:
        # https://cloud.google.com/workflows/docs/reference/googleapis/bigquery/v2/jobs/query
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${args.project}
          body:
            query: "CALL sandbox.aggregate_word_counts();"
            useLegacySql: "false"

    - the_end:
        return: "SUCCESS"

Cloud Function

The function is just a wrapper on top of the Cloud Workflow we already defined. It gathers the required parameters for the workflow from the environment variables and the GCS-to-Pub/Sub notification message, and invokes the workflow via the Cloud Workflows Python SDK.

# https://github.com/kwadie/cloud-workflows-demo/blob/master/functions/wordcount/main.py

from flask import escape
import functions_framework

@functions_framework.http
def execute_cloud_workflow(request):

    from google.cloud import workflows_v1beta
    from google.cloud.workflows import executions_v1beta
    from google.cloud.workflows.executions_v1beta.types import executions
    from google.cloud.workflows.executions_v1beta.types import Execution
    import json
    import os
    from google.cloud import logging

    # These should be env variables
    project = os.environ.get('PROJECT')
    region = os.environ.get('REGION')
    workflow = os.environ.get('CLOUD_WORKFLOW_NAME')
    pyspark_file = os.environ.get('PYSPARK_FILE')
    spark_service_account = os.environ.get('SPARK_SERVICE_ACCOUNT')
    bq_output_table = os.environ.get('BQ_OUTPUT_TABLE')
    spark_temp_bucket = os.environ.get('SPARK_TEMP_BUCKET')  # format bucket/path/ without gs://
    tracker_log_name = os.environ.get('TRACKER_LOG_NAME')

    logger = logging.Client().logger(tracker_log_name)

    request_json = request.get_json(silent=True)

    pubsub_message_id = request_json['message']['messageId']
    file_creation_time = request_json['message']['attributes']['eventTime']
    source_bucket = request_json['message']['attributes']['bucketId']
    source_object = request_json['message']['attributes']['objectId']
    source_file_path = f"gs://{source_bucket}/{source_object}"

    execution_client = executions_v1beta.ExecutionsClient()

    # use the message id as a trace to link this execution with the created workflow and later steps
    tracker = pubsub_message_id

    workflow_arguments_dict = {
        "pyspark_file": pyspark_file,
        "spark_job_args": [f"--input_expression={source_file_path}",
                           f"--output_table={bq_output_table}",
                           f"--temp_bucket={spark_temp_bucket}"
                           ],
        "spark_dep_jars": ["gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.27.1.jar"],
        "dataproc_runtime_version": "2.0",
        "project": f"{project}",
        "dataproc_region": f"{region}",
        "spark_service_account": f"{spark_service_account}",
        "spark_job_name_prefix": "wordcount-workflows-",
        "tracker": tracker
    }

    # Initialize request argument(s)
    request = executions_v1beta.CreateExecutionRequest(
        parent=f"projects/{project}/locations/{region}/workflows/{workflow}",
        execution=Execution(argument=json.dumps(workflow_arguments_dict))
    )

    # Execute the workflow.
    workflow_response = execution_client.create_execution(request)

    result_dict = {
        "file": source_file_path,
        "file_creation_time": file_creation_time,
        "workflow_execution": workflow_response.name,
        "tracker": tracker,
    }
    logger.log_struct(
        result_dict,
        severity="INFO"
    )

    return result_dict
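
To test the function locally before deploying it, one option (a sketch, assuming the functions-framework package is installed, GCP credentials are available and the environment variables listed above are set) is to spin up its Flask test client and post a fake notification message. Note that this will actually create a workflow execution if everything is configured:

# Hypothetical local test; the payload mimics a GCS OBJECT_FINALIZE push message.
import functions_framework

app = functions_framework.create_app(
    target="execute_cloud_workflow", source="main.py"
).test_client()

fake_push = {
    "message": {
        "messageId": "local-test-1",
        "attributes": {
            "eventTime": "2023-06-30T10:00:00.000Z",
            "bucketId": "my-project-data-wordcount",  # placeholder bucket
            "objectId": "landing/rose_2.txt",         # placeholder object
        },
    }
}

response = app.post("/", json=fake_push)
print(response.status_code, response.get_json())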

Bucket, Notifications and PubSub

In order to connect the above components together we need the following resources (defined in Terraform):

  • Service accounts to run the PySpark job, the workflow, the Cloud Function and the Pub/Sub push subscription
  • Service account IAM bindings according to each one’s minimal needs
  • Copying the PySpark file from the local repo to a GCS “resources” bucket
  • Deploying the Cloud Workflow YAML file to the service
  • Deploying the Cloud Function Python code
  • Pub/Sub topic and push subscription to the Cloud Function
  • GCS bucket to land incoming data files
  • GCS-to-Pub/Sub notification configuration attached to the bucket
# https://github.com/kwadie/cloud-workflows-demo/blob/master/terraform/modules/file-trigger-pipeline/main.tf

locals {

  default_env_variables = {
    PROJECT : var.project,
    REGION : var.compute_region,
    CLOUD_WORKFLOW_NAME : var.pipeline_name
    PYSPARK_FILE : "gs://${var.resource_bucket_name}/${google_storage_bucket_object.copy_spark_file.name}"
    SPARK_SERVICE_ACCOUNT : google_service_account.sa_spark.email
    TRACKER_LOG_NAME : var.tracker_log_name
  }
}

############## Service Accounts #########################

resource "google_service_account" "sa_spark" {
project = var.project
account_id = var.sa_spark
display_name = "Runtime SA for Serverless Spark for pipeline ${var.pipeline_name}"
}


resource "google_service_account" "sa_workflow" {
project = var.project
account_id = var.sa_workflow
display_name = "Runtime SA for Cloud Workflow for pipeline ${var.pipeline_name}"
}

resource "google_service_account" "sa_function" {
project = var.project
account_id = var.sa_function
display_name = "Runtime SA for Cloud Function for pipeline ${var.pipeline_name}"
}

resource "google_service_account" "sa_pubsub" {
project = var.project
account_id = var.sa_pubsub
display_name = "Runtime SA for PubSub Push Subscription for pipeline ${var.pipeline_name}"
}


############## Service Accounts Permissions #########################

######## SA workflows permissions

resource "google_project_iam_member" "sa_workflows_roles" {
project = var.project
for_each = toset([
"roles/dataproc.admin",
"roles/bigquery.dataEditor",
"roles/bigquery.jobUser",
])
role = each.key
member = "serviceAccount:${google_service_account.sa_workflow.email}"
}

# sa_workflows must be a serviceAccountUser on sa_spark to submit spark jobs that uses sa_spark as it's service account
resource "google_service_account_iam_member" "sa_workflows_role_sauser_spark" {
service_account_id = google_service_account.sa_spark.name
role = "roles/iam.serviceAccountUser"
member = "serviceAccount:${google_service_account.sa_workflow.email}"
}


######## SA spark permissions


resource "google_project_iam_member" "sa_spark_roles" {
project = var.project
for_each = toset([
"roles/dataproc.worker",
"roles/bigquery.admin",
"roles/storage.objectViewer",
])
role = each.key
member = "serviceAccount:${google_service_account.sa_spark.email}"
}

####### SA cloud functions permissions

# functions SA needs to invoke Cloud Workflows
resource "google_project_iam_member" "sa_function_roles" {
project = var.project
for_each = toset([
"roles/workflows.invoker",
"roles/logging.logWriter"
])
role = each.key
member = "serviceAccount:${google_service_account.sa_function.email}"
}

####### SA pubsub permissions

# pubsub push subscription needs to use cloud functions v2 sa in order to invoke it
resource "google_service_account_iam_member" "sa_pubsub_sa_user_functions" {
service_account_id = google_service_account.sa_function.name
role = "roles/iam.serviceAccountUser"
member = "serviceAccount:${google_service_account.sa_pubsub.email}"
}

// pubsub push subscription needs to invoke the underlying Cloud run service of CFv2
resource "google_cloud_run_service_iam_member" "sa_pubsub_invoker" {

project = var.project
location = var.compute_region
service = google_cloudfunctions2_function.gcs_functions.service_config[0].service
role = "roles/run.invoker"
member = "serviceAccount:${google_service_account.sa_pubsub.email}"
}

##### Spark Job #################################

# copy the spark job from the repo to GCS
# use loops to deploy multiple files
resource "google_storage_bucket_object" "copy_spark_file" {
name = var.spark_job_gcs_postfix
source = var.spark_job_path
bucket = var.resource_bucket_name
}

##### Cloud Workflow #################################

resource "google_workflows_workflow" "workflow" {
project = var.project
name = var.pipeline_name
region = var.compute_region
source_contents = file(var.workflow_local_path)
service_account = google_service_account.sa_workflow.email

}

##### Cloud Function #################################

# Generates an archive of the source code compressed as a .zip file.
data "archive_file" "source" {
type = "zip"
source_dir = var.cloud_function_src_dir
output_path = var.cloud_function_temp_dir
}

# Add source code zip to the Cloud Function's bucket
resource "google_storage_bucket_object" "zip" {
source = data.archive_file.source.output_path
content_type = "application/zip"

# Append to the MD5 checksum of the files' content
# to force the zip to be updated as soon as a change occurs
name = "src-${data.archive_file.source.output_md5}.zip"
bucket = var.resource_bucket_name
}

# Cloud Function (gen2) invoked via the PubSub push subscription carrying the GCS notifications
resource "google_cloudfunctions2_function" "gcs_functions" {
  name     = var.pipeline_name
  project  = var.project
  location = var.compute_region

  build_config {
    runtime     = "python310"
    entry_point = "execute_cloud_workflow" # Set the entry point
    source {
      storage_source {
        bucket = var.resource_bucket_name
        object = google_storage_bucket_object.zip.name
      }
    }
  }

  service_config {
    max_instance_count               = 3
    min_instance_count               = 1
    available_memory                 = "1Gi"
    timeout_seconds                  = 60
    max_instance_request_concurrency = 80
    available_cpu                    = "2"
    environment_variables            = merge(local.default_env_variables, var.cloud_function_extra_env_variables)
    ingress_settings                 = "ALLOW_INTERNAL_ONLY"
    all_traffic_on_latest_revision   = true
    service_account_email            = google_service_account.sa_function.email
  }
}

######## Data bucket with PubSub Notifications ###############

# GCS bucket to store data files and trigger notifications on object creation
resource "google_storage_bucket" "data_bucket" {
name = var.data_bucket_name
location = var.data_region
force_destroy = true
uniform_bucket_level_access = true
}


# https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic

resource "google_pubsub_topic" "gcs_notification_topic" {
project = var.project
name = var.topic_name
}

# https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription

resource "google_pubsub_subscription" "gcs_notification_subscription" {
project = var.project
name = var.subscription_name
topic = google_pubsub_topic.gcs_notification_topic.name

# Policy to delete the subscription when in-active
expiration_policy {
# Never Expires. Empty to avoid the 31 days expiration.
ttl = ""
}

retry_policy {
# The minimum delay between consecutive deliveries of a given message
minimum_backoff = "60s" #
# The maximum delay between consecutive deliveries of a given message
maximum_backoff = "600s" # 10 mins
}

push_config {
push_endpoint = google_cloudfunctions2_function.gcs_functions.service_config[0].uri

oidc_token {
service_account_email = google_service_account.sa_pubsub.email
}
}
}

resource "google_storage_notification" "gcs_notification" {
bucket = google_storage_bucket.data_bucket.name
payload_format = "JSON_API_V1"
topic = google_pubsub_topic.gcs_notification_topic.id
event_types = ["OBJECT_FINALIZE", "OBJECT_METADATA_UPDATE"]
depends_on = [google_pubsub_topic_iam_binding.gcs_pubsub_binding]
}

// Enable notifications by giving the correct IAM permission to the unique service account.
data "google_storage_project_service_account" "gcs_account" {
}

resource "google_pubsub_topic_iam_binding" "gcs_pubsub_binding" {
topic = google_pubsub_topic.gcs_notification_topic.id
role = "roles/pubsub.publisher"
members = ["serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}"]
}

Deploying the End-to-End Pipeline

The code snippets above are for demonstration only. Please follow the deployment guide to deploy the end-to-end pipeline to your environment.

Running the Pipeline

Copy a file to the GCS data bucket (alternatively, upload from the console)

gsutil cp gs://pub/shakespeare/rose.txt gs://<project>-data-wordcount/landing/rose_2.txt

Observe the execution of the Cloud Function

Cloud Function Execution

and of the Cloud Workflow

Cloud Workflow Execution

Check the BigQuery results of the two steps of the workflow (a sample query sketch follows the screenshots below)

word count table
word count aggregate table
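
If you prefer a query over the console, a quick check could look like the following sketch; the dataset and table names are assumptions to replace with the ones from your deployment:

# Hypothetical check of the word count output table; names are placeholders.
from google.cloud import bigquery

client = bigquery.Client()
rows = client.query("""
    SELECT word, word_count, input_file
    FROM sandbox.word_count
    ORDER BY word_count DESC
    LIMIT 5
""").result()

for row in rows:
    print(row.word, row.word_count, row.input_file)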

Finally, the end-to-end solution deploys a monitoring module that keeps track of individual files landing on GCS and their processing status. The tracking data (i.e. logs) is exported to BigQuery and made available via a view (an illustrative query follows the screenshot below).

file processing tracking
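
The tracking view can be queried in the same way; the view and column names below are purely illustrative and should be replaced with the ones created by the monitoring module:

# Hypothetical query on the file-tracking view; view and column names are placeholders.
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT file, file_creation_time, workflow_execution, tracker
    FROM monitoring.v_file_processing_tracker
    ORDER BY file_creation_time DESC
    LIMIT 10
"""
for row in client.query(query).result():
    print(dict(row))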

Conclusion

In this article we demonstrated how to build an event-driven data pipeline on GCP using cost-efficient serverless technologies such as Cloud Functions, Cloud Workflows and Dataproc Serverless (i.e. Serverless Spark). The end-to-end implementation can be found on GitHub, ready for reuse.
