Creating a Template for the Python Cloud Dataflow SDK

James Moore
5 min read · Aug 13, 2018


My experience creating a template for Google Cloud Dataflow using Python was, I admit, somewhat arduous. I found myself up late at night, running pipeline after pipeline and pulling my hair out over why it wouldn't work. Nevertheless, I persevered, and now I can present to you my Dataflow template written in Python.

In a nutshell, the aim of the pipeline was to read a comma-delimited CSV file, process and format the information as needed, and commit the data to Firebase's Firestore and BigQuery.

Now, the requirement for this pipeline was to be able to provide specific runtime parameters during template execution, such as:

  • The path to the CSV file
  • The name of the source, which we will use as the location to commit to Firestore
  • The table name in BigQuery.

Now, first things first, let's initialize the SDK. I'm using Windows 10 and my preferred shell is Git Bash; the Linux-style shell just makes more sense to me, it's easier to use and I have grown accustomed to it. Downloading the gcloud dependencies will suffice to run this pipeline.

I'm going to assume you have at least gone through the Google Cloud Dataflow quickstart for Python and checked out Apache Beam's programming guide, the WordCount examples and, better yet, the mobile gaming examples. These give you a lot of tools and understanding for creating a Google Cloud Dataflow pipeline with Python.

Now, once you have done your due diligence, the best method I have found to run the pipeline is:
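The following is a minimal sketch of what I mean rather than my exact launch code; the project and bucket names are placeholders you would swap for your own.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Placeholder project and bucket names; substitute your own.
pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',
    '--staging_location=gs://my-bucket/staging',
    '--temp_location=gs://my-bucket/temp',
])
# Save the main session so globally imported modules are available on the workers.
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    pass  # the pipeline's transforms go here
```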

Now, to set the arguments for templating: you can read all you want about Google Dataflow Templates, but there's one primary change in the code, which is the change of the args given to the pipeline. For example, an ordinary pipeline will take args such as:
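As a hedged sketch (the flag names mirror the parameters listed above, but the exact arguments are mine rather than the original gist):

```python
import argparse

parser = argparse.ArgumentParser()
# Ordinary arguments are parsed once, at pipeline construction time.
parser.add_argument('--path', required=True, help='Path to the input CSV file')
parser.add_argument('--source', required=True, help='Firestore location to commit to')
parser.add_argument('--bq_table', required=True, help='BigQuery table name')
known_args, pipeline_args = parser.parse_known_args()
```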

Now, for a templatable pipeline, all you need to do is make a class for your args, change add_argument to add_value_provider_argument, and finally make the options accessible as an object.
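Here is a sketch of that pattern; the class name ContactUploadOptions is my label and may not match the repo exactly, but the structure follows the Beam documentation.

```python
from apache_beam.options.pipeline_options import PipelineOptions

class ContactUploadOptions(PipelineOptions):
    """Runtime parameters for the templated pipeline."""
    @classmethod
    def _add_argparse_args(cls, parser):
        # add_value_provider_argument defers resolution until template execution.
        parser.add_value_provider_argument('--path', type=str, help='Path to the CSV file')
        parser.add_value_provider_argument('--source', type=str, help='Firestore location to commit to')
        parser.add_value_provider_argument('--bq_table', type=str, help='BigQuery table name')

# Make the options accessible as an object.
contact_options = PipelineOptions().view_as(ContactUploadOptions)
```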

We can now access path and source in a runtime context, success!

But the biggest challenge was upon us: how do we read out this CSV file in a meaningful way, one which can be manipulated with ease? According to the documentation, Python runtime parameters can only be used with file-based IOs. No problem, I thought: a CSV file is really only a comma-delimited text file, so I'll just read it out as a text file using ReadFromText, then convert it into a dictionary using csv.DictReader. This failed; the dictionary wouldn't match up the headers with the values. Try this method if you would like, but it is futile, and in the process of searching for an answer I found a much easier way!

Pablo E. created a very nice package for reading out CSV files as dicts: beam_utils. With CsvFileSource I could read my CSV file directly as a dictionary, and with a bit of coercing, reading a CSV file out as a dictionary from a runtime context was a success. One small change I made to CsvFileSource was to alter slightly the way it reads out each record, at line 91:

if self.dictionary_output:
    res = {header.lower().replace(' ', ''): val for header, val in zip(headers, rec)}

This ensures all field names are imported as lowercase for ease of extraction, and any spaces, which could otherwise throw errors, are stripped out.
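In the pipeline itself that read looks roughly like the following; this is a sketch that assumes beam_utils exposes CsvFileSource at beam_utils.sources and mirrors the call quoted below.

```python
import apache_beam as beam
from beam_utils.sources import CsvFileSource

# Each CSV row arrives as a dict keyed by the lower-cased, space-stripped headers.
rows = p | 'ReadCsv' >> beam.io.Read(CsvFileSource(contact_options.path.get()))
```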

I then realized my read(CsvFileSource(contact_options.path.get())) step would read my CSV file out in streaming fashion, but I needed to commit to Firestore and BigQuery in batches, thus saving a huge number of connections. Therefore, whilst formatting the information, it was necessary to manually group it into batches of 10,000 and ensure that if the last batch was under 10,000 it would still be sent through. This last point was a little tricky; although I found the answer here, it was necessary to window the last batch of data, as it was emitted in an ambiguous windowing context, and don't forget the square brackets around GlobalWindow()!
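The grouping is easiest to see in code. Below is a hedged sketch of the kind of batching DoFn I mean; the class name and details are illustrative rather than lifted from the repo.

```python
import apache_beam as beam
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue

class BatchElements(beam.DoFn):
    """Groups incoming elements into lists of at most batch_size."""
    def __init__(self, batch_size=10000):
        self.batch_size = batch_size

    def start_bundle(self):
        self.batch = []

    def process(self, element):
        self.batch.append(element)
        if len(self.batch) >= self.batch_size:
            yield self.batch
            self.batch = []

    def finish_bundle(self):
        # The final, possibly short batch has no element window to inherit, so it
        # must be wrapped explicitly. Note the square brackets around GlobalWindow().
        if self.batch:
            yield WindowedValue(self.batch, GlobalWindow().max_timestamp(), [GlobalWindow()])
```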

As Firestore lacks its own PTransform for Dataflow, it was necessary to create my own FlatMap function so that I could utilize the runtime parameters in the execution of the Firestore writes. Now that the data is grouped into batches of 10,000, I simply have to put each batch through a loop and commit every 400 writes. Information being committed to Firestore has to be unicode, so just include # -*- coding: utf-8 -*- at the top of the file as a catch-all. One other problem I had was passing Latin or special characters, such as “ñ / 汉”; a simple trick to get past this problem is to surround the data with the unicode function unicode(<data>, "utf-8") in the RefactorDict PTransform (as seen above). Specifying that the data was encoded in UTF-8 circumvents the issue with these special characters, and now you can pass any character you want through to Firestore.

One important note: my information comes through as tuples inside a list, so I access the tuple I want with [x] and then pick which side of the tuple to use: [0] is the document key, and [1] is the dictionary I am committing to Firestore.
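To make that concrete, here is a hedged sketch of the kind of Firestore commit step I am describing, written as a DoFn; the class name, client setup, and collection layout are my assumptions rather than the exact code in the repo.

```python
import apache_beam as beam
from google.cloud import firestore

class FirestoreWriteFn(beam.DoFn):
    """Commits one batch of (document_key, data_dict) tuples to Firestore."""
    def __init__(self, source):
        self.source = source  # runtime ValueProvider holding the collection name

    def process(self, element):
        # element is one batch (a list of tuples) of up to 10,000 records.
        db = firestore.Client()
        batch = db.batch()
        pending = 0
        for doc_key, data in element:
            ref = db.collection(self.source.get()).document(doc_key)
            batch.set(ref, data)
            pending += 1
            if pending == 400:  # commit every 400 writes to stay under the batch limit
                batch.commit()
                batch = db.batch()
                pending = 0
        if pending:
            batch.commit()
```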

BigQuery does have its own PTransform, although at the moment it is not possible to use runtime parameters within the BigQuery PTransform, and as I wanted to provide the name of the table during template execution, I had to pass it manually through another FlatMap function. And as the batch limit for streaming into BigQuery is 10,000 rows, we can stream each batch straight in, as the data is already split into chunks of 10,000 or less.

Another important note: I am only committing the keys to BigQuery, which are the first half of each tuple.
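A hedged sketch of that step, keys only, using the google-cloud-bigquery client directly; the dataset name and the class name are placeholders of my own.

```python
import apache_beam as beam
from google.cloud import bigquery

class BigQueryWriteFn(beam.DoFn):
    """Streams the document keys from each batch into the target table."""
    def __init__(self, table_name):
        self.table_name = table_name  # runtime ValueProvider holding the table name

    def process(self, element):
        client = bigquery.Client()
        # 'my_dataset' is a placeholder; the table name comes from the template parameter.
        table_ref = client.dataset('my_dataset').table(self.table_name.get())
        table = client.get_table(table_ref)
        # Only the keys, the first half of each tuple, are sent to BigQuery.
        rows = [(doc_key,) for doc_key, _ in element]
        errors = client.insert_rows(table, rows)
        if errors:
            raise RuntimeError('BigQuery streaming insert errors: %s' % errors)
```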

It’s time to launch the pipeline!

To create the template, follow these steps in your command line. For this template the --output parameter will not be necessary, although it will be necessary to include --requirements_file requirements.txt and also --extra_package ./path/to/beam_utils.zip. To learn more about managing pipeline dependencies, visit this page. Once the template is uploaded, follow these steps and launch the template either from the GCP console or using the REST API; I suggest the REST API, as the GCP console can become tedious. An example of launching a template using a POST request can be found here.
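For reference, the options look roughly like this when wired up in Python (project, bucket, and template names are placeholders); the same flags can equally be passed on the command line when you run the pipeline script.

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholders throughout; --template_location is what tells Dataflow to stage
# a template instead of running the job immediately.
template_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',
    '--staging_location=gs://my-bucket/staging',
    '--temp_location=gs://my-bucket/temp',
    '--template_location=gs://my-bucket/templates/csv_to_firestore_bq',
    '--requirements_file=requirements.txt',
    '--extra_package=./path/to/beam_utils.zip',
])
```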

To see the entire code, visit my GitHub Repo at https://github.com/jamesmoore255/dataflow_template/blob/master/README.md

Remember, it's open source, so feel free to clone or download it as you please.

Thanks for reading, feel free to get in touch through the comments below, or by e-mail jamesmoore255@gmail.com. This is my first article, so constructive feedback is welcome!


