An ELT pipeline with Airflow and GCP

Andrés Felipe Mesa David.
9 min read · Jan 13, 2023


Overview

As the final project for a Data Engineering course, I built an ELT pipeline that moves data from a PostgreSQL database (running on DigitalOcean) and implements a Delta Lake with GCP services such as Cloud Storage, BigQuery, Dataproc and Data Studio. To manage the workflow and schedule the pipeline, Airflow runs with Docker on a DigitalOcean Droplet (virtual machine).

This article will be divided into four sections:

  1. The database.
  2. Running Airflow with Docker.
  3. The pipeline itself.
  4. The budget.

To give you a taste of how the pipeline looks, here is its structure.

ELT Pipeline Schema

PostgreSQL Database

To create a data pipeline, we need at least one data source, and this data source could be structured or unstructured. This time I have chosen a structured one: a relational database.

This database, called dvdrental, is a PostgreSQL sample database provided by PostgreSQL Tutorial (an excellent site for learning PostgreSQL), and its entity-relationship model (ERM) is the following:

ERM DVD rental

The DVD rental database represents the business processes of a DVD rental store. It has many objects, including 15 tables, whose data we will move through several layers of our data lake and into BigQuery in a batch process, just as the diagram above shows.

Running Airflow with Docker on a Virtual Machine

Orchestrating and managing workflows can be a real pain, so a good option for the job is Airflow. Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows (just what we are looking for, right?).

But getting Airflow running can be tricky: you have to get the web server, the scheduler and the Airflow metadata database up and running. If only we could use some platform to containerize all our dependencies and, with a couple of commands, get all of this running without worrying too much about anything else.

Here is where Docker comes in handy. You can follow this tutorial to get Airflow running on your machine or, if you are more of a video person, follow this tutorial; that's the one I used.

We could have used Cloud Composer on GCP, but it's expensive, and we don't have enough DAGs to justify it.

Consider Composer if you want to implement this in a production environment with more pipelines and jobs to schedule, and if you have the budget for it, of course.

By the way, this is how our Airflow UI should look if you follow the steps in the README.md:

Airflow UI

Finally: The Pipeline

Finally, after we have our data sources and our orchestration platform in place, it's time to create each of the tasks that make up the pipeline. But first, we'll need to:

  • Create the GCP project.
GCP Project
  • Create a service account in the GCP project with enough permissions for Dataproc, Cloud Storage and BigQuery (follow the GCP docs). I just gave it Editor privileges, but this is not a good practice.
Service account
  • Create a google_cloud_default connection in Airflow in the Admin > Connections panel. Paste the JSON key from the previous step into the Keyfile JSON field, fill in the project_id field, and then test the connection.
  • Create the connection to the PostgreSQL database, with the host, port, user and password needed to connect. Make sure that user only has read privileges; you don't need anything more than that. (Both connection IDs show up again as constants in the sketch after this list.)
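
For reference, the task snippets below live in a single DAG file and rely on a handful of imports and module-level constants. The repo has the real values; this is only a minimal sketch, and the project ID, region, bucket and cluster names here are placeholders of mine:

from datetime import datetime

from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
    DataprocDeleteClusterOperator,
)

# Connection IDs created in Admin > Connections
GOOGLE_CONN = 'google_cloud_default'
POSTGRES_CONNECTION_ID = 'postgres_dvdrental'  # assumed name

# GCP resources (placeholder values; use your own project, region and buckets)
PROJECT_ID = 'my-final-project'
REGION = 'us-central1'
GCS_BUCKET = 'raw-final-project-bucket'
SPARK_BUCKET = 'spark-final-project-bucket'
CLUSTER_RAW = 'raw-to-quality-cluster'
DATASET_RAW = 'final_dtlk_raw'
FILE_FORMAT = 'csv'

# The 15 dvdrental tables (the same list reappears in the PySpark script further down)
SOURCE_TABLES = [
    'actor', 'address', 'category', 'city', 'country', 'customer', 'film',
    'film_actor', 'film_category', 'inventory', 'language', 'payment',
    'rental', 'staff', 'store',
]

The tasks themselves are declared inside a with DAG(...) context manager; I omit the schedule, start date and default arguments here.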

Now, let's discuss how the pipeline is defined. And what better way than with our DAG?

DAG Graph

You can see in the DAG that we have seven steps: one standalone task and six task groups (the ones in blue). Let's briefly go over some of them:

bash_task

This task logs a unique identifier for each run, which makes it easier to trace the logs.

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "[Date: $TODAY]" with uuid: $UUID',
    env={'TODAY': str(datetime.today().strftime('%Y-%m-%d %H:%M:%S')), 'UUID': uuid_run}
)
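
The uuid_run variable is not shown in the snippet above; it is just a unique identifier generated once when the DAG file is parsed, something along these lines:

import uuid

# One identifier reused by bash_task and later in the Dataproc job_id
uuid_run = str(uuid.uuid4())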

move_tables_to_raw_bucket

This TaskGroup loops over all 15 tables in the source PostgreSQL database and copies each table's data (check the sql argument) into the GCS bucket we have defined to store raw data. We do this with the PostgresToGCSOperator.

with TaskGroup(group_id='move_tables_to_raw_bucket') as move_tables_to_raw_bucket:
    for table in SOURCE_TABLES:
        PostgresToGCSOperator(
            task_id=f'move_{table}_to_raw_bucket',
            postgres_conn_id=POSTGRES_CONNECTION_ID,
            sql=f'SELECT * FROM {table};',  # TODO: Execute specific query for each table.
            bucket=GCS_BUCKET,
            filename=f'{table}/{table}.{FILE_FORMAT}',
            export_format='csv',
            gzip=False,
            use_server_side_cursor=False,
        )

Raw data Bucket

create_tables_in_dtlk_raw

After storing our raw data in the bucket defined to keep it, we need to move it into our data lake in BigQuery. For this, we'll use the GCSToBigQueryOperator, which lets us transfer data from Cloud Storage and either create a table or append rows to an existing table in BigQuery. That's why we created another TaskGroup to execute this operator.

with TaskGroup(group_id='create_tables_in_dtlk_raw') as create_tables_in_dtlk_raw:
    for table in SOURCE_TABLES:
        GCSToBigQueryOperator(
            task_id=f'load_{table}_to_raw_dtlk',
            bucket=f"{GCS_BUCKET}/{table}",
            source_objects=['*'],  # all objects under that prefix
            source_format='CSV',
            skip_leading_rows=1,  # first row is the header, skip it
            # BigQuery fields
            destination_project_dataset_table=f'{DATASET_RAW}.{table}',
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            gcp_conn_id=GOOGLE_CONN
        )

raw_to_quality_layer

Now we have the data in our data lake, and since we want to implement a Delta Lake, the next step is to move the data from the raw layer to the quality layer. We need to do a couple of transformations here using PySpark, and to run Apache Spark on GCP, you use Dataproc.

But where are the Python scripts that use PySpark to transform the data? In the repo (you'll find them under the pyspark folder); but for Dataproc to use them, they need to be in a Cloud Storage bucket.

Dataproc bucket
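
You can upload the scripts from the console or with gsutil; as a sketch, here is the same upload done with the google-cloud-storage Python client (the local path assumes the repo's pyspark folder):

from google.cloud import storage

SPARK_BUCKET = 'spark-final-project-bucket'
LOCAL_SCRIPT = 'pyspark/0_raw_to_quality.py'  # path inside the repo

client = storage.Client()  # picks up the service account credentials
blob = client.bucket(SPARK_BUCKET).blob('0_raw_to_quality.py')
blob.upload_from_filename(LOCAL_SCRIPT)
print(f'Uploaded {LOCAL_SCRIPT} to gs://{SPARK_BUCKET}/{blob.name}')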

Here is 0_raw_to_quality.py:

#!/usr/bin/python
from pyspark.sql import SparkSession

bucket = 'spark-final-project-bucket'
DATASET_SOURCE = 'final_dtlk_raw'
DATASET_TARGET = 'final_dtlk_quality'
SOURCE_TABLES = [
    'actor',
    'address',
    'category',
    'city',
    'country',
    'customer',
    'film',
    'film_actor',
    'film_category',
    'inventory',
    'language',
    'payment',
    'rental',
    'staff',
    'store'
]
QUERY_MOVER = {
    'actor': 'SELECT * FROM actor',
    'address': 'SELECT address_id, address, district, city_id, postal_code, phone, last_update FROM address',
    'category': 'SELECT * FROM category',
    'city': 'SELECT * FROM city',
    'country': 'SELECT * FROM country',
    'customer': 'SELECT * FROM customer',
    'film': 'SELECT * FROM film',
    'film_actor': 'SELECT * FROM film_actor',
    'film_category': 'SELECT * FROM film_category',
    'inventory': 'SELECT * FROM inventory',
    'language': 'SELECT * FROM language',
    'payment': 'SELECT * FROM payment',
    'rental': 'SELECT * FROM rental',
    'staff': 'SELECT * FROM staff',
    'store': 'SELECT * FROM store',
}

spark = (
    SparkSession
    .builder
    .master('yarn')
    .appName('bigquery_raw_to_quality')
    .getOrCreate()
)

# Temporary bucket used by the BigQuery connector for intermediate files
spark.conf.set('temporaryGcsBucket', bucket)

for table in SOURCE_TABLES:
    print(table)

    # Read the raw table from BigQuery
    spark_df = (
        spark
        .read
        .format('bigquery')
        .option('table', f'{DATASET_SOURCE}.{table}')
        .load()
    )

    spark_df.createOrReplaceTempView(table)

    # Most tables are already clean, so the query either selects everything
    # or drops the columns we don't need (e.g. address2 in address)
    df_to_quality = spark.sql(QUERY_MOVER[table])

    df_to_quality.show()
    df_to_quality.printSchema()

    # Write the result to the quality dataset in BigQuery
    (
        df_to_quality.write.format('bigquery')
        .option('table', f'{DATASET_TARGET}.{table}')
        .mode('overwrite')
        .save()
    )
    print(f"Done with: {table}")

After you have double-checked the logic in the script and uploaded the file to the Dataproc/Spark bucket, we can create the TaskGroup in charge of creating a cluster with two workers (you can change this), submitting a PySpark job to that cluster and, once those two tasks are done (failed or succeeded), deleting the cluster (leaving the cluster running would get expensive).

with TaskGroup(group_id='raw_to_quality_layer') as raw_to_quality_layer:
    create_cluster_raw = DataprocCreateClusterOperator(
        task_id='create_cluster_raw',
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_RAW,
        num_workers=2,
        # we need a bucket to store files from the cluster
        storage_bucket=SPARK_BUCKET,
        region=REGION,
    )

    pyspark_job_raw_to_qty = {
        'reference': {
            'project_id': PROJECT_ID,
            'job_id': f"raw_to_qty_{uuid_run}"
        },
        'placement': {
            'cluster_name': CLUSTER_RAW
        },
        'labels': {
            'airflow-version': 'v2-4-1'
        },
        'pyspark_job': {
            # this connector jar is provided by Google
            'jar_file_uris': ['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'],
            # Python script that uses PySpark
            'main_python_file_uri': 'gs://spark-final-project-bucket/0_raw_to_quality.py'
        }
    }

    tables_from_raw_to_quality = DataprocSubmitJobOperator(
        task_id='tables_from_raw_to_quality',
        project_id=PROJECT_ID,
        region=REGION,
        job=pyspark_job_raw_to_qty,
        gcp_conn_id=GOOGLE_CONN
    )

    delete_cluster_raw = DataprocDeleteClusterOperator(
        task_id='delete_cluster_raw',
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_RAW,
        region=REGION,
        trigger_rule='all_done'
    )

    create_cluster_raw >> tables_from_raw_to_quality >> delete_cluster_raw

The remaining task groups are similar to this one but involve different PySpark jobs, scripts and layers. In the end, the pipeline creates all the tables in the BigQuery datasets (make sure to create the datasets first).
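
Since those groups only differ in the cluster name, the script to run and the job id, one way to avoid repeating yourself is a small factory function. This is only a sketch reusing the constants from earlier; the script and cluster names below are placeholders, so check the pyspark folder in the repo for the real file names:

def dataproc_layer_taskgroup(group_id, cluster_name, script_name):
    """Create a cluster, submit the layer's PySpark job, then always delete the cluster."""
    with TaskGroup(group_id=group_id) as layer_group:
        create_cluster = DataprocCreateClusterOperator(
            task_id=f'create_cluster_{group_id}',
            project_id=PROJECT_ID,
            cluster_name=cluster_name,
            num_workers=2,
            storage_bucket=SPARK_BUCKET,
            region=REGION,
        )

        submit_job = DataprocSubmitJobOperator(
            task_id=f'submit_{group_id}',
            project_id=PROJECT_ID,
            region=REGION,
            gcp_conn_id=GOOGLE_CONN,
            job={
                'reference': {'project_id': PROJECT_ID, 'job_id': f'{group_id}_{uuid_run}'},
                'placement': {'cluster_name': cluster_name},
                'pyspark_job': {
                    'jar_file_uris': ['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'],
                    'main_python_file_uri': f'gs://{SPARK_BUCKET}/{script_name}',
                },
            },
        )

        delete_cluster = DataprocDeleteClusterOperator(
            task_id=f'delete_cluster_{group_id}',
            project_id=PROJECT_ID,
            cluster_name=cluster_name,
            region=REGION,
            trigger_rule='all_done',  # delete even if the job failed
        )

        create_cluster >> submit_job >> delete_cluster
    return layer_group


# Placeholder names; the repo's actual scripts may be named differently
quality_to_access_layer = dataproc_layer_taskgroup(
    'quality_to_access_layer', 'qty-to-access-cluster', '1_quality_to_access.py')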

After you have all the tasks defined, you need to set their dependencies, and you do that in this manner:

# Order all tasks
(
    bash_task >> move_tables_to_raw_bucket >>  # noqa: W504
    create_tables_in_dtlk_raw >> raw_to_quality_layer >>  # noqa: W504
    quality_to_access_layer >> access_to_stage_layer >> stage_to_bus_layer
)

Eventually, when you have curated tables and views in the business layer, you can use those as a source in Data Studio to create a dashboard and generate insights for the business stakeholders.

Dashboard

How much did we spend, and how much will we spend?

Here we will measure the money (or credits, in this case) spent on GCP, because I assume you already have a budget for the data sources and that you'll run Airflow locally; but if you want to use a real virtual machine as the pros do, use this link to earn 200 USD in credits on DigitalOcean and create your Droplet.

Earn 200 USD on DigitalOcean: here

I developed this pipeline over 13 days, and I had to turn it off on some days because I couldn't test it or was struggling with a new feature, connection or Airflow task. So, how much did we spend during development? Although this billing report is not too useful, it gives an idea of which services we used and how much we could spend per month.

Nevertheless, we can use the Google Cloud Pricing Calculator to estimate our budget at the current data volume and create an additional budget for when the volume is ten times higher.

Current volume

Here you can see the details of the budget. And no worries if you can't reach that link; you can find the generated PDF in the budgets folder of the repo.

10x Current Volume

Here you can see the budget details for ten times the current volume; this will require more processing time on the Compute Engine instances that Dataproc uses, double the workers (from two to four), and the cluster probably running for about twice as many hours (more or less, depending on how efficient the scripts are).
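
If you actually reach that volume, a cheap first knob is to stop hard-coding the cluster size and read it from an Airflow Variable instead, so scaling from two to four workers doesn't require touching the DAG code. A small sketch, assuming a Variable named dataproc_num_workers and reusing the imports and constants from earlier:

from airflow.models import Variable

# Set dataproc_num_workers in Admin > Variables; falls back to 2 if unset
NUM_WORKERS = int(Variable.get('dataproc_num_workers', default_var=2))

create_cluster_raw = DataprocCreateClusterOperator(
    task_id='create_cluster_raw',
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_RAW,
    num_workers=NUM_WORKERS,
    storage_bucket=SPARK_BUCKET,
    region=REGION,
)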

Wrapping up

This was a simple but powerful end-to-end ELT pipeline in which we used several GCP services and orchestrated everything with Airflow.

Here is the repo with all the code and PySpark scripts.

Stay tuned for more.

Follow me for more on YouTube.
