An ELT pipeline with Airflow and GCP
Overview
As a final project for a Data Engineer course, I have created an ELT pipeline moving data from a PostgreSQL database (running on Digital Ocean), implementing a Delta Lake with GCP services such as Cloud Storage, BigQuery, Dataproc and Data Studio. And to manage all the workflow and schedule the pipeline, we get Airflow running hosted on a Digital Ocean's Droplet (virtual machine) and with Docker.
This article will be divided into four sections:
- Talk about the Database.
- How to run Airflow with Docker.
- Let's talk about the pipeline itself.
- Budget
And to give a taste of how the pipeline looks, this is its structure.
PostgreSQL Database
To create a data pipeline, we need at least one data source. And this data source could be structured or not structured. This time I have selected a structured data source, a relational database.
This database, called dvdrental
, is a PostgreSQL sample database provided by PostgreSQL Tutorial (an excellent site to learn about PostgreSQL), which ERM is the following:
The DVD rental database represents the business processes of a DVD rental store. The DVD rental database has many objects, including 15 tables, which we will use to move to several layers in our data lake and BigQuery in a batch process, just as the previous diagram showed.
Running Airflow with Docker on a Virtual Machine
Orchestration and managing workflows could be a real pain in the ass, so a good option to work with this is Airflow. Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows (just what we are looking for, right?).
But getting Airflow running could be tricky; you'll have to get up and running the web server, the scheduler and the airflow database. If only we could use some platform to containerize all our dependencies and, with a couple of commands, get this running without worrying too much about anything else.
Here is where Docker comes in handy. You can follow this tutorial to get Airflow running on your machine, or if you are more fan of videos, follow this tutorial; that's the one I used.
We could have used Composer from GCP, but it's expensive, and we don't have many DAGs to include in Composer.
Consider Composer if you want to implement this in a production environment with more pipelines and jobs to schedule. And if you have more budget, for sure.
By the way, this is how our Airflow UI should look if you follow the steps on the README.md
Finally: The Pipeline
Finally, after we have our data sources and orchestration platform, it's time to create each of the tasks that will be the pipeline's steps. But first, we'll need to:
- Create the GCS project
- Create an account service in the GCP project with enough permissions to Dataproc, Cloud Storage and BigQuery (follow the GCP docs). I just gave it Editor privileges, but this is not a good practice.
- Generate a key for that Service Account, and store that JSON file someplace safe (you'll need to add this to Airflow Connections)
- Create
google_cloud_default
connection in Airflow in the Admin > Connections panel. Put the JSON from the previous step in the Keyfile JSON field, fill in theproject_id
field, and then test the connection.
- Now you need to create the connection to the PostgreSQL database, with the host, port, user and password to connect to the database. Make sure that user just has read privileges. You don't need anything more than that.
Now, let's discuss how the pipeline is defined. And what better way than with our DAG?
You can see in the DAG that we have seven tasks (1 task and 6 task groups — the ones in blue). Let's talk about some of them briefly:
bash_task
This task generated a unique identifier for each run; this way, it is easier to track the logs.
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "[Date: $TODAY]" with uuid: $UUID',
env={'TODAY': str(datetime.today().strftime('%Y-%m-%d %H:%M:%S')), 'UUID': uuid_run}
)
move_tables_to_raw_bucket
This TaskGroup loops over all 15 tables in the source PostgreSQL database and copy all its data (check the SQL argument) to GCS and move it to GCS. A bucket we have defined to store raw data. We do this with the PostgresToGCSOperator
.
with TaskGroup(group_id='move_tables_to_raw_bucket') as move_tables_to_raw_bucket:
for table in SOURCE_TABLES:
PostgresToGCSOperator(
task_id=f'move_{table}_to_raw_bucket',
postgres_conn_id=POSTGRES_CONNECTION_ID,
sql=f'SELECT * FROM {table};', # TODO: Execute specific query for each table.
bucket=GCS_BUCKET,
filename=f'{table}/{table}.{FILE_FORMAT}',
export_format='csv',
gzip=False,
use_server_side_cursor=False,
)
create_tables_in_dtlk_raw
After storing our raw data in the bucket defined to keep it, we need to move it to our Data Lake: BigQuery. For this, we'll use GCSToBigQueryOperator
. This operator enables us to transfer data from Cloud Storage and create a Table or append rows to a table in BigQuery. That's why we created another TaskGroup to execute this operator.
with TaskGroup(group_id='create_tables_in_dtlk_raw') as create_tables_in_dtlk_raw:
for table in SOURCE_TABLES:
GCSToBigQueryOperator(
task_id=f'load_{table}_to_raw_dtlk',
bucket=f"{GCS_BUCKET}/{table}",
source_objects=['*'], # all objects in that bucket
source_format='CSV',
skip_leading_rows=1, # first row has header, ignore it
# Big query fields
destination_project_dataset_table=f'{DATASET_RAW}.{table}',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
gcp_conn_id=GOOGLE_CONN
)
raw_to_quality_layer
Now, we have the data in our Data Lake, and as we would like to implement a Delta Lake, the following step is to move the data from the raw to the quality layer. But, we need to do a couple of transformations here using PySpark, and to use Apache Spark in GCP, you need to use Dataproc.
But where are the python scripts that use PySpark to transform the data? In the repo (you'll find those files under the pyspark
folder), but for Dataproc to use them, they need to be in a Cloud Storage Bucket.
Here is the 0_raw_to_quality.py
.
#!/usr/bin/python
from pyspark.sql import SparkSession
bucket = 'spark-final-project-bucket'
DATASET_SOURCE = 'final_dtlk_raw'
DATASET_TARGET = 'final_dtlk_quality'
SOURCE_TABLES = [
'actor',
'address',
'category',
'city',
'country',
'customer',
'film',
'film_actor',
'film_category',
'inventory',
'language',
'payment',
'rental',
'staff',
'store'
]
QUERY_MOVER = {
'actor': 'SELECT * FROM actor',
'address': 'SELECT address_id, address, district, city_id, postal_code, phone, last_update FROM address',
'category': 'SELECT * FROM category',
'city': 'SELECT * FROM city',
'country': 'SELECT * FROM country',
'customer': 'SELECT * FROM customer',
'film': 'SELECT * FROM film',
'film_actor': 'SELECT * FROM film_actor',
'film_category': 'SELECT * FROM film_category',
'inventory': 'SELECT * FROM inventory',
'language': 'SELECT * FROM language',
'payment': 'SELECT * FROM payment',
'rental': 'SELECT * FROM rental',
'staff': 'SELECT * FROM staff',
'store': 'SELECT * FROM store',
}
spark = (
SparkSession
.builder
.master('yarn')
.appName('bigquery_raw_to_quality')
.getOrCreate()
)
spark.conf.set('temporaryGcsBucket', bucket)
for table in SOURCE_TABLES:
print(table)
spark_df = (
spark
.read
.format('bigquery')
.option('table', f'{DATASET_SOURCE}.{table}')
.load()
)
spark_df.createOrReplaceTempView(table)
# actor is a pretty clean table, we could just move it to qty
df_to_quality = spark.sql(QUERY_MOVER[table])
df_to_quality.show()
df_to_quality.printSchema()
# Spark saving
(
df_to_quality.write.format('bigquery')
.option('table', f'{DATASET_TARGET}.{table}')
.mode('overwrite')
.save()
)
print(f"Done with: {table}")
After you have double-checked the logic on the script and uploaded that file to the Dataproc/Spark bucket, we can create our TaskGroup that would be in charge of creating a cluster with two workers (you can change this), submit a PySpark job to that cluster and after those two are done (failed or succeeded) delete the cluster (if you left the cluster running, it would be expensive).
with TaskGroup(group_id='raw_to_quality_layer') as raw_to_quality_layer:
create_cluster_raw = DataprocCreateClusterOperator(
task_id='create_cluster_raw',
project_id=PROJECT_ID,
cluster_name=CLUSTER_RAW,
num_workers=2,
# we need a bucket to store files from the cluster
storage_bucket=SPARK_BUCKET,
region=REGION,
)
pyspark_job_raw_to_qty = {
'reference': {
'project_id': PROJECT_ID,
'job_id': f"raw_to_qty_{uuid_run}"
},
'placement': {
'cluster_name': CLUSTER_RAW
},
'labels': {
'airflow-version': 'v2-4-1'
},
'pyspark_job': {
# this is given by Google
'jar_file_uris': ['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'],
# Python script that uses PySpark
'main_python_file_uri': 'gs://spark-final-project-bucket/0_raw_to_quality.py'
}
}
tables_from_raw_to_quality = DataprocSubmitJobOperator(
task_id='tables_from_raw_to_quality',
project_id=PROJECT_ID,
region=REGION,
job=pyspark_job_raw_to_qty,
gcp_conn_id=GOOGLE_CONN
)
delete_cluster_raw = DataprocDeleteClusterOperator(
task_id='delete_cluster_raw',
project_id=PROJECT_ID,
cluster_name=CLUSTER_RAW,
region=REGION,
trigger_rule='all_done'
)
create_cluster_raw >> tables_from_raw_to_quality >> delete_cluster_raw
The missing tasks are similar to the last but involve different PySpark jobs, scripts and layers. Finally, it'll create all the tables in the datasets in BigQuery (make sure to create the datasets first.)
After you have all the tasks defined, you need to set the dependencies of those tasks, and you do that in this manner.
# Order all task
(
bash_task >> move_tables_to_raw_bucket >> # noqa: W504
create_tables_in_dtlk_raw >> raw_to_quality_layer >> # noqa: W504
quality_to_access_layer >> access_to_stage_layer >> stage_to_bus_layer
)
Eventually, when you have curated tables and views in the business layer, you can use those as a source in Data Studio to create a dashboard and generate insights for the business stakeholders.
How much did we spend, and how much will we spend?
Here we will measure the money (or credits, in this case) spent in GCS because I assume you already have a budget for the data sources and that you'll run Airflow locally; but if you want to use a real virtual machine as the pros do, use this link to earn 200 USD in credits in Digital Ocean as create your Droplet.
Earn 200USD on Digital ocean: Here
I developed this pipeline in 13 days, and I had to turn it off some days because I couldn't test it or I was struggling with a new feature, connection or Airflow task. So, how much did we spend on the development process? Although this billing report is not too useful, we could get an idea of which services we used and how much we could spend per month.
Nevertheless, we could use the Google Cloud Pricing Calculator to estimate our budget with the current data's volume and create an additional budget when the volume is ten times higher.
Current volume
Here you can see the details of the budget. And no worries if you can't reach that link; in the budgets
folder under the repo, you can find the PDF generated.
10x Current Volume
Here you can see the budget details for ten times the current volume; this will require more processing time in the Compute Engines that Dataproc needs, double the workers (from two workers to four), and probably the cluster will be running for doubled the hours (probably more or less, depending on how efficient the scripts are).
Wrapping up
This was a simple but powerful end-to-end ELT pipeline where we could use several GCP services and orchestrate everything with Airflow.
Here is the repo with all code and PySpark scripts
Stay tuned for more.
Follow me for more on YouTube.