How to build a cleaning pipeline with BigQuery and DataFlow on GCP

pascalwhoop · Published in datamindedbe · Oct 29, 2019

I have a small script running on my phone which sends a set of information to the cloud every 5 minutes. I wanted to build a dataset with which I could do some analysis and learn about the resources available on GCP. Note that for such a simple use case, it would probably make the most sense to use DataPrep. But I wanted to learn DataFlow and BigQuery, so I am building the cleaning pipeline myself!

What I want

A nice dashboard showing my location over the last several months, as well as the charge pattern of my phone and whatever else I may get out of the data.

What I got

A dataset on BigQuery that holds a range of values, yet all of them stored as strings and many of them dirty. The values are generated by sending the mentioned data snapshot to the cloud (via a Cloud Function, Pub/Sub, and a pipe into BigQuery).

The schema of the data is currently as shown below.

name               type    mode
=================================
battery_status     STRING  REQUIRED
bluetooth_status   STRING  REQUIRED
cell_id            STRING  REQUIRED
cell_strength      STRING  REQUIRED
gps_status         STRING  REQUIRED
last_app           STRING  REQUIRED
location_gps       STRING  REQUIRED
location_net       STRING  REQUIRED
location_accuracy  STRING  REQUIRED
altitude           STRING  REQUIRED
speed              STRING  REQUIRED
location_seconds   STRING  REQUIRED
timestamp          STRING  REQUIRED

OK, let’s clean that up so we can work with it! While I could do this with SQL, let’s assume we are in an enterprise environment and want to ensure we have a solid data preparation pipeline defined. The fact that I only have about 34,000 rows also means a simple Python script would probably suffice. But let’s do as the Germans say and “shoot at sparrows with cannons”.

DataFlow Pipeline

To get set up, I first create a folder in which to keep the source code. Creating a virtualenv and installing apache-beam[gcp] with pip does the trick for preparation. Looking at the example code, I come up with the following pipeline code. Let’s first look at the “boilerplate” part:
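In essence, the boilerplate amounts to something like the sketch below. The GCS bucket, dataset and table names are placeholders; the project id, region and job name are the ones that show up again in the job output further down.

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    StandardOptions,
)

# All configuration happens through PipelineOptions; the GCP-specific settings
# live in the GoogleCloudOptions view of the same object.
options = PipelineOptions()
gcloud = options.view_as(GoogleCloudOptions)
gcloud.project = 'pascalwhoop'
gcloud.region = 'us-central1'
gcloud.job_name = 'phone-sensors-cleanup'
gcloud.temp_location = 'gs://my-bucket/tmp'         # placeholder bucket
gcloud.staging_location = 'gs://my-bucket/staging'  # placeholder bucket
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Source: the existing all-strings table; target: the cleaned, typed table.
SOURCE_TABLE = 'pascalwhoop:phone_sensors.raw'      # placeholder dataset/table
TARGET_TABLE = 'pascalwhoop:phone_sensors.clean'    # placeholder dataset/table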

There are several things to learn here:

  • The imports are exclusively from the apache-beam package. It may be called DataFlow by Google, but from a code perspective it is a 100% Apache project
  • All configuration happens through the PipelineOptions (docs) object in lines 14–21
  • Lines 23 to the end define the BigQuery project/table structure, the target schema of the new table, and the source and sink
  • When defining the schema as JSON (line 50), be sure to pass an object whose key fields contains the array of field definitions, as in the sketch below. This goes against the docs, but code doesn’t lie
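Continuing the sketch, the schema and the source/sink definitions amount to something like this. The column types here are illustrative guesses; the parse_table_schema_from_json helper ships with beam.

import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

# Note the top-level "fields" key wrapping the array of field definitions.
target_schema = parse_table_schema_from_json(json.dumps({
    "fields": [
        {"name": "battery_status", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "location_gps", "type": "STRING", "mode": "NULLABLE"},
        {"name": "speed", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "timestamp", "type": "TIMESTAMP", "mode": "NULLABLE"},
        # ... the remaining columns
    ]
}))

# Read the raw table, write cleaned rows to a new table with the new schema.
# SOURCE_TABLE and TARGET_TABLE come from the snippet above.
source = beam.io.BigQuerySource(table=SOURCE_TABLE)
sink = beam.io.WriteToBigQuery(
    TARGET_TABLE,
    schema=target_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
)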

Now let’s look at the actual processing bit

The first few lines describe the high-level pipeline. Very simple in our case: read, clean, write. The clean step is performed by a dedicated DoFn, a Python object that gets sent to every worker that is spun up. Lines 14–65 describe this unit of code. It took me a while to figure out how to write this without it failing due to foo not found errors in DataFlow. Basically, RTFM applies: the “function object must be serializable”.

All of the code between lines 21–65 is my domain-specific transform code. I basically define a set of functions to apply to each of the columns in a row. As the docs mention, each row is a Python dictionary with the column names as the keys.
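A simplified sketch of that shape, with stand-in converters for a few of the columns:

import apache_beam as beam


class CleanRowFn(beam.DoFn):
    """Turns an all-string raw row into properly typed values.

    Everything the DoFn needs has to live on the object itself, because the
    object gets pickled and shipped to each Dataflow worker.
    """

    def process(self, row):
        # Each element is a dict keyed by column name; every value is a string.
        yield {
            'battery_status': self._to_int(row.get('battery_status')),
            'location_gps': row.get('location_gps'),
            'speed': self._to_float(row.get('speed')),
            'timestamp': row.get('timestamp'),
            # ... remaining columns, each with its own converter
        }

    @staticmethod
    def _to_int(value):
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _to_float(value):
        try:
            return float(value)
        except (TypeError, ValueError):
            return None


# The high-level pipeline: read, clean, write (source and sink as defined above).
with beam.Pipeline(options=options) as p:
    (p
     | 'read' >> beam.io.Read(source)
     | 'clean' >> beam.ParDo(CleanRowFn())
     | 'write' >> sink)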

Testing the code

The above code is easily unit-testable. The docs are not very verbose on testing instructions, but the wordcount example covers logging and testing.

For a simple “beam-like test”, see the example below. It makes use of the TestPipeline class provided by beam, as well as the assert_that and equal_to helpers.
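A sketch of such a test against the simplified CleanRowFn from above (the sample values are made up):

import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class CleanRowFnTest(unittest.TestCase):

    def test_converts_string_columns(self):
        raw = [{'battery_status': '87',
                'location_gps': '50.85,4.35',
                'speed': '1.5',
                'timestamp': '2019-10-20T11:49:03Z'}]
        expected = [{'battery_status': 87,
                     'location_gps': '50.85,4.35',
                     'speed': 1.5,
                     'timestamp': '2019-10-20T11:49:03Z'}]

        with TestPipeline() as p:
            output = (p
                      | beam.Create(raw)
                      | beam.ParDo(CleanRowFn()))
            # assert_that attaches a verification transform; the check runs
            # when the TestPipeline context manager exits and the pipeline runs.
            assert_that(output, equal_to(expected))


if __name__ == '__main__':
    unittest.main()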

Running it in the cloud

Running this is as simple as calling python cleanup_pipeline.py, since all the options are set in code. You can also check on your job in the GCP web interface.
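Alternatively, you can leave the options out of the code and have PipelineOptions pick them up from the command line (bucket name again a placeholder):

python cleanup_pipeline.py \
    --runner DataflowRunner \
    --project pascalwhoop \
    --region us-central1 \
    --job_name phone-sensors-cleanup \
    --temp_location gs://my-bucket/tmp

Either way, the script then prints a bunch of info to the console: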

INFO:root:Create job: <Job
createTime: '2019-10-20T11:49:03.253389Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2019-10-20_04_49_01-12566155338426585589'
location: 'us-central1'
name: 'phone-sensors-cleanup'
projectId: 'pascalwhoop'
stageStates: []
startTime: '2019-10-20T11:49:03.253389Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
...
INFO:root:2019-10-20T11:52:23.254Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 13 based on the rate of progress in the currently running step(s).
...
INFO:root:2019-10-20T11:58:41.137Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:root:Job 2019-10-20_04_49_01-12566155338426585589 is in state JOB_STATE_DONE

GCP Console view of the job

GCP automatically scaled my job to 14 workers to run it in parallel, which is exactly why I had the ParDo step in the pipeline. During the process it actually dumps all the rows into Avro files on GCS, then directs each worker to read a subset of those files, process them, and write the results to a staging location on GCS, which then automatically gets imported into BigQuery as a new table.
