ETL with Apache Beam — Load Data from API to BigQuery

Bose Oladipo
Cars45 Data Analytics
Aug 13, 2020

We were recently faced with a challenge at work: we needed to consume customer details and activities from Exponea. The biggest limitation we encountered was with the event data, because the API only lets you consume events for one customer at a time. We had approximately 2 million users, which meant roughly 2 million API calls, a number that would only continue to increase.

Our initial approach was to write a Python script that made these API calls and loaded the data into a Postgres instance on Cloud SQL. It worked! However, this script (deployed on a Compute Engine instance) took approximately 7 days to run. That was unacceptable: it consumed a lot of resources while limiting our refresh frequency. We needed a more scalable solution.

Optimization options we could have considered included splitting the list of customer IDs across several smaller applications, and perhaps some multithreading. However, this is exactly the kind of situation distributed data processing engines were built for. My top options were Apache Spark and Apache Beam, and as a heavy Spark user I was inclined to go with the former. However, as any Spark aficionado knows, that would have required setting up and managing infrastructure (whether open source or in the cloud). Since my company’s cloud provider of choice is GCP, which offers Dataflow as a managed service for running Beam pipelines, I opted for Apache Beam.

The Pipeline

[Pipeline architecture diagram]

This article is divided into two parts:

  1. Developing the Apache Beam script
  2. Deploying and Scheduling on GCP

Developing the Apache Beam script

There are three options for developing in Apache Beam: Java, Python and Go. I used the Python SDK, since Python is the language I work in primarily. I was initially concerned about the learning curve for Beam, but if you’re familiar with Python it shouldn’t be a steep one.

Beam ships with built-in data sources and sinks, but you can also implement your own. For example, there is currently no JDBC sink for the Python SDK. If you do go the route of implementing a custom data source, it’s important to add a Reshuffle() step to your pipeline in order to take advantage of Dataflow’s parallelization capabilities, as sketched below.
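As a minimal illustration (the step names and elements here are placeholders, not from the original pipeline), a Reshuffle() placed right after the custom read step breaks fusion so that the downstream work can be redistributed across workers:

import apache_beam as beam

with beam.Pipeline() as p:
    ids = (
        p
        # Stand-in for a custom source: any hand-rolled read step that
        # produces the elements you want to fan out on.
        | "ReadIds" >> beam.Create(["id-1", "id-2", "id-3"])
        # Breaking fusion here lets Dataflow spread the expensive
        # per-element work that follows across its workers.
        | "Reshuffle" >> beam.Reshuffle()
    )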

Environment Setup

One good thing about Apache Beam scripts is that you can run them anywhere, including on your local PC with no setup required! This is a big deal for a former Spark user like me because it means I can develop, test and debug locally and simply change my runner when I am ready to deploy.

All you need to do is create a virtual environment in Python (best practice) and install the library and you’re good to go.

pip install apache-beam[gcp]

Writing the script

For my use case, I used the built-in BigQuery source and sink, but implemented a custom class to call the API. This was pretty straightforward: all I needed to do was inherit from Beam’s DoFn class and define a process() method, which Beam calls for each element when I pass a PCollection through this class.
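The original snippet isn’t reproduced here, but a minimal sketch of such a DoFn could look like the following (the class name, endpoint, payload shape and output fields are illustrative assumptions, not Exponea’s actual API contract):

import json

import apache_beam as beam
import requests  # assumed HTTP client for calling the events API


class FetchCustomerEvents(beam.DoFn):
    # Hypothetical DoFn: one API call per customer id.

    def __init__(self, api_url, api_token):
        self.api_url = api_url      # assumed events endpoint
        self.api_token = api_token  # assumed auth token

    def process(self, customer_id):
        response = requests.post(
            self.api_url,
            headers={"Authorization": self.api_token},
            json={"customer_id": customer_id},
        )
        response.raise_for_status()
        # Emit one dictionary per event so the BigQuery sink can write rows.
        for event in response.json().get("events", []):
            yield {"customer_id": customer_id, "event": json.dumps(event)}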

All that’s left is to construct the pipeline: read the customer IDs from BigQuery, make the API call for each customer and write the events to another BigQuery table.
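Again, the original pipeline code isn’t shown here; a sketch using the hypothetical FetchCustomerEvents DoFn above, with placeholder project, dataset and table names, might look like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

API_URL = "https://example.com/events"  # placeholder endpoint
API_TOKEN = "token"                     # placeholder credential

# Runner, project, temp locations etc. are read from the command line.
options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadCustomerIds" >> beam.io.ReadFromBigQuery(
            query="SELECT customer_id FROM `my_project.my_dataset.customers`",
            use_standard_sql=True)
        | "ExtractId" >> beam.Map(lambda row: row["customer_id"])
        # Break fusion so the API calls are spread across workers.
        | "Reshuffle" >> beam.Reshuffle()
        | "FetchEvents" >> beam.ParDo(FetchCustomerEvents(API_URL, API_TOKEN))
        | "WriteEvents" >> beam.io.WriteToBigQuery(
            "my_project:my_dataset.customer_events",
            schema="customer_id:STRING,event:STRING",
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )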

Deploying and Scheduling on GCP

I deployed the script on Dataflow and set the maximum number of workers to 5. For testing, I simply ran the Python script in my terminal. To deploy the script as a Dataflow template on GCP, I used a command along the lines of the one below, specifying the DataflowRunner.
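The exact command isn’t included in the post; a typical invocation for staging a classic Dataflow template looks roughly like this (the script name, project, region and bucket paths are placeholders):

python exponea_events_pipeline.py \
  --runner DataflowRunner \
  --project my-gcp-project \
  --region europe-west1 \
  --staging_location gs://my-bucket/staging \
  --temp_location gs://my-bucket/temp \
  --template_location gs://my-bucket/templates/exponea_events \
  --max_num_workers 5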

Scheduling on GCP

My least favorite part of Dataflow has to be the scheduling, because it requires two other GCP services. However, it’s a pretty straightforward process: trigger the job with a Cloud Function and schedule it with Cloud Scheduler, both of which can be set up via the GCP console.

I wrote a simple Cloud Function that takes a template location on Cloud Storage, a job name and a location, and triggers a Dataflow job from that template via the Dataflow API.
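The function itself isn’t included in the post; a sketch of a Pub/Sub-triggered function that launches a template through the Dataflow v1b3 templates.launch API could look like this (the project id, message format and parameter names are assumptions):

import base64
import json

from googleapiclient.discovery import build


def trigger_dataflow(event, context):
    # Background Cloud Function triggered by a Pub/Sub message. The message
    # body is assumed to carry the template path, job name and location as
    # JSON, e.g. {"template": "gs://my-bucket/templates/exponea_events",
    # "job_name": "exponea-events-refresh", "location": "europe-west1"}.
    params = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

    dataflow = build("dataflow", "v1b3", cache_discovery=False)
    response = dataflow.projects().locations().templates().launch(
        projectId="my-gcp-project",  # placeholder project id
        location=params["location"],
        gcsPath=params["template"],
        body={"jobName": params["job_name"]},
    ).execute()
    print("Launched Dataflow job:", response["job"]["id"])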

Finally, I created a job on Cloud Scheduler that sends a message to the Pub/Sub topic acting as the trigger for the Cloud Function, and set it to run periodically.
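For illustration, such a job could be created with a gcloud command along these lines (the job name, schedule, topic and message body are placeholders matching the sketch above):

gcloud scheduler jobs create pubsub exponea-events-refresh \
  --schedule="0 2 * * *" \
  --topic=dataflow-trigger \
  --message-body='{"template": "gs://my-bucket/templates/exponea_events", "job_name": "exponea-events-refresh", "location": "europe-west1"}'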

Conclusion

I was able to decrease the runtime from 7 days to 6 hours with this architecture! The best part is that I can easily reduce it even further, either by increasing the number of workers or by specifying a Compute Engine machine type with more vCPUs for the job.

I will be writing a follow-up article about managing this data on BigQuery in a cost-effective way.

Thank you for reading!
