Apache Beam, Google Cloud Dataflow and Creating Custom Templates Using Python

Ankita Kundra · Published in The Startup · Jul 30, 2020

Apache Beam

Apache Beam (Batch + Stream) is a unified programming model for defining and executing both batch and streaming data processing jobs. It provides SDKs for building data pipelines and runners for executing them.

Apache Beam can provide value in use cases that involve data movement from different storage layers, data transformations, and real-time data processing jobs.

There are three fundamental concepts in Apache Beam, namely:

  • Pipeline — encapsulates the entire data processing tasks and represents a directed acyclic graph(DAG) of PCollection and PTransform. It is analogous to Spark Context.
  • PCollection — represents a data set, which can be a fixed batch or a stream of data. We can think of it as a Spark RDD.
  • PTransform — a data processing operation that takes one or more PCollections and outputs zero or more PCollections. It can be considered as a Spark transformation/action on RDDs to output the result.

Apache Beam is designed to make pipelines portable across different runners. In the example below, the pipeline is executed locally using the DirectRunner, which is great for developing, testing, and debugging.

WordCount Example (“Bigdata Hello World”):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | 'Creating PCollection' >> beam.Create(['Hello', 'Hello Good Morning', 'GoodBye'])
    counts = (
        lines
        | 'Tokenizing' >> beam.FlatMap(lambda x: x.split(' '))
        | 'Pairing With One' >> beam.Map(lambda x: (x, 1))
        | 'GroupbyKey And Sum' >> beam.CombinePerKey(sum)
        | 'Printing' >> beam.ParDo(lambda x: print(x[0], x[1])))
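
Running this locally with the DirectRunner prints each word alongside its count, something like Hello 2, Good 1, Morning 1 and GoodBye 1 (the ordering of elements in a PCollection is not guaranteed).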

Let’s briefly review what the code is doing.

beam.Pipeline(options=PipelineOptions())- creates a beam pipeline by taking in the configuration options.

beam.Create- creates a PCollection from the data.

beam.FlatMap- applied to each element of the PCollection, tokenizing each line into words.

beam.Map- a transformation operation that maps each word to (word, 1).

beam.CombinePerKey- similar to a groupByKey operation; it sums up the count for each word.

beam.ParDo- another transformation operation, applied here to each key-value pair to print the final result.

Google Cloud Dataflow

Cloud Dataflow is a fully managed service for running Apache Beam pipelines on Google Cloud Platform. Cloud Dataflow executes data processing jobs. Dataflow is designed to run on very large datasets; it distributes the processing tasks across several virtual machines in a cluster so that they can process different chunks of data in parallel.
Cloud Dataflow is certainly not the first big data processing engine, and it's not the only one available on Google Cloud Platform. For example, one alternative is to run Apache Spark on the Google Cloud Dataproc service. So, why would you choose Dataflow?
There are a few reasons:

Serverless: We don't have to manage computing resources. Dataflow automatically spins up and tears down a cluster of virtual machines while running the processing jobs. We can focus on writing the code instead of managing the cluster. Apache Spark, on the other hand, requires more configuration even when it runs on Cloud Dataproc.

Processing code is separate from the execution environment: In 2016, Google donated the Dataflow SDK and a set of data connectors for accessing Google Cloud Platform to the open-source community, and that code became the basis of the Apache Beam project. We can write Beam programs and run them on the local system or on the Cloud Dataflow service. In fact, the Dataflow documentation points to the Apache Beam website for the latest version of the Software Development Kit.

Processing batch and stream data with the same programming model: Other big data SDKs require different code depending on whether data arrives in batch or streaming form. Apache Beam addresses this with a single, unified programming model. Competitors like Spark are moving in the same direction, but they are not quite there yet.
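
To make the last two points concrete, here is a minimal sketch of the same word-count transforms reading from a Pub/Sub topic in streaming mode. The project and topic names are placeholders, and running the same code on Dataflow instead of locally is just a matter of pipeline options (for example, passing --runner=DataflowRunner):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# A sketch only: the same transforms as the WordCount above, fed by a streaming source.
options = PipelineOptions(streaming=True)  # add --runner=DataflowRunner to run on Dataflow
with beam.Pipeline(options=options) as p:
    (p
     | 'Read From PubSub' >> beam.io.ReadFromPubSub(topic='projects/<project-id>/topics/<topic>')
     | 'Decode' >> beam.Map(lambda b: b.decode('utf-8'))
     | 'Tokenizing' >> beam.FlatMap(lambda line: line.split(' '))
     | 'Windowing' >> beam.WindowInto(window.FixedWindows(60))  # 60-second fixed windows
     | 'Pairing With One' >> beam.Map(lambda word: (word, 1))
     | 'GroupbyKey And Sum' >> beam.CombinePerKey(sum)
     | 'Printing' >> beam.Map(print))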

Creating a Custom template using Python

The primary goal of templates is to package Dataflow pipelines as reusable components whose behavior is changed only through the required pipeline parameters. The template files are staged in a GCS bucket and can be launched from the console, the gcloud command, or other services like Cloud Scheduler, Cloud Functions, etc.

Let's explore an example of transferring data from Google Cloud Storage to BigQuery using the Cloud Dataflow Python SDK and then creating a custom template that accepts a runtime parameter.

Google already provides many examples of running Dataflow jobs using the Python SDK; a lot of them can be found here: https://github.com/GoogleCloudPlatform/DataflowTemplates

I am using PyCharm with Python 3.7 and have installed all the packages required to run Apache Beam (2.22.0) locally.

A CSV file was uploaded to the GCS bucket. It is a sample of the public dataset “USA Names” hosted on BigQuery, which contains all names from Social Security card applications for births that occurred in the United States after 1879.

The application code needs to be authenticated by setting the environment variable GOOGLE_APPLICATION_CREDENTIALS in PyCharm, which can be set up as below.

The path of the downloaded service account key file should be set as an environment variable in the PyCharm run configuration.
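
If you prefer not to rely on a PyCharm run configuration, the same variable can also be set from Python before the pipeline is constructed; the key path below is only a placeholder:

import os

# Point the Google client libraries at the downloaded service account key.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account-key.json'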

The service account was given an admin IAM role. If you don't want to grant such a broad role to your service account, please refer to the access control guide for Dataflow (https://cloud.google.com/dataflow/docs/concepts/access-control).

The code below transfers the data from the GCS bucket to BigQuery and can be run either as a template or in the local environment by removing the template location option. It reads the file from the GCS location using beam.io.ReadFromText, maps each element to convert it into a BigQuery row, and then writes it to BigQuery using beam.io.BigQuerySink.

p = beam.Pipeline(options=pipeline_options_val)
# Helper class from the repo that converts one CSV line into a BigQuery row (dict).
data_ingestion = dataingestion()
(p
 | 'Read from a File' >> beam.io.ReadFromText(pipeline_options['input'], skip_header_lines=1)
 | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
       beam.io.BigQuerySink(
           pipeline_options['output'],
           schema='state:STRING,gender:STRING,year:STRING,name:STRING,'
                  'number:STRING,created_date:STRING',
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
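
The dataingestion helper above comes from the repo linked at the end of this article. As a rough sketch, and assuming the CSV columns arrive in the same order as the schema above, its parse_method can look like this:

class dataingestion:
    def parse_method(self, string_input):
        # Split one CSV line and pair each value with its column name from the schema.
        values = string_input.strip().split(',')
        columns = ['state', 'gender', 'year', 'name', 'number', 'created_date']
        return dict(zip(columns, values))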

We will now create a custom template over the BigQuery data to count the number of females and males present in the sample data. We will pass 'F' or 'M' as a Dataflow runtime parameter. A Dataflow template uses runtime parameters to accept values that are only available during pipeline execution. To customize the execution of a templated pipeline, we can pass these parameters to functions that run within the pipeline (such as a DoFn).

We select the gender column from BigQuery using beam.io.Read(beam.io.BigQuerySource()). beam.ParDo is used to filter the elements on the value that will be passed at runtime through the Dataflow “add parameters” option. The filtered PCollection is then combined globally using beam.combiners.Count.Globally() to get the count.
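
Runtime parameters in classic templates are declared as ValueProvider arguments and resolved with .get() only while the pipeline is running. The sketch below shows that pattern; the option class name, the table placeholder, and the use of beam.Filter instead of a custom ParDo are my own simplifications rather than the exact repo code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class FilterOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Declared with add_value_provider_argument so the value is supplied
        # when the template is launched, not when it is staged.
        parser.add_value_provider_argument('--filter_val', type=str, default='F')

options = PipelineOptions()
filter_options = options.view_as(FilterOptions)

with beam.Pipeline(options=options) as p:
    (p
     | 'Read From BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
           query='SELECT gender FROM `<project>.<dataset>.<table>`',
           use_standard_sql=True))
     | 'Filter On Runtime Value' >> beam.Filter(
           lambda row, fv: row['gender'] == fv.get(),
           fv=filter_options.filter_val)
     | 'Count' >> beam.combiners.Count.Globally()
     | 'Print' >> beam.Map(print))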

A template file is created at the GCS location that was passed as a command-line parameter.
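
For reference, staging a classic template simply means running the pipeline script with a --template_location option. The script name and bucket below are placeholders:

python <your_pipeline_script>.py \
  --runner DataflowRunner \
  --project <project-id> \
  --region <region> \
  --staging_location gs://<bucket>/staging \
  --temp_location gs://<bucket>/temp \
  --template_location gs://<bucket>/templates/<template-name>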

Now it's time to run the pipeline.

Click on CREATE JOB FROM TEMPLATE in the Dataflow UI, type in the required details, and pass the runtime parameter as shown in the image below. I am ignoring the warning shown there, which asks for a metadata file used to validate the parameters. The metadata file can be created in the same folder as the template, with the name <template_name>_metadata.
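
The metadata file is a small JSON document. A sketch for the filter_val parameter might look like the following; the name, label, and help text are illustrative:

{
  "name": "gender-count-template",
  "description": "Counts the rows matching the gender value passed at runtime",
  "parameters": [
    {
      "name": "filter_val",
      "label": "Gender filter",
      "helpText": "Pass F or M to filter on the gender column",
      "isOptional": true
    }
  ]
}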

passing the runtime parameter

We can also run the dataflow job using the gcloud command.

gcloud dataflow jobs run <job-name> --gcs-location <gcs-template-location> --region <region> --staging-location <temp-gcs-location> --parameters filter_val=<filter-value>

Summary

Apache Beam, in combination with Cloud Dataflow, lets us concentrate on the logical composition of pipelines rather than the physical orchestration of parallel processing. It provides useful abstractions that insulate us from low-level details of distributed processing, thereby providing exceptional opportunities for enterprises to boost their productivity.

That’s all for today. You can see the entire code in my Github: https://github.com/ankitakundra/GoogleCloudDataflow

Thanks for reading. Feel free to reach out to me through the comment section or through LinkedIn https://www.linkedin.com/in/ankita-kundra-77024899/. I am open to any discussion, and constructive criticism is welcome. Thank you!!
