How to use Dataflow and Google Cloud Functions to automate processing of uploaded files

Dario Bernardo
10 min readApr 17, 2023

--

Image created by generative AI model

In this article, I will explain (in a simplified manner) how at Recognyte we automate the processing of large amounts of data coming from various data sources leveraging Google Cloud functions and Google DataFlow.

Why

In every organization, being able to timely and effectively process data is extremely important. In the real estate world, for example, it is important to continuously monitor market conditions to make sure that the right actions are taken. Monitoring market conditions means ingesting many different data sources from multiple data providers.

In the early stages of an organization, when the data stack is still not very mature, what usually happens is that a data engineer or data scientist will build a set of ad-hoc scripts that will clean and ingest data for a data provider. Often these scripts end up being launched manually or periodically from a server machine every time a data refresh is needed. This ends up creating a very chaotic environment of many different scripts laying around. These scripts may be running on a single server, creating bottlenecks, as the pipeline is scalable as much as the resources of the server the script is running on. Furthermore, having to actively check for new data and launch pipelines is time consuming and prone to errors.

A much better approach would be to have a pipeline that is triggered automatically every time new data becomes available, and is able to scale seamlessly to any amount of data. Read on to learn how we do it at Recognyte.

The Background

At Recognyte, we receive data from a variety of different sources. Regardless of where it comes from, all data files (whether they are CSV, Parquet or Avro files) are uploaded into a cloud storage location. Before landing in our data lakehouse, the data needs to be cleaned and processed. Basically it goes through an Extract, Transform, Load (ETL) process. This article explains how we automated this process by automatically launching an ETL process as soon as a file lands into a google cloud storage folder with google cloud functions.

In this example,I will explain how to build an infrastructure that will find which neighborhoods in the USA that had an increase in house prices of more than 8% and those that saw a decrease in house value of more than 2%. For the benefit of the reader, I will simulate the data ingestion process with an open source dataset, more precisely the housing data set at Zillow. The dataset provides smoothed, seasonally adjusted, time series prices for all USA neighborhoods. I will also show how to ingest this dataset and produce two files one with details of neighborhoods that saw an increase of more than 8% and the other with a decrease of more than 2%.

Please note that this is just a demonstration of the capability and the size of the dataset may not fully justify the adoption of a big data technology such as Dataflow. However, the idea here is that this approach can scale to any data size.

The Architecture

Here is the overall architecture.

Here is the breakdown of the various steps:

  1. A new CSV/Parquet/Avro is uploaded into a cloud storage bucket.
  2. As soon as the file gets uploaded, a notification is sent to a pubsub topic.
  3. A cloud function is subscribed to that same topic, and it gets executed with the filename of the file that has just been uploaded.
  4. The cloud function launches a DataFlow pipeline.
  5. DataFlow reads the uploaded file.
  6. Execute the relevant transformations and output the file.

Benefits

The above architecture has the following benefits:

  • Automation: the entire pipeline gets executed automatically as soon as a file gets uploaded.
  • Reduced cost and complexity: the entire above architecture is serverless, this means that there is no complexity in maintaining a cluster or servers, and there is also no cost in having those machines running continuously when they are not used. The entire pipeline gets spawned and executed only when a file is uploaded and costs are billed only for the time needed for executing the code (ETL pipeline mainly).
  • Scaling: DataFlow, is a Google Cloud big data product that works on top of Apache Beam. It is a fully managed streaming analytics service that minimizes latency, processing time, and cost through autoscaling and batch processing. Dataflow is automatically able to scale on any data, hence whatever size data is uploaded, we are able to handle it.

The How

In this section, I will show you the code.

I have assumed that you have a Google Cloud platform account and a project already set up with the necessary permissions and API enabled. It will also assume that you have Google ‘gsutil’ installed and set up on your local environment.

Step 1: Let’s create prerequisite resources

Firstly, let’s download the data used in this example:

wget https://files.zillowstatic.com/research/public_csvs/zhvi/Neighborhood_zhvi_uc_sfrcondo_tier_0.33_0.67_sm_sa_month.csv\?t\=1676561028

Now let’s create the landing storage bucket. This is the storage location that will automatically trigger the data cleaning and ingestion once the file gets uploaded (action 1 in figure 1). The command below creates a Google Cloud storage location named “landing_storage”:

gcloud storage buckets create gs://landing_storage

We also need to create the work directory for dataflow where we will store the pipeline and dataflow we will use as staging and temp area.

gcloud storage buckets create gs://dataflow_workspace

Let’s also create the output location where Dataflow will store the final processed files.

gcloud storage buckets create gs://output_location

Let’s then create the pub sub notification system that will get notified every time a file gets uploaded to the “landing_storage” folder. The command below creates a pub sub topic called “landing_topic”

gcloud pubsub topics create landing_topic

The command above will create a topic which can be addressed with the following naming convention projects/MY-PROJECT/topics/landing_topic, where MY-PROJECT is the name of the google cloud project you are using.

Now we need to link the Google Cloud storage bucket with the pub sub notification and make sure that the topic receives a notification every time something gets uploaded. The command below does just this:

gsutil notification create -e OBJECT_FINALIZE -t projects/MY-PROJECT/topics/landing_topic -f json gs://landing_storage

The -e OBJECT_FINALIZE means that a notification will be sent every time an object is created on the bucket. For example, OBJECT_DELETE would notify every time an object gets deleted.

At this point we have the first two boxes of the diagram and the arrow “2”.

Step 2: Create the Dataflow pipeline

The below file is the Apache Beam pipeline definition that will perform the transformation on the data. It takes a file as input and outputs two files specified with the parameter “output_increase” and “output_decrese”.

pipeline.py

import logging
import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

logging.basicConfig()


class MyExamplePipelineOptions(PipelineOptions):

@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
type=str,
help='Path of the file to read from',
)
parser.add_value_provider_argument(
'--output_increase',
type=str,
help='Where to write the increased value props',
)
parser.add_value_provider_argument(
'--output_decrease',
type=str,
help='Where to write the decreased value props',
)
parser.add_value_provider_argument(
'--pipeline_name',
type=str,
help='This pipe name',
)


class Split(beam.DoFn):
def __init__(self, delimiter=','):
self.delimiter = delimiter

def process(self, text):
yield text.split(self.delimiter)


class PercentageIncreaseCalculator(beam.DoFn):

def process(self, element):
new_elem = element[0:10]
if element[-2] != "" and element[-1] != "":
new_elem.append((float(element[-1])-float(element[-2]))/float(element[-2]))
yield new_elem


class Prepare(beam.DoFn):
def process(self, element):
yield ",".join([str(e) for e in element])


def execute_pipeline(
options: PipelineOptions,
input_location,
output_location_increase,
output_location_decrease
):

with beam.Pipeline(options=options) as pipeline:
data = pipeline | 'Read File' >> beam.io.ReadFromText(input_location, skip_header_lines=1)
split = data | 'Split row by comma' >> beam.ParDo(Split())
percentages = split | 'Perform Action' >> beam.ParDo(PercentageIncreaseCalculator())

output1 = percentages | 'Filter increased' >> beam.Filter(lambda row: row[-1] > 0.08)
output1 = output1 | 'Prepare to write increased' >> beam.ParDo(Prepare())
output1 | 'Write increased' >> beam.io.WriteToText(output_location_increase, file_name_suffix=".csv")

output2 = percentages | 'Filter decreased' >> beam.Filter(lambda row: row[-1] < -0.02)
output2 = output2 | 'Prepare to write decreased' >> beam.ParDo(Prepare())
output2 | 'Write decreased' >> beam.io.WriteToText(output_location_decrease, file_name_suffix=".csv")


def run():
pipe_options = PipelineOptions().view_as(MyExamplePipelineOptions)
pipe_options.view_as(SetupOptions).save_main_session = True
logging.info(f"Pipeline: {pipe_options.pipeline_name}")
execute_pipeline(
options=pipe_options,
input_location=pipe_options.input,
output_location_increase=pipe_options.output_increase,
output_location_decrease=pipe_options.output_decrease
)
logging.info("FINISHED.")


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()

Save this file as `pipeline.py`. In the same folder you will also require in the same folder a setup.py available here: https://github.com/DarioBernardo/cloud_examples/tree/main/dataflow_cloudfunction/dataflow

The pipeline takes four inputs:

  • Input: is the input file that the pipeline will process
  • Output_increase: it is the full path name of the file that will contain all properties that saw an increase in value.
  • Output_decrese: it is the full path name of the file that will contain all properties that saw an increase in value.
  • Pipeline_name: It is the full path name of the pipeline. This is used just for debugging reasons so technically it could be omitted.

Now, to successfully compile the pipeline we need first to install Apache Beam in your local Python environment, you can do this with the following command:

pip install "apache-beam[gcp]==2.42.0"

Now we can compile and upload our pipeline to Google Cloud with the following command:

python pipeline.py --runner DataflowRunner --project datasphere-313410 --staging_location gs://dataflow_workspace/example/staging --temp_location gs://dataflow_workspace/example/temp --template_location gs://dataflow_workspace/example/pipeline_templates/my_pipeline --region europe-west3 --setup_file ./setup.py

— project : Specify your local project you are running this on.

— staging_location and — temp_location : These are the staging and temporary directories used by dataflow.

— template_location : Is the location where the compiled pipeline will be uploaded to. This will be used by the cloud function to launch the dataflow job.

After launching the command, we should see the dataflow compiled pipeline at this location. gs://dataflow_workspace/example/pipeline_templates/my_pipeline.

Step 3: Create the Cloud Function

The cloud function will be triggered every time a file is uploaded to the cloud storage gs://landing_storage. The cloud function code is contained in a single file “main.py”, however for it to work you will also need to have in the same folder the “requirements.txt” file. Below are both files:

main.py

import os
import datetime
from googleapiclient.discovery import build


PROJECT_ID = os.environ['PROJECT_ID']
REGION = os.environ['REGION']
TEMPLATE_LOCATION = os.environ['TEMPLATE_LOCATION'].rstrip("/")
OUTPUT_LOCATION = os.environ['OUTPUT_LOCATION'].rstrip("/")
SUBNETWORK = os.environ.get('SUBNETWORK', f"regions/{REGION}/subnetworks/default")
MACHINE_TYPE = os.environ.get('MACHINE_TYPE', "n1-standard-4")

dataflow = build("dataflow", "v1b3")


def handle_pubsub_message(event, context):
print(
f"INPUT (event_id='{context.event_id}', timestamp='{context.timestamp}')"
)

if 'data' not in event:
print(
f"SKIPPING: no data (event_id='{context.event_id}', timestamp='{context.timestamp}')"
)
return

_try_handle_pubsub_message(event, context)


def _try_handle_pubsub_message(event, context):

input_file = f"gs://{event['attributes']['bucketId']}/{event['attributes']['objectId']}"
print(f"This File was just uploaded: {input_file}")

job_name = f"ingestion_{event['attributes']['bucketId']}_{event['attributes']['objectGeneration']}_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"

out_location = OUTPUT_LOCATION if OUTPUT_LOCATION[-1] == "/" else OUTPUT_LOCATION+"/"

job_parameters = {
'pipeline_name': job_name,
'input': input_file,
'output_increase': out_location+"increase/data",
'output_decrease': out_location+"decrease/data",
}
environment_parameters = {
"subnetwork": SUBNETWORK,
"machine_type": MACHINE_TYPE
}

request = dataflow.projects().locations().templates().launch(
projectId=PROJECT_ID,
location=REGION,
gcsPath=TEMPLATE_LOCATION,
body={
'jobName': job_name,
'parameters': job_parameters,
"environment": environment_parameters
}
)
response = request.execute()
job = response['job']

print(
f"PIPELINE LAUNCHED (event_id='{context.event_id}', timestamp='{context.timestamp}', job_name='{job['name']}', job_id='{job['id']}')"
)

Requirements.txt

google-api-core==1.23.0
google-api-python-client==1.7.11
google-apitools==0.5.31

Now to have the whole thing working, we need to deploy the cloud function to the Google Cloud. The following command will do just that:

gcloud functions deploy dataflow-launcher --gen2 --runtime=python39 --region=europe-west3 --entry-point=handle_pubsub_message --trigger-topic=landing_topic --set-env-vars=PROJECT_ID=MY-PROJECT,REGION=europe-west3,TEMPLATE_LOCATION=gs://dataflow_workspace/example/pipeline_templates/my_pipeline,OUTPUT_LOCATION=gs://output_location

— trigger-topic=landing_topic is used to configure the cloud function to listen on the “landing_topic” topic. The topic receives a message every time a new file is uploaded to the gs://landing_storage cloud storage, and the cloud function gets triggered on each received message from the topic. The cloud function processes the message and understands what file has been uploaded and triggers a dataflow job to process that file.

The — set-env-vars flag, sets environment variables that will be available to the cloud function environment. These variables include some parameters for the dataflow job (the output location) but also the location of the dataflow compiled pipeline we created in the previous step (TEMPLATE_LOCATION).

Step 4: Launch it!

We now have all the pieces in place ready to launch the pipeline. To do that we just need to upload the dataset we want to process into the “landing_storage” cloud storage bucket. The command below assumes you have the Zillow dataset in the same folder you are launching the command from.

gsutil cp ./Neighborhood_zhvi_uc_sfrcondo_tier_0.33_0.67_sm_sa_month.csv gs://landing_storage/test_folder/

The above command will upload the starting dataset to “gs://landong_storage”, this will in turn trigger a pubsub notification and that will trigger the cloud function. The cloud function will launch the dataflow and after a few minutes the processed output files will appear in the output location storage bucket gs://output_location.

In your dataflow console you should see your pipeline executed and the various steps.

Conclusion

In this article I have shown you the main components of a system that is automatically executed every time a file is uploaded to your cloud storage location and it uses big data processing to clean the output. This architecture is able to ingest any size of data, as it leverages Apache Beam through DataFlow, which is the next gen big data technology.

The proposed approach requires minimal supervision as everything is executed automatically as soon as new data is uploaded to our cloud. This reduces the time that the data team at Recognyte needs to spend on day to day data ingestion operational tasks and allows us to focus on new value delivery work. There are other benefits related to cost and simplification of the architecture as it eliminates the costs and overheads of running servers, the entire architecture is serverless and Recognyte is billed only for the computation power used while ingesting and for the cloud storage.

To make the entire process even smoother, a CICD infrastructure could be created as a further improvement. This could be easily done by using cloud build triggers to automate the deployment of the cloud function and the compilation of the pipeline every time the code is modified. In this approach for simplicity I have deployed the cloud function and compiled the pipeline using ‘gsutil’ or direct commands, but these can be easily replaced by github actions and cloud build triggers.

The full code of this example is available here.

--

--

Dario Bernardo

Dario has 10 years industry experience in data science, machine learning and data. He works at Recognyte as head of data.