Building an Event-Driven Serverless Data Pipeline with Google Cloud Workflows and Functions

Samet Karadag
Google Cloud - Community
5 min read · Oct 13, 2023

In this blog post, we’ll explain how to create an event-driven system that loads data from Google Cloud Storage (GCS) into BigQuery and curates it based on where it came from. We’ll use Cloud Workflows triggered by GCS events, together with Cloud Functions, to do this.

Step 1: Create a Cloud Function to load data from GCS to BigQuery

In my previous post, I wrote about how to streamline data ingestion with event-driven, serverless loads from GCS into BigQuery using Cloud Functions triggered by new uploads.

We will use the same function, with some changes so that it is triggered via Workflows rather than directly by GCS.

Here is the updated Cloud Function, now triggered via Pub/Sub and Workflows (please read my previous post for more details on this function):

import os
import json
from google.cloud import bigquery
from google.cloud import storage
import functions_framework
import time
import datetime
from flask import jsonify

# Set your BigQuery ingestion dataset name
BIGQUERY_DATASET_ID = 'ingestion'

def prepare_json_file(file_bucket, file_name):
    # Initialize GCS client
    storage_client = storage.Client()
    temp_file_name = './' + file_bucket + '_' + file_name

    # Download the file from GCS to a local temporary file
    bucket = storage_client.bucket(file_bucket)
    blob = bucket.blob(file_name)
    blob.download_to_filename(temp_file_name)

    # Read the content of the temporary file
    with open(temp_file_name, 'r') as file:
        json_file = json.load(file)

    records = [json.dumps(record) for record in json_file["records"]]

    newline_delimited_json = '\n'.join(records)

    # Only rewrite and upload the file if it contains records to load
    if len(newline_delimited_json) >= 2:

        # Write the newline-delimited content back to the temporary file
        with open(temp_file_name, 'w') as file:
            file.writelines(newline_delimited_json)

        # Upload the modified file to GCS, to the preprocess_json bucket
        bucket = storage_client.bucket('preprocess_json')
        blob = bucket.blob(file_name)
        blob.upload_from_filename(temp_file_name)
        print(f"Converted '{file_name}' to newline-delimited JSON and uploaded it to GCS bucket 'preprocess_json'.")

    else:
        print(f"The file '{file_name}' does not contain any records to load.")

    # Clean up the local temporary file
    os.remove(temp_file_name)
    return file_name


def get_table_schema(table_id):
    # Initialize BigQuery client
    bigquery_client = bigquery.Client()

    # Get the schema of the existing table
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(table_id)
    table = bigquery_client.get_table(table_ref)

    return table.schema

def create_empty_table(table_id, schema):
    # Initialize BigQuery client
    bigquery_client = bigquery.Client()

    # Construct the BigQuery table reference
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(table_id)

    # Define the schema for the new table
    table = bigquery.Table(table_ref, schema=schema)

    # Set the staging table to expire 1 day from now
    expiration = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
        days=1
    )

    table.expires = expiration

    # Create the empty table in BigQuery
    bigquery_client.create_table(table)

    print(f"Empty table {table_id} created in BigQuery.")

def load_data_to_bigquery(data):
    # Get the file details from the event
    file_name = data['objectId']
    file_bucket = data['bucketId']

    # The staging table prefix is derived from the bucket name (e.g. a bucket named "source-t1" maps to "T1")
    BIGQUERY_TABLE_ID = file_bucket.split('-')[1].upper()
    # Create a new table name with an epoch timestamp suffix
    epoch_timestamp = str(int(time.time() * 1000000))
    new_table_name = f"{BIGQUERY_TABLE_ID}_{epoch_timestamp}"

    # Initialize BigQuery and Storage clients
    bigquery_client = bigquery.Client()
    storage_client = storage.Client()

    # Construct the BigQuery table reference
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(BIGQUERY_TABLE_ID)

    # Get the schema from the existing template table (e.g., ingestion.T1)
    schema = bigquery_client.get_table(table_ref).schema

    # Construct the reference for the new staging table
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(new_table_name)

    # Create a new empty table with the same schema as the existing table
    create_empty_table(new_table_name, schema)

    # Define job configuration
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.CSV if file_name.endswith('.csv') else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    # If it is a JSON file, convert it from a JSON array to newline-delimited JSON first
    if file_name.endswith('.json'):
        file_name = prepare_json_file(file_bucket, file_name)
        file_bucket = 'preprocess_json'

    # Load data from GCS to BigQuery
    uri = f"gs://{file_bucket}/{file_name}"
    load_job = bigquery_client.load_table_from_uri(uri, table_ref, job_config=job_config)

    load_job.result()  # Wait for the job to complete

    print(f"File {file_name} loaded into BigQuery table {BIGQUERY_DATASET_ID}.{new_table_name}")
    return new_table_name

# Entry point for the Cloud Function
# Triggered by Workflows with the Pub/Sub message as input
@functions_framework.http
def main(request):
    print(request)
    request_json = request.get_json(silent=True)
    request_args = request.args
    print(request_json)

    if request_json and 'input' in request_json:
        data = request_json['input']['data']['message']['attributes']

        bucket = data["bucketId"]
        name = data["objectId"]
        timeCreated = data["eventTime"]
    else:
        raise ValueError("JSON is invalid, or missing an 'input' property")

    print(f"Data: {data}")

    print(f"Bucket: {bucket}")
    print(f"File: {name}")
    print(f"Created: {timeCreated}")

    return_msg = load_data_to_bigquery(data)

    return jsonify({'table_name': return_msg})
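For reference, here is a minimal sketch of the payload shape that main() expects once Workflows (shown in Step 3) forwards the Eventarc Pub/Sub message, posted to a local functions-framework instance. The bucket, object, and timestamp values are placeholders, and it assumes the ingestion dataset and buckets exist and you have application default credentials.

# A minimal local test of the load function (a sketch; values are placeholders).
# Start the function locally first with:
#   functions-framework --target=main --port=8080
import requests

sample_body = {
    "input": {
        "data": {
            "message": {
                "attributes": {
                    "bucketId": "source-t1",      # second dash-separated token ("t1") becomes table prefix "T1"
                    "objectId": "orders.json",    # uploaded file name
                    "eventTime": "2023-10-13T10:00:00.000Z",
                }
            }
        }
    }
}

resp = requests.post("http://localhost:8080", json=sample_body)
print(resp.json())  # e.g. {"table_name": "T1_1697191200000000"}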

Step 2: Create a Cloud Function to curate the data

Here is a sample Cloud Function that transforms data from the staging table into the curated table, based on the data type embedded in the table name:

import os
from google.cloud import bigquery
from google.cloud import storage
import functions_framework


# Initialize the BigQuery client
bq_client = bigquery.Client()

@functions_framework.http
def curate(request):
    request_json = request.get_json(silent=True)
    request_args = request.args
    print(request_json)

    if request_json and 'input' in request_json:
        table_name = request_json['input']['body']['table_name']
    else:
        raise ValueError("JSON is invalid, or missing an 'input' property")

    # Define the SQL queries for the T1 and T2 data types
    t1_query = f"""
    INSERT INTO curated.common_target (
      <column_list>
    )
    SELECT
      <column_list with required mappings>
    FROM ingestion.{table_name} t
    JOIN `project_id.curated.lookup` c ON (t.lookupcol1 = c.C1 OR t.lookupcol1 = c.C2)
    """

    t2_query = f"""
    INSERT INTO curated.common_target (
      <column_list>
    )
    SELECT
      <column_list with required mappings>
    FROM ingestion.{table_name} t
    JOIN `project_id.curated.lookup` c ON (t.lookupcol1 = c.C1 OR t.lookupcol1 = c.C2)
    """

    # Determine which query to execute based on the table name prefix
    data_type = table_name.split("_")[0]
    if data_type == 'T1':
        query = t1_query
    elif data_type == 'T2':
        query = t2_query
    else:
        raise ValueError(f"Unsupported table name: {table_name}")

    # Execute the SQL query
    job_config = bigquery.QueryJobConfig()
    query_job = bq_client.query(query, job_config=job_config)

    # Wait for the query job to complete
    query_job.result()

    print(f"Curated data for input data type {data_type}")
    return f"Curated data for input data type {data_type}"
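In the workflow below, the curate step receives the whole HTTP result of the load step (code, headers, body), which is why curate() reads the table name from input.body.table_name. Here is a minimal local test of that shape; the table name is a placeholder.

# A minimal local test of the curate function (a sketch; the table name is a placeholder).
# Start the function locally first with:
#   functions-framework --target=curate --port=8081
import requests

sample_body = {
    "input": {
        # Workflows wraps the load step's HTTP response, so table_name sits under "body"
        "body": {"table_name": "T1_1697191200000000"}
    }
}

resp = requests.post("http://localhost:8081", json=sample_body)
print(resp.status_code, resp.text)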

Step 3: Create a workflow to trigger the Cloud Functions

Here is a sample workflow that is triggered by an Eventarc Pub/Sub message and runs the load and curate functions for each file.

1. Go to the Workflows page in the Google Cloud console and click the “Create Workflow” button.
2. Enter a name for the workflow, such as `gcs_to_bq_load_curate`.
3. Click the “Add trigger” button.
4. Select the “Eventarc” trigger.

Eventarc Trigger

5. Provide the YAML configuration on the next page.

Workflow YAML:

main:
  params: [event]
  steps:
    - logStep:
        call: sys.log
        args:
          text: ${event}
          severity: INFO
    - gcs_to_bq_load:
        call: http.post
        args:
          url: https://us-central1-project-id.cloudfunctions.net/gcs_to_bq
          body:
            input: ${event}
          auth:
            type: OIDC
        result: gcs_to_bq_result
        next: curate_common

    - curate_common:
        call: http.post
        args:
          url: https://us-central1-project-id.cloudfunctions.net/curate-to-common
          body:
            input: ${gcs_to_bq_result}
          auth:
            type: OIDC
        result: curate_result
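Before wiring up the Eventarc trigger, you can also execute the workflow manually with a synthetic Pub/Sub-style event. Below is a sketch using the google-cloud-workflows executions client; the project, location, workflow name, and event values are placeholders.

# Manually execute the workflow with a synthetic event (a sketch; values are placeholders).
import json
from google.cloud.workflows.executions_v1 import Execution, ExecutionsClient

client = ExecutionsClient()
parent = client.workflow_path("project-id", "us-central1", "gcs_to_bq_load_curate")

# Mimic the Pub/Sub message attributes that Eventarc would deliver
event = {
    "data": {
        "message": {
            "attributes": {
                "bucketId": "source-t1",
                "objectId": "orders.json",
                "eventTime": "2023-10-13T10:00:00.000Z",
            }
        }
    }
}

execution = client.create_execution(
    parent=parent,
    execution=Execution(argument=json.dumps(event)),
)
print(f"Started execution: {execution.name}")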

Step 4: Create GCS Event Triggers

Now our Cloud Workflow is ready and listening for Pub/Sub messages. The final piece we need is a GCS event notification that publishes a message whenever a file is uploaded.

You can do this in Cloud Shell or your terminal with the following commands:

# Add a notification for the first source bucket
gcloud storage buckets notifications create --event-types=OBJECT_FINALIZE gs://input-bucket1 --topic projects/project-id/topics/gcs_upload_event_trigger

# If you want to listen to multiple buckets, add notifications for those buckets as well
gcloud storage buckets notifications create --event-types=OBJECT_FINALIZE gs://input-bucket2 --topic projects/project-id/topics/gcs_upload_event_trigger
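If you want to confirm the notification configuration from Python rather than the console, here is a small sketch; the bucket name matches the example above.

# List the Pub/Sub notification configurations on a bucket (a sketch).
from google.cloud import storage

client = storage.Client()
for notification in client.bucket("input-bucket1").list_notifications():
    print(notification.topic_name, notification.event_types)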

Now we have the complete event-driven serverless data pipeline that automatically loads data from GCS to BigQuery and curates it. Please let me know what you think or if you have any questions.
