How to build a cleaning pipeline with BigQuery and Dataflow on GCP

Oct 29

I have a small script running on my phone that sends a snapshot of sensor data to the cloud every 5 minutes. I wanted to build a dataset I could analyze while learning about the resources available on GCP. Note that for such a simple use case, it would probably make the most sense to use Dataprep. But I wanted to learn Dataflow and BigQuery, so I am building the cleaning pipeline myself!

What I want

A nice dashboard showing my location over the last several months, as well as my phone's charge pattern and whatever else I may get out of the data.

What I got

A dataset on BigQuery that holds a range of values, all of them stored as strings and many of them dirty. The rows are generated by sending the mentioned data snapshot to the cloud (via a Cloud Function and Pub/Sub, piping the messages into BigQuery).

The schema of the data is currently as shown below.

name               type    mode
battery_status     STRING  REQUIRED
bluetooth_status   STRING  REQUIRED
cell_strength      STRING  REQUIRED
location_gps       STRING  REQUIRED
location_net       STRING  REQUIRED
location_accuracy  STRING  REQUIRED
location_seconds   STRING  REQUIRED

OK, let’s clean that up so we can work with it! While I could do this with SQL, let’s assume we are in an enterprise environment and want a solid data preparation pipeline defined. The fact that I only have 34,000 rows also means a simple Python script would probably suffice. But let’s do as the Germans say and “shoot at sparrows with cannons”.

DataFlow Pipeline

To get set up, I first create a folder in which to keep the source code. Creating a virtualenv and installing apache-beam[gcp] with pip does the trick for preparation. Looking at the example code, I come up with the following pipeline code. Let’s first look at the “boilerplate” part:

There are several things to learn here:

  • The imports are exclusively from the apache-beam package. It may be called Dataflow by Google, but from a code perspective it is a 100% Apache project
  • all configuration happens through the PipelineOptions (docs) object
  • the remainder defines the BigQuery project/table structure, the target schema of the new table, and the source and sink
  • when defining the schema as JSON, be sure to pass an object with the key fields containing the array of field definitions. This goes against the docs but code doesn’t lie

Now let’s look at the actual processing bit.

The first few lines describe the high-level pipeline. Very simple in our case: read, clean, write. The clean step is performed by a dedicated DoFn, a Python object that gets sent to every worker that is spun up. It took me a while to figure out how to write this without it failing with foo not found errors in Dataflow. Basically, RTFM applies: the “function object must be serializable”.

The rest of the DoFn is my domain-specific transform code. I basically define a set of functions to apply to each of the columns in a row. As the docs mention, each row arrives as a Python dictionary with the column names as the keys.
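The per-column functions themselves are plain Python. A sketch of the pattern could look like this; the raw string formats in the comments are my assumptions, not the original code:

```python
# Map each column name to a parser; unknown columns pass through unchanged.
def parse_float(value):
    return float(value)

def parse_percentage(value):
    # e.g. "87%" -> 87.0
    return float(value.rstrip("%"))

CLEANERS = {
    "battery_status": parse_percentage,
    "location_accuracy": parse_float,
    "location_seconds": parse_float,
}

def clean_row(row):
    """Each row is a dict keyed by column name."""
    return {col: CLEANERS.get(col, lambda v: v)(val) for col, val in row.items()}
```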

Testing the code

The above code is easily unit-testable. Searching the docs for testing instructions didn’t turn up much, but the wordcount example mentions logging and testing.

For a simple “beam-like” test, see the gist below. It makes use of the TestPipeline class provided by Beam, as well as the assert_that and equal_to helpers.

Running it in the cloud

Running this is as simple as calling python. It then outputs a bunch of info on the console, but you can also check your job in the GCP web interface:

INFO:root:Create job: <Job
createTime: '2019-10-20T11:49:03.253389Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2019-10-20_04_49_01-12566155338426585589'
location: 'us-central1'
name: 'phone-sensors-cleanup'
projectId: 'pascalwhoop'
stageStates: []
startTime: '2019-10-20T11:49:03.253389Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:root:2019-10-20T11:52:23.254Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 13 based on the rate of progress in the currently running step(s).
INFO:root:2019-10-20T11:58:41.137Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:root:Job 2019-10-20_04_49_01-12566155338426585589 is in state JOB_STATE_DONE

GCP automatically scaled the job up to 14 workers to run it in parallel, which is exactly why I had the ParDo step in the pipeline. During the process it actually dumps all the rows into Avro files on GCS, directs each worker to read and process a subset of those files, and writes the results to a staging location on GCS, which then automatically gets reimported into BigQuery as a new table.
