Set up an ETL Data Pipeline and Workflow Using Python & Google Cloud Platform (COVID-19 Dashboard)

The perspective and journey of a beginner coder

Ryder Nguyen
The Startup · Aug 26, 2020

Recommended Prerequisites: Understanding of Python (pandas & working with dataframes), Jupyter Notebook, SQL table operations and GitHub

The code can be found here.

The next part (ARIMA model) is now live.

For better viewing, please open in a new tab or go full screen.

Having learned and used Python for about a year, I am no expert when it comes to data pipelines or cloud platforms in general. This guide documents my personal journey of learning new techniques and some things to keep in mind when developing a data solution.

  1. WHY CLOUD PLATFORM?
  2. BIG DATA / BIG LEARNING CURVE
  3. SET UP YOUR CLOUD PROJECT
  4. JUPYTER NOTEBOOK SETUP
  5. BIGQUERY SETUP
  6. SET UP WORKFLOW TEMPLATE TO BE RUN ON DATAPROC
  7. VISUALIZE ON GOOGLE DATA STUDIO
  8. LESSONS LEARNED
  9. NEXT STEPS

WHY CLOUD PLATFORM?

My unfamiliarity with cloud platforms like Google Cloud Platform (GCP) and Amazon Web Services (AWS) didn’t deter me from learning about them, though I knew it would be challenging at first. In a nutshell, cloud computing uses virtual machines (VMs) to process jobs, so you’re not confined by the computing power of your own local machine. Imagine running a Python script that takes two hours to complete, during which you can’t use your PC for anything else. Similar to multiprocessing, processing in the cloud is split between the VMs in a cluster, cutting down processing time while increasing computing power. GCP also provides workflow templates that can be scheduled (e.g. daily/weekly updates) to “spin up” a managed cluster to process tasks. In GCP, managed clusters are automatically “spun down” after a job completes or fails.

BIG DATA / BIG LEARNING CURVE

If you haven’t had any experience with big data, this tutorial will shed some light on it. GCP provides many tools that work in tandem to help you build your data solution. The chart below describes the overall workflow:

SET UP YOUR CLOUD PROJECT

Possibly the most challenging part of this project is understanding how everything works together and how best to link the services and resources efficiently. Let’s start with Google Cloud authentication. If you’re a new customer, sign up for Google’s Free Tier offer and set up using that email. After that:

  • Create a project (covid-jul25) and specify a region/zone (us-west3-a) where your code will live. Interacting with the Cloud Console through the CLI will require this information, so keep it handy.
  • Create a bucket inside the project that will be used for deployment and take note of its name.
  • Make sure to enable the APIs for the following services: BigQuery, Storage and Dataproc.
  • Set up service accounts for these services: BigQuery, Storage and Compute Engine (Compute Engine should already be set up by default).
  • For LOCAL DEVELOPMENT ONLY, download the API JSON keys for BigQuery & Storage and store them in the same folder as your Jupyter notebook.

NOTES FOR API KEYS: These keys are sensitive, so please don’t post them anywhere. If you’re working in a public repository, add the JSON file names to .gitignore before committing.

JUPYTER NOTEBOOK SETUP

To authenticate using the downloaded JSON API keys and set up the environment in the Jupyter notebook, use the following:

import os
from google.cloud import bigquery, storage

#Set credentials for bigquery LOCAL ONLY
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="{API-KEY-NAME}.json"
bigquery_client = bigquery.Client() # Instantiates a client

#Set credentials for cloud storage
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="{API-KEY-NAME}.json"
storage_client = storage.Client() # Instantiates a client

After that, your API keys should be authenticated and ready to go. A rundown of a couple of functions you could use with the APIs that you set up:

#Write a dataframe to a BigQuery table
df.to_gbq('dataset.table_name', if_exists='replace') # or 'fail'/'append'
#Read from a BigQuery table using legacy syntax
df = pd.read_gbq(sql, dialect='legacy')
#Run queries on BigQuery directly from Jupyter
query_job = bigquery_client.query("""[SQL CODE]""")
results = query_job.result()

Some tables are more efficient to create in BigQuery, while others are easier to transform using Python, so pick your poison depending on which you’re more comfortable with. With the BigQuery API, however, extract, transform and load all become easier. Personally, I use Python for row operations (transpose, string replace, adding calculated columns, etc.) and SQL for joining, creating and updating tables.
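For instance, the split can look like the sketch below: pandas handles the row-level cleanup and the BigQuery client handles the set-based work. Treat this as an illustration only; the column names and the temp_rt_clean table are placeholders I made up, not the project’s actual schema.

import os
import pandas as pd
from google.cloud import bigquery

#Local authentication, as in the setup above
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="{API-KEY-NAME}.json"
bigquery_client = bigquery.Client()

#Python for row operations (placeholder column names)
df = pd.read_gbq("SELECT * FROM `covid-jul25.usprojections.temp_rt`", dialect='standard')
df['region_code'] = df['region_code'].str.replace('US-', '') #string replace
df['cases_per_100k'] = df['cases'] / df['population'] * 100000 #calculated column

#Write the transformed frame back to a temp table
df.to_gbq('usprojections.temp_rt_clean', if_exists='replace')

#SQL for table operations, mirroring the CREATE TABLE pattern used later on
query_job = bigquery_client.query("""
CREATE OR REPLACE TABLE `covid-jul25.usprojections.rt_results_clean` AS
SELECT * except(date), cast(date as date) as date FROM `covid-jul25.usprojections.temp_rt_clean`;
""")
results = query_job.result()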

In the daily_update script, you will notice SQL code that DROPs and CREATEs tables in the same code block as DELETE FROM/INSERT INTO statements. This is because some tables’ schemas need to be predetermined before they can be imported into Data Studio for visualizing. There are also temp tables that hold data which is then used to update ‘static’ tables linked directly to Data Studio. The idea is that you don’t want to delete tables directly linked to visuals on the dashboard.

In the IMPORT & SETUP section of the daily_update notebook, toggle the deployment to ‘local’ or ‘cloud’:

deployment = 'local' #local or cloud

if deployment == 'cloud':
    from pyspark.sql import SparkSession #ONLY FOR CLOUD DEPLOYMENT
    #Start spark session
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.17.0") \
        .master('yarn') \
        .appName('spark-bigquery-ryder') \
        .getOrCreate()

    #Instantiate BigQuery client
    bigquery_client = bigquery.Client() # Instantiates a client
    #Instantiate Storage client
    storage_client = storage.Client() # Instantiates a client

else:
    #Set credentials for bigquery !FOR LOCAL ONLY, DON'T COPY TO PYSPARK
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="covid-jul25-**************.json"
    bigquery_client = bigquery.Client() # Instantiates a client
    #Set credentials for cloud storage
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="covid-jul25-**************.json"
    storage_client = storage.Client() # Instantiates a client

Add another code block to set up your cloud working environment (remember to change the zone and bucket name accordingly):

#Set working environment
PROJECT_ID='covid-jul25'
REGION='us-west3'
ZONE='us-west3-a'
BUCKET_LINK='gs://us-west3-{BUCKET_NAME}'
BUCKET='us-west3-{BUCKET_NAME}'

BIGQUERY SETUP

To run the script, datasets need to be set up in BigQuery. For this project it’s important to set the data location to US, since we’ll be joining against BigQuery Public Data that lives in the US location. All tables that don’t yet exist in the dataset need to be created first. For example, the script below updates the rt_results table in the usprojections dataset:

query_job = bigquery_client.query(
"""
DELETE FROM `covid-jul25.usprojections.rt_results` WHERE True;
INSERT INTO `covid-jul25.usprojections.rt_results`
SELECT * except(date), cast(date as date) as date FROM `covid-jul25.usprojections.temp_rt`;
""")
results = query_job.result()

To create the rt_results table, first run the code below, then proceed block by block before running on the cloud.

query_job = bigquery_client.query(
"""
DROP TABLE IF EXISTS `covid-jul25.usprojections.rt_results`;
CREATE TABLE `covid-jul25.usprojections.rt_results` AS
SELECT * except(date), cast(date as date) as date FROM `covid-jul25.usprojections.temp_rt`;
""")
results = query_job.result()
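If the usprojections dataset doesn’t exist yet, it can also be created from the notebook with the BigQuery client, pinning the data location to US as noted above. This is just a minimal sketch; creating the dataset through the BigQuery console works the same way.

from google.cloud import bigquery

bigquery_client = bigquery.Client(project='covid-jul25')

#Create the dataset in the US multi-region so it can be joined
#against BigQuery Public Data, which also lives in the US location
dataset = bigquery.Dataset('covid-jul25.usprojections')
dataset.location = 'US'
dataset = bigquery_client.create_dataset(dataset, exists_ok=True)
print('Dataset ready:', dataset.full_dataset_id)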

SET UP WORKFLOW TEMPLATE TO BE RUN ON DATAPROC

After successful local testing, open Cloud Shell on your Cloud Console and set the below specs for cloud deployment:

* Set the working environment (replace BUCKET_LINK and BUCKET with your bucket name)

export PROJECT_ID='covid-jul25'
gcloud config set project $PROJECT_ID
export REGION=us-west3
export ZONE=us-west3-a
export BUCKET_LINK=gs://us-west3-{BUCKET_NAME}
export BUCKET=us-west3-{BUCKET_NAME}

* Create workflow template

export TEMPLATE_ID=daily_update_template
export cluster_name=covid-cluster
gcloud dataproc workflow-templates create \
$TEMPLATE_ID --region $REGION

* Set the managed cluster attached to the template

gcloud dataproc workflow-templates set-managed-cluster \
$TEMPLATE_ID \
--region $REGION \
--zone $ZONE \
--cluster-name $cluster_name \
--optional-components=ANACONDA \
--master-machine-type n1-standard-4 \
--master-boot-disk-size 20 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 20 \
--num-workers 2 \
--image-version 1.4 \
--metadata='PIP_PACKAGES=pandas google.cloud pandas-gbq' \
--initialization-actions gs://us-west3-{BUCKET_NAME}/pip-install.sh

Note: Optional components include Anaconda because the environment doesn’t have it by default when the cluster spins up. Initialization actions are scripts that run when the cluster spins up; in this case, we need pip installed on the cluster. The metadata then lists the extra packages to install: pandas, google.cloud and pandas-gbq. For the initialization action, copy the pip-install.sh file from the linked repo to your working bucket, or use gsutil cp in Cloud Shell. Master and worker specs can be modified according to your needs.

* Add task(s) to the workflow template

export STEP_ID=daily_update
gcloud dataproc workflow-templates add-job pyspark \
$BUCKET_LINK/daily_update.py \
--step-id $STEP_ID \
--workflow-template $TEMPLATE_ID \
--region $REGION

The step ID names the step within the workflow, and the file daily_update.py is stored in the BUCKET_LINK folder specified above.

* Run and time the workflow template

time gcloud dataproc workflow-templates instantiate \
$TEMPLATE_ID --region $REGION #--async

* Update on August 28th, 2020:

There will be times when Google Cloud Platform is down in your region. Follow all the steps outlined above for a new region/zone (note that you will have to create a new bucket in the new region/zone!). For example, use region us-west2 instead of us-west3, save the same working script in the new location, and then run the daily template as usual from the new folder. The quickest way to move files is the gsutil cp command:

gsutil cp -r [old directory] [new directory]

VISUALIZE ON GOOGLE DATA STUDIO

With the tables created in BigQuery, we’re ready to beautify the data using the connectors provided by Google Data Studio. As you can see, there are many ways to connect to different sources from Data Studio. To display data correctly on the dashboard, table schemas and structures are important for data blending.

As shown on the left, in order to blend data you need a common field between the tables (in this instance, a region code like ‘US-CA’ for California). Note that the blend also acts like a left outer join, so be careful how you aggregate data for display on the dashboard (sum vs. average).
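To see roughly what the blend does under the hood, the query below sketches the equivalent left outer join on the region code. The table and column names are hypothetical placeholders: if the left-hand table has several rows per region code (for example, one per day), summing a metric pulled from the right-hand table will count it once per matching row, which is why the choice between sum and average matters.

#Rough SQL equivalent of a Data Studio blend (placeholder table/column names)
query_job = bigquery_client.query("""
SELECT a.region_code, a.daily_cases, b.population
FROM `covid-jul25.usprojections.daily_cases` a
LEFT OUTER JOIN `covid-jul25.usprojections.state_info` b
ON a.region_code = b.region_code;
""")
results = query_job.result()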

LESSONS LEARNED

  • Try your best to see what kind of data is out there, but don’t get hung up on trying to incorporate all of it
  • Process optimization comes with experience, so don’t sweat it if you later find that what used to take half an hour can now take 5 minutes
  • Data visualization should be user-friendly, so your back-end data and tables should be revised based on user feedback and the interface should be self-explanatory
  • Large amounts of data can increase loading time (page 2 of the report), so optimization is needed
  • Table structures and schemas are important for blending data and need to be designed before being incorporated into the workflow (with a lot of deleting and recreating tables in the process)

NEXT STEPS

  • With an understanding of the ETL pipeline, optimize and continue to optimize
  • Incorporate state-specific data such as government measures and business reopenings
  • Look into models other than ARIMA while evaluating their strengths and weaknesses
  • Build an ML model that incorporates all data relevant to the research

Ryder Nguyen
Data Analyst | A self-identified creative mind trapped inside a mathematical brain | https://rydernguyen.github.io/Portfolio/