Processing arriving GCS files with PubSub triggers

Neil Kolban
Google Cloud - Community
8 min read · Aug 12, 2024

This article describes experiments in processing Google Cloud Storage (GCS) files through BEAM, where the content is to be inserted into BigQuery. We specifically examine the use of Cloud Storage event triggers delivered through PubSub.

In our story, we imagine the arrival of new files in GCS. How they arrive is not described; they may be written via FTP or result from web page uploads. Our goal is to have the content of these structured files inserted into a BigQuery table so that analysis may be performed.

There are many potential solutions for ingesting new data into BigQuery after its arrival as files in GCS. We are going to discuss just one of them. When a new file is created in GCS, GCS is aware that this event has happened. We call it an event because, just like our common notion of an event, there is a time component: at one moment there was no file present, and at a subsequent moment there was. We say that an event occurred and that the event was the creation of the file. Within GCS, we can define event-based triggers. By this we mean that when the event occurs, GCS can automatically notify some external system that it has happened. The alternative is for external systems to continuously poll GCS, asking “Has anything changed? Have new files arrived?”, where the majority of answers will likely be “no” and the effort is wasted. Having GCS perform an explicit notification avoids this. Compare continually checking your front door to see if anyone is there with responding to the sound of the doorbell when a visitor pushes the button.

The notification technology used by GCS to alert us that a new file is present is the publication of a new message to a PubSub topic. An application associated with a subscription on that topic can then be triggered when a new file is created. When we look in detail at the content of the message published by GCS, we find that it contains (among other items of data) the identity of the file, composed of the name of the GCS bucket and the file name within that bucket. What the PubSub message does not contain is the content of the file itself. The message merely alerts us to the existence of the new file, and it is our responsibility to read the data from the file in order to process it. Work therefore has to be performed to process the file if our goal is to make its content available for BigQuery analysis.
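As a minimal sketch of what a subscriber sees, the snippet below parses a hand-written sample notification. It assumes the notification was configured with the JSON payload format, in which the message body is the object's metadata (including `bucket` and `name`) and the message attributes carry an `eventType` such as `OBJECT_FINALIZE`. The helper name `file_identity` and the sample values are illustrative and not part of any library.

```python
import json


def file_identity(data: bytes, attributes: dict):
    """Return (bucket, object_name) for a newly created object, else None."""
    # GCS can also publish notifications for deletes and metadata updates;
    # only OBJECT_FINALIZE signals that a new object has been written.
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        return None
    payload = json.loads(data)  # json.loads accepts UTF-8 encoded bytes directly
    return payload["bucket"], payload["name"]


# Hand-written sample values (assumptions for illustration only).
sample_data = json.dumps(
    {"bucket": "my-bucket", "name": "incoming/staff.csv", "size": "42"}
).encode("utf-8")
sample_attributes = {"eventType": "OBJECT_FINALIZE", "payloadFormat": "JSON_API_V1"}

print(file_identity(sample_data, sample_attributes))
```

Note that nothing in the payload carries the file's content; the function can only tell us where to look.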

We haven’t delved deeply into the content of the file. For our purposes, we will assume that it is CSV but other formats could be as easily accommodated. To process a file, we can imagine that we would have to perform logic (pseudo code) that would look like:

Receive the PubSub message that identifies the file
Read the content of the file identified by the message
Parse the CSV content into a sequence of records
For each of the records found in the file:
    Insert the record into a BigQuery table
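The steps above can be sketched in plain Python before any platform is chosen. In this sketch the GCS read and the BigQuery insert are passed in as callables (`read_object` and `insert_row`, both hypothetical names) so that the flow can be run with in-memory stand-ins; a real implementation would supply functions backed by the Google Cloud client libraries.

```python
import csv
import json
from io import StringIO


def process_notification(message: bytes, read_object, insert_row) -> int:
    """Run the steps above for one message and return the number of rows inserted."""
    payload = json.loads(message)                           # identify the file
    text = read_object(payload["bucket"], payload["name"])  # read its content
    count = 0
    for record in csv.DictReader(StringIO(text)):           # parse CSV into records
        insert_row(record)                                  # insert each record
        count += 1
    return count


# In-memory stand-ins for GCS and BigQuery (assumptions for illustration only).
fake_gcs = {("my-bucket", "staff.csv"): "name,salary\nalice,100\nbob,200\n"}
table_rows = []

inserted = process_notification(
    json.dumps({"bucket": "my-bucket", "name": "staff.csv"}).encode("utf-8"),
    read_object=lambda bucket, name: fake_gcs[(bucket, name)],
    insert_row=table_rows.append,
)
print(inserted, table_rows)
```

Keeping the I/O behind callables is only a convenience for experimenting; it is not how the pipeline later in this article is structured.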

Obviously there are permutations far beyond the above. Some of these might include:

  • What if we are receiving many files of different purposes/formats?
  • What if we are wanting to write to many different BigQuery tables?
  • What if badly formatted files arrive or we can’t write to BigQuery? (error handling)
  • more …

Each of these concerns can be handled by making the eventual solution richer. However, for our discussion, we will limit ourselves to the simplest case: all files are of the same format and we load just one BigQuery table.
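To give a flavor of what "richer" might mean, here is a small sketch of two of the ideas listed above: choosing a destination table from the object name, and setting malformed rows aside rather than failing the whole file. The prefixes, table names and helper names are all hypothetical, and this is only one of many ways such concerns could be approached.

```python
import csv
from io import StringIO

# Hypothetical routing table: object-name prefix -> destination table.
ROUTES = {"staff/": "my-project.beam.staff", "sales/": "my-project.beam.sales"}


def route(object_name: str):
    """Pick a destination table from the object name, or None if unrecognised."""
    for prefix, table in ROUTES.items():
        if object_name.startswith(prefix):
            return table
    return None


def split_rows(text: str, required: list):
    """Separate usable rows from malformed ones instead of failing the whole file."""
    good, bad = [], []
    for row in csv.DictReader(StringIO(text)):
        # DictReader stores surplus fields under the key None and fills
        # missing fields with the value None.
        ok = None not in row and all(row.get(f) not in (None, "") for f in required)
        (good if ok else bad).append(row)
    return good, bad


good, bad = split_rows("name,salary\nalice,100\nbob\ncarol,300,extra\n", ["name", "salary"])
print(route("staff/2024-08.csv"), len(good), len(bad))
```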

The pseudo code described above could be implemented in a variety of programming languages (Java, Python, Node, C# and more) and could leverage the supplied Google Cloud Platform APIs such as APIs to read GCS files, APIs to insert into BigQuery, libraries to parse CSV and more. We would also have to choose some code hosting technology (Cloud Run, Cloud Functions, Cloud Workflows, Compute Engine) to execute our logic. However, for this article, we are going to assume that we have chosen Apache BEAM as the platform to process our messages. Apache BEAM provides an ETL environment that can scale horizontally to accommodate extremely high volumes. There is no reason to assume that there will only be a trickle of new files. What if hundreds or thousands of new files arrive per second? Apache BEAM is designed to scale out across workers to handle that kind of load.

Within the Apache BEAM framework there are building blocks called PTransforms, each of which knows how to perform a specific piece of work. We can then compose these PTransforms into a pipeline that, when executed as a whole, solves our problem. Included in the set of BEAM PTransforms are:

  • Transform to receive new messages from Google Cloud PubSub
  • Transform to execute arbitrary fragments of code
  • Transform to insert new rows into BigQuery tables

BEAM supports a variety of programming languages including Python and Java. When we author a BEAM pipeline, we describe the PTransforms we wish to execute using one of these languages. This results in a plan being built that is then sent to an Apache BEAM runner (a runtime) for execution. One such runner is Google Cloud Dataflow, hosted in the cloud. There is also a runner called DirectRunner that executes on our local workstation and can be used during development. A new addition to the BEAM environment is the ability to describe our pipeline as a YAML document. We will also touch on this.

Now we can look at a possible BEAM solution in Python:

import argparse
import logging
import json

import apache_beam as beam
import csv
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage
from io import StringIO

def main(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    # ReadFromPubSub is an unbounded source, so run the pipeline in streaming mode
    # (for example by passing --streaming on the command line).
    pipeline_options = PipelineOptions(pipeline_args)
    # Pickle the main session so that the module-level imports (json, csv, storage, ...)
    # are available to the DoFn when it runs on remote workers.
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # Read the GCS object data described by the PubSub message and parse the content from CSV
    # into individual fields.
    class ReadGCS(beam.DoFn):
        def setup(self):
            # Create the client once per DoFn instance rather than once per message
            self.storage_client = storage.Client()

        def process(self, gcsPubSubMsg):
            jsonData = json.loads(gcsPubSubMsg) # Parse the JSON contained in the PubSub message
            bucket_name = jsonData['bucket'] # Access the bucket name from the PubSub JSON message
            object_name = jsonData['name'] # Access the object name from the PubSub JSON message

            bucket = self.storage_client.get_bucket(bucket_name) # Access the GCS bucket named in the PubSub message
            blob = bucket.get_blob(object_name) # Access the object named in the PubSub message that is owned by the bucket
            text = blob.download_as_text() # Read the content of the object

            csv_dict_reader = csv.DictReader(StringIO(text))
            for record in csv_dict_reader:
                logging.info("record: %s", record)
                yield record

    with beam.Pipeline(options=pipeline_options) as p:
        messages = (p |
            "PubSub" >> beam.io.ReadFromPubSub(subscription="projects/my-project/subscriptions/gcs-topic-sub") | # Read a message from PubSub
            "Read from GCS" >> beam.ParDo(ReadGCS()) | # Read the data from GCS
            "Write to BigQuery" >> beam.io.WriteToBigQuery( # Write to a BigQuery table
                "my-project.beam.my_table", # The identity of the table into which we will append new rows
                schema='name:STRING,salary:INTEGER', # The schema of the table (used if we have to create the table)
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, # Append rows rather than truncate
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER # Don't try and create the table
            )
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()

Let’s take some of the core aspects of it apart.

The pipeline starts by using the ReadFromPubSub PTransform that is supplied by BEAM. This PTransform returns an unbounded PCollection of messages read from the PubSub subscription whose identity is passed as a configuration parameter.

The next step is to take each of these messages and treat it as a JSON string that contains the identity of the file we wish to read. We parse this JSON, determine the bucket name and object name, and read the file. Knowing that the file is a CSV, we parse the content using a CSV parser and return a new PCollection of the records found in the file. While this sounds like a lot of work, we find that we can achieve it in about a dozen lines of code. To describe this in BEAM, we use a BEAM ParDo. A ParDo is a BEAM construct which takes an input PCollection, invokes a Python function for each element in the PCollection, and outputs a new PCollection whose elements are the values produced by that function. In our logic, the input to the Python function is the JSON PubSub message that contains the identity of the file. The function reads the GCS file, parses the CSV content, and then outputs a PCollection element for each data row in the file. Each data row in the CSV file corresponds to a new row to be inserted into the BigQuery table.
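The one-input-to-many-outputs behavior can be pictured with a plain-Python analogy. This is not how BEAM is implemented (BEAM distributes elements across workers and makes no promise about ordering); it only illustrates that each input element may produce zero, one or many output elements, all of which land in a single output collection. The names `par_do_like` and `split_lines` are made up for this illustration.

```python
from itertools import chain


def par_do_like(elements, fn):
    """Apply fn to every element and flatten all results into one list."""
    return list(chain.from_iterable(fn(e) for e in elements))


def split_lines(file_text):
    # A generator may yield zero, one, or many outputs per input element.
    for line in file_text.splitlines():
        if line:
            yield line


# Three "files": one with two rows, one empty, one with a single row.
files = ["alice,100\nbob,200", "", "carol,300"]
print(par_do_like(files, split_lines))
```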

This takes us to the final step in the pipeline which is the insertion of a new row into the BigQuery table. Once again we get to take advantage of the existence of a pre-supplied Apache BEAM PTransform called WriteToBigQuery. This PTransform expects an input of a PCollection of records and performs the task of inserting each of those records into the configured table.
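One detail worth keeping in mind is that `csv.DictReader` yields every value as a string, while the schema in our example declares `salary` as an INTEGER. The YAML version later in this article converts the value explicitly with `int(...)`. A small sketch of doing the same in a schema-driven way is shown below; the helper names are hypothetical and only the two types used in this article are handled.

```python
def parse_schema(schema: str) -> dict:
    """Turn 'name:STRING,salary:INTEGER' into {'name': 'STRING', 'salary': 'INTEGER'}."""
    return dict(field.split(":") for field in schema.split(","))


# Only the two BigQuery types used in this article are covered here.
CONVERTERS = {"STRING": str, "INTEGER": int}


def coerce(record: dict, schema: str) -> dict:
    """Convert CSV string values to the Python types implied by the schema."""
    return {
        name: CONVERTERS[bq_type](record[name])
        for name, bq_type in parse_schema(schema).items()
    }


print(coerce({"name": "alice", "salary": "100"}, "name:STRING,salary:INTEGER"))
```

A function like this could be applied to each record before it reaches the write step, so that the records line up with the declared schema.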

And that’s pretty much the core of our story. We have written a Python program that uses the Apache BEAM platform. We can then deploy it to Dataflow and, as a result, Dataflow will execute it. Since the pipeline is a streaming pipeline (unbounded input from PubSub), it will watch the PubSub subscription for new messages. When a message arrives, this implies a new file is ready for processing and the file will be processed.

Having seen our story expressed in Python, let’s also take a look at what it might look like using YAML:

pipeline:
  transforms:
    - type: ReadFromPubSub
      name: pubsub1
      config:
        subscription: projects/my-project/subscriptions/gcs-topic-sub
        format: JSON
        schema:
          type: object
          properties:
            name: {type: string}
            bucket: {type: string}
    - type: PyTransform
      name: t2
      input: pubsub1
      config:
        constructor: __callable__
        kwargs:
          source: |
            # With an input of an object that contains the fields 'bucket' and 'name', read the content of the corresponding
            # GCS Object which is assumed to be a CSV file. Parse the CSV and, for each record contained within,
            # output it to the corresponding PCollection of type Row(name<string>, salary<int>).

            def my_ptransform(pcoll):
              import csv
              from google.cloud import storage
              from io import StringIO

              class ReadGCS(beam.DoFn):
                def setup(self):
                  self.storage_client = storage.Client()

                def process(self, gcsPubSubMsg):
                  bucket_name = gcsPubSubMsg.bucket
                  object_name = gcsPubSubMsg.name

                  bucket = self.storage_client.get_bucket(bucket_name) # Access the GCS bucket named in the PubSub message
                  blob = bucket.get_blob(object_name) # Access the object named in the PubSub message that is owned by the bucket
                  text = blob.download_as_text() # Read the content of the object

                  csv_dict_reader = csv.DictReader(StringIO(text))
                  for record in csv_dict_reader:
                    yield record
              return pcoll | beam.ParDo(ReadGCS()) | beam.Map(lambda x: beam.Row(name=str(x["name"]), salary=int(x["salary"])))
    - type: WriteToBigQuery
      input: t2
      name: BQ1
      config:
        table: my-project.beam.my_table
        write_disposition: WRITE_APPEND
        create_disposition: CREATE_NEVER

options:
  streaming: true
  runner: Dataflow

What I want you to notice is how similar the YAML logic is to the Python logic. In this instance, YAML is pretty much a 1-for-1 mapping. Since YAML support is still at an early stage within Apache BEAM (as of summer 2024), I would personally continue to build pipelines in Python. The benefits of YAML are likely to come further down the line with graphical pipeline builders such as the Dataflow Job Builder.

Neil Kolban

IT specialist with 30+ years industry experience. I am also a Google Customer Engineer assisting users to get the most out of Google Cloud Platform.