Murat_Aydin
11 min read · Oct 19, 2023

End-to-End Data Engineering Project — Airflow, Snowflake, DBT, Docker and DockerOperator

Repository

Tech Stack

  • Apache Airflow
  • Snowflake
  • DBT
  • Docker
  • DockerOperator

Overview

In this article, we are going to build an end-to-end data engineering pipeline using Airflow, dbt and Snowflake, with everything running in Docker. To avoid dependency conflicts, Airflow and its components will run in containers separate from dbt. To run dbt from an Airflow DAG in that separate container, we will use the DockerOperator.

Our data engineering pipeline will be based on the Airbnb Istanbul data available at the following link:

The data comes with many different fields. However, given the scope of this tutorial, it has been simplified, and the processed version is available in the GitHub repo shared above.

We will load the data manually into Snowflake, into a schema called RAW, and we will build nine more models on top of it. The models will be run by DockerOperator tasks that we define in an Airflow DAG.

Setting Up The Environment

First things first, we need to clone the repository.

git clone https://github.com/Murataydinunimi/Airflow_DBT_Snowflake_Docker.git

We then need to get a Snowflake trial account and run the following piece of SQL in a Snowflake worksheet. The code is also available in the /snowflake_config folder.

USE ROLE ACCOUNTADMIN;

-- Create the `transform` role
CREATE ROLE IF NOT EXISTS transform;
GRANT ROLE TRANSFORM TO ROLE ACCOUNTADMIN;

-- Create the default warehouse if necessary
CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH;
GRANT OPERATE ON WAREHOUSE COMPUTE_WH TO ROLE TRANSFORM;

-- Create the `dbt` user and assign to role
CREATE USER IF NOT EXISTS dbt
PASSWORD='dbtPassword123'
LOGIN_NAME='dbt'
MUST_CHANGE_PASSWORD=FALSE
DEFAULT_WAREHOUSE='COMPUTE_WH'
DEFAULT_ROLE='transform'
DEFAULT_NAMESPACE='AIRBNB.RAW'
COMMENT='DBT user used for data transformation';
GRANT ROLE transform to USER dbt;

-- Create our database and schemas
CREATE DATABASE IF NOT EXISTS AIRBNB;
CREATE SCHEMA IF NOT EXISTS AIRBNB.RAW;

-- Set up permissions to role `transform`
GRANT ALL ON WAREHOUSE COMPUTE_WH TO ROLE transform;
GRANT ALL ON DATABASE AIRBNB to ROLE transform;
GRANT ALL ON ALL SCHEMAS IN DATABASE AIRBNB to ROLE transform;
GRANT ALL ON FUTURE SCHEMAS IN DATABASE AIRBNB to ROLE transform;
GRANT ALL ON ALL TABLES IN SCHEMA AIRBNB.RAW to ROLE transform;
GRANT ALL ON FUTURE TABLES IN SCHEMA AIRBNB.RAW to ROLE transform;

What we are doing is simply creating a role, granting it the appropriate permissions, and creating the AIRBNB database, the RAW schema and a warehouse. Using the warehouse, database and schema we just created, we then run the following code to create the raw tables on which we will build our models.

-- Set up the defaults
USE WAREHOUSE COMPUTE_WH;
USE DATABASE airbnb;
USE SCHEMA RAW;

-- Create our three raw tables
CREATE OR REPLACE TABLE raw_listings
(id integer,
listing_url string,
name string,
room_type string,
minimum_nights integer,
host_id integer,
price string,
created_at datetime,
updated_at datetime);


CREATE OR REPLACE TABLE raw_reviews
(listing_id integer,
date datetime,
reviewer_name string,
comments string,
sentiment string);



CREATE OR REPLACE TABLE raw_hosts
(id integer,
name string,
is_superhost string,
created_at datetime,
updated_at datetime);

Once the Snowflake environment is created, we can upload the data manually. The data is available in the /data_istanbul/processed_data folder.

Click on the three dots next to the table for which you want to load the data. In the menu that appears, browse to the file you want to upload (for the raw_hosts table it should be final_hosts.csv) and enter the following parameters:

And the loading should be successful.
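The exact load options were shown in a screenshot. If you prefer doing the load in SQL rather than through the UI, a rough sketch along the following lines should also work; the stage and file format names here are made up for illustration and are not part of the repo.

-- Hypothetical SQL equivalent of the manual UI load (stage and file format names are illustrative)
CREATE OR REPLACE FILE FORMAT csv_format
    TYPE = 'CSV'
    SKIP_HEADER = 1
    FIELD_OPTIONALLY_ENCLOSED_BY = '"';

CREATE OR REPLACE STAGE manual_stage
    FILE_FORMAT = (FORMAT_NAME = 'csv_format');

-- Upload the file first, e.g. from SnowSQL: PUT file:///path/to/final_hosts.csv @manual_stage;
COPY INTO raw_hosts FROM @manual_stage/final_hosts.csv.gz;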

We now need to configure our profiles.yml file for dbt. The profiles.yml file stores the connection details and credentials required to connect to the different databases or environments dbt runs against. A profile can define several targets, such as dev and prod, and when running dbt we can choose which target to use like this:

dbt run --target prod

Our profiles.yml is available at /dbtlearn/profiles.yml and looks like this:

dbtlearn:
  outputs:
    dev:
      account: <account_name>-<regions>
      database: AIRBNB
      password: dbtPassword123
      role: transform
      schema: DEV
      threads: 1
      type: snowflake
      user: dbt
      warehouse: COMPUTE_WH
  target: dev

You can find your own account name and region in your Snowflake URL. Let’s break it down a little bit. Our profile is called dbtlearn and we are using the dev target. The project will operate on the AIRBNB database with the transform role and the DEV schema, so when we run a dbt command, dbt will create the DEV schema for us.

Here is the dbt_project.yml file. It is the project-level configuration file, containing settings that apply to the whole project, for example the default paths for the models, macros and tests.

The models section, on the other hand, is configured per model folder. For example, the dbt_project.yml here says that all the models in the target_layer folder will be materialized as views, while the models in the business_layer folder will be tables. We can also specify model-level configs, which we will see later on. These choices all depend on your business needs and can be modified.
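For reference, the materialization part of the models section looks roughly like this (a sketch based on the folder names above; see dbtlearn/dbt_project.yml in the repo for the real file):

# Sketch of the models section; only the materialization config is shown
models:
  dbtlearn:
    target_layer:
      +materialized: view
    business_layer:
      +materialized: table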

Docker-Compose and Dockerfile

I ran into many different problems, mostly dependency conflicts, when I tried to use already available images. That’s why I decided to build my own dbt image, for which I created a Dockerfile.

# Use a base image (Debian 10) that supports Python
FROM debian:10

# Update package lists
RUN apt-get update
# Install Python 3 and pip
RUN apt-get install -y python3 python3-pip

# Update pip to the latest version
RUN python3 -m pip install --upgrade pip

# install openssl version 1.1.1
RUN apt-get update && apt-get install -y libssl1.1

WORKDIR /usr/app

COPY requirements.txt ./

# Install Python Requirements
RUN pip install --no-cache-dir -r requirements.txt

# Create alias for dbt-set-profile
RUN echo "alias dbt-set-profile='unset DBT_PROFILES_DIR && export DBT_PROFILES_DIR=\$PWD'" >> .bashrc

CMD ["dbt", "--version"]

As a base image we take Debian 10, and on top of it we build our dbt installation. We use dbt-core==1.5.0 and, as the Snowflake plugin, dbt-snowflake==1.5.0. The requirements file is available in the docker/dbt folder. Note that we install OpenSSL 1.1.1; otherwise there is a dependency problem with the Python oscrypto library.
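Accordingly, the relevant pins in docker/dbt/requirements.txt boil down to:

dbt-core==1.5.0
dbt-snowflake==1.5.0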

  dbt:
    image: custom_dbt_image
    container_name: dbt
    build: ./docker/dbt/
    command:
      - bash
      - -c
      - |
        echo "Launching dbt"
        sleep infinity
    restart: unless-stopped
    volumes:
      - ./dbtlearn:/dbtlearn
    ports:
      - "8085:8085"
    environment:
      - TZ=Etc/GMT

In docker-compose, we keep the dbt container sleeping indefinitely so that it is always running and ready to answer our commands. Notice that I use both a build context (the Dockerfile above) and an image name. That image does not exist anywhere; the image key is there solely to give a name to the dbt image we build. This will come in handy later on.

Data Model

Here is our data model. We have three input tables: hosts, listings and reviews. They contain the basic information about each Airbnb apartment or room, plus some information about the reviews. We will try to get more insight and build a more meaningful representation of the Airbnb data.

DATA FLOW OVERVIEW

dbt has lots of cool features, and the lineage graph is certainly one of them. When you have many tables referencing each other, it can be pretty difficult to follow or understand the relationships between them. The dbt lineage graph summarizes them in the coolest way possible. Here we can see where and how the tables and views connect and feed each other.

To summarize the lineage graph: after importing the data into the Snowflake raw layer, we make small modifications and create the target layer, which is aliased as tl. After that we do some joins and other manipulations and create the business layer, where we have our dimension and fact tables. The business layer is aliased as bl and the fact tables as ft.

We will also create two marts: one to check whether a full moon affects the reviewers’ opinion of their stay, built with the help of the full_moon_dates dbt seed, and another to see whether the average review score of the apartments in a given province of Istanbul affects the total number of stays in that province. These marts could be reported in an executive dashboard, and I leave that to you.

As you can imagine, the number of ideas you can come up with at this point is limitless given the richness of the data. What you build depends on your creativity and business needs; the examples I provide here are simply meant to give you a basic idea of what can be done.

CREATING DBT MODELS

All the models and seeds are available in the dbtlearn/models and dbtlearn/seeds respectively.

A dbt model is a simple piece of SQL that defines a view or a table. The following is an example:
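The actual SQL lives in the repo; as a rough sketch (the upstream ref name and the exact column handling are assumptions, not the repo’s actual code), a cleansed hosts model might look like this:

WITH src_hosts AS (
    -- hypothetical target-layer model name
    SELECT * FROM {{ ref('tl_hosts_cleansed') }}
)
SELECT
    id AS host_id,
    NVL(name, 'Anonymous') AS host_name,
    is_superhost,
    created_at,
    updated_at
FROM src_hosts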

This is a model from the business layer and it is called bl_hosts_cleansed. It is created under the same name in Snowflake, meaning that once we run it, dbt builds it as a table unless you explicitly configure it as a view, ephemeral, etc. The models can be run with the following command:

dbt run --project-dir /dbtlearn

where --project-dir points to the folder containing the dbt_project.yml file shown above. This command, however, runs all the models. You can also select a specific folder to run only the models inside it:

dbt run --models mart_review_score.* --project-dir /dbtlearn

A seed, on the other hand, is a small data file that lives inside the seeds folder. When we run the dbt seed command, a new table named after the file is created in the target schema of our data warehouse.

dbt seed --project-dir /dbtlearn

This will create all the available seeds in your seeds folder. Again, you can also select specific seeds.
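For example, to build only the full_moon_dates seed mentioned earlier:

dbt seed --select full_moon_dates --project-dir /dbtlearn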

As we saw above in the dbt_project.yml file, all the models in the business layer folder are materialized as tables, and we also mentioned that we can add model-specific configurations, as in the fact table model sketched below. Here, with the config keyword, we specify how our fact table should be built. Since we expect the fact table to be appended with new data regularly, we materialize it as incremental, and if the schema changes, the append fails.
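The actual SQL is in dbtlearn/models; a condensed sketch of the pattern (the upstream ref name is an assumption) looks like this:

{{ config(
    materialized = 'incremental',
    on_schema_change = 'fail'
) }}

WITH src_reviews AS (
    -- hypothetical target-layer model name
    SELECT * FROM {{ ref('tl_reviews_cleansed') }}
)
SELECT *
FROM src_reviews
{% if is_incremental() %}
-- only append reviews newer than what is already in the table
WHERE review_date > (SELECT MAX(review_date) FROM {{ this }})
{% endif %}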

The incremental materialization is put into effect by the built-in dbt function is_incremental(), where we say that if the review_date is greater than the maximum review_date already present in the table, then it is new data. The {{ this }} keyword refers to the model itself, i.e. the bl_ft_reviews.sql model. Sometimes the schema of the incremental table changes, or the transformation it goes through changes, and we want to rebuild the whole table. In that case we use:

dbt run --full-refresh

If you are curious about also the other models, please have a look at the dbtlearn/models folder.

AIRFLOW

Let’s now create the containers by running

docker-compose -f docker-compose.yml up -d

Once the containers are created, we can open the Airflow webserver, which runs on port 8080:

http://localhost:8080/

where both the username and the password are airflow.

The DAG consists of six tasks. We first create the seeds, then the target layer, in which we apply small modifications to the tables in the RAW schema of the AIRBNB database. We then create the business layer, where we have the dimension and fact tables. After the two marts mentioned before are built, the create_lineage_graph task generates the dbt lineage graph shown above.
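In Airflow terms, the ordering boils down to a chain roughly like the following; the task variable names are illustrative, and I assume the two marts run in parallel (the real definitions are in the docker/dags folder):

# Hypothetical ordering of the six tasks (each one is a DockerOperator like the example below)
create_seeds >> create_target_layer >> create_business_layer
create_business_layer >> [create_mart_full_moon, create_mart_review_score] >> create_lineage_graph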

The tasks are run through the DockerOperator. As I mentioned before, given the dependency conflicts between Airflow and dbt, I found it easier to run them in separate containers. To run a task’s command inside another container, we can use the DockerOperator. The DAG code can be seen in the docker/dags folder. Here is an example task:

from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

# Run dbt seed inside the custom dbt image, mounting the local dbtlearn project
run_dbt_task = DockerOperator(
    task_id='create_seeds',
    image='custom_dbt_image',
    api_version='auto',
    docker_url='unix://var/run/docker.sock',
    command='sh -c "cd /dbtlearn && dbt seed --project-dir /dbtlearn"',
    mounts=[Mount(source='<your_directory_where_dbtlearn_is_available>', target='/dbtlearn', type='bind')],
    network_mode='container:dbt',
    dag=dag,
)

We specify the task id, the image name that we defined in the compose file, and docker_url, which is the URL of the host running the Docker daemon (the default is unix://var/run/docker.sock). We also specify the command we want to run, the mount for the folder where the dbtlearn project lives, and the network_mode, which attaches the task’s container to the network of the dbt container we created.

If you get a permission error on the Docker socket when the task runs (something like "permission denied while trying to connect to the Docker daemon socket"), you need to give the right permissions to the Unix socket as follows:

sudo chmod 777 /var/run/docker.sock

Now we are ready to trigger the dag manually. In a few minutes, you should see the following output:

Notice that the create_lineage_graph task is still running because dbt has started serving the docs at:

http://localhost:8085

This is the port we exposed in the docker-compose.yml file.
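Under the hood, the task runs something along the lines of the following (the exact command is defined in the DAG):

dbt docs generate --project-dir /dbtlearn && dbt docs serve --port 8085 --project-dir /dbtlearn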

You can also see it in the logs. Click on the create_lineage_graph task, go to the logs:

Click on the link and you should now see the following:

Here we can see all the models and seeds, along with plenty of information about them, such as their approximate size, number of rows, whether they are views or tables, and so on. Clicking the green button in the right-hand corner takes us to the lineage graph:

Now we can stop the DAG manually from the Airflow UI.

Lastly, we can go to Snowflake to verify that our tables and views are ready.

Let’s look at one of our marts. Here is the model that calculates the average review score and the total number of stays for each province of Istanbul. The model is in the dbtlearn/models/mart_review_score folder.

{{ config(
    materialized = 'table'
) }}

with province_extracted as (
    select *,
        split(listing_title, '·')[0] as province_str,
        REGEXP_SUBSTR(province_str, 'IN\\s+(.+)', 1, 1, 'i') as province_regexp,
        upper(REPLACE(split(province_regexp, ' ')[1], '""', '')) as province
    from {{ ref('bl_listings_cleansed') }}
    where province != 'ISTANBUL' and province != 'İSTANBUL' and province != ''
),

review_score_cleaned as (
    select listing_id,
        split(listing_title, '·')[1] as review_score_raw,
        REPLACE(review_score_raw, '""', '') as review_score_no_quotes,
        REPLACE(review_score_no_quotes, '★', '') as review_score_no_star,
        CASE
            WHEN TRY_CAST(review_score_no_star AS float) IS NOT NULL THEN TRY_CAST(review_score_no_star AS float)
            ELSE NULL
        END as review_score
    from {{ ref('bl_listings_cleansed') }}
    where review_score is not null
),

ctes_joined as (
    select pe.listing_id, pe.listing_title, pe.room_type, pe.MINIMUM_NIGHTS, pe.price, pe.province, rsc.review_score
    from province_extracted pe
    join review_score_cleaned rsc on rsc.listing_id = pe.listing_id
)

select province, avg(review_score) as average_review_score, count(*) as total_stay
from ctes_joined
group by province
having total_stay > 10
order by average_review_score desc

At first glance, it looks like there is no obvious relationship between the total number of stays and the average review score.

But, it was a good try!

CONCLUSION

Congratulations to anyone who made it till here!

We tried to create an end-to-end data engineering pipeline using the rock-star technologies of the field. dbt is cool, but making it work is even cooler!

Thanks for reading this post.