Building A Robust Data Pipeline With Great Expectations, dbt and Airflow

Sasakky
5 min read · May 7, 2022


What’s this

I watched the video of the 2020 dbt session called Building a robust data pipeline with dbt, Airflow, and Great Expectations, in which a data model is built while being tested with Great Expectations and dbt. I was curious about the idea of building a robust data pipeline by orchestrating it with Airflow, so I actually built one myself and ran it.

System Flow

The data pipeline is outlined below.
(1) Validation checks are applied to the BigQuery tables with Great Expectations. The validation results are uploaded to GCS for viewing.
(2) If the validation succeeds, the data is transformed by dbt.
(3) If the transformation succeeds, the dbt tests are executed.
(4) Steps (1) to (3) are orchestrated by Airflow.

Preparation

The Great Expectations and BigQuery setup is largely adapted from my previous article.

Dockerfile:

FROM python:3.9-slim

RUN apt-get update -y && \
    apt-get install --no-install-recommends -y -q \
    git libpq-dev python3-dev build-essential curl && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

RUN pip install --upgrade pip && \
    pip install great_expectations && \
    pip install sqlalchemy==1.4.25 && \
    pip install sqlalchemy_bigquery && \
    pip install pybigquery && \
    pip install dbt-bigquery && \
    pip install apache-airflow[gcp] && \
    pip install airflow-dbt

# Install the Google Cloud SDK (curl is added to the apt-get list above because python:3.9-slim does not ship it)
RUN curl -sSL https://sdk.cloud.google.com | bash
ENV PATH $PATH:/root/google-cloud-sdk/bin
ENV PYTHONIOENCODING=utf-8
ENV LANG C.UTF-8

docker-compose.yml:

version: '3.9'

services:
  great_expectations:
    container_name: great_expectations
    build: .
    ports:
      - "8888:8888"
      - "8080:8080"
      - "8081:8081"
    tty: true
    working_dir: /usr/app
    environment:
      - DBT_PROFILES_DIR=/usr/app/dbt
      - AIRFLOW_HOME=/usr/app/dag
    volumes:
      - ./scr:/usr/app
      - gcloud-config:/root/.config
    secrets:
      - gcp_secret
  terraform:
    container_name: terraform
    entrypoint: ash
    image: hashicorp/terraform:latest
    working_dir: /tmp/terraform
    volumes:
      - ./scr/terraform:/tmp/terraform
      - gcloud-config:/root/.config
    tty: true
    secrets:
      - gcp_secret
  gcloud:
    container_name: gcloud
    entrypoint: "gcloud"
    image: google/cloud-sdk:alpine
    volumes:
      - gcloud-config:/root/.config

volumes:
  gcloud-config:

secrets:
  gcp_secret:
    file: {keyfile}

main.tf:

provider "google" {
project = var.gcp_project_id
region = "us-central1"
credentials = "${file("${var.GOOGLE_APPLICATION_CREDENTIALS}")}"
}
resource "google_bigquery_dataset" "bigquery_dataset" {
dataset_id = "sasakky_data_infra_dataset"
friendly_name = "sasakky_data_infra_dataset"
location = "us-central1"
}
resource "google_storage_bucket" "cloud_storage_bucket" {
name = "sasakky_gcs_bucket"
location = "us-central1"
force_destroy = true
website {
main_page_suffix = "index.html"
not_found_page = "404.html"
}
}

variable.tf:

variable "gcp_project_id" {
default = "{project_id}"
}
variable "GOOGLE_APPLICATION_CREDENTIALS" {
default = "/run/secrets/gcp_secret"
}

Directory

The directory structure is as follows.

.
├── Dockerfile
├── docker-compose.yml
└── scr
    ├── dag (Airflow configuration, DAG definitions, etc.)
    ├── dbt (dbt project)
    └── great_expectations (Great Expectations project)

Setting up Great Expectations

Validation is applied to the customer table in BigQuery. The setup basically follows the previous article, with a few modifications.
(1) To upload Data Docs to GCS, add the following to great_expectations.yml (a short sketch of building the docs from Python follows the snippet).

data_docs_sites:
  gs_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleGCSStoreBackend
      project: {project_id}
      bucket: sasakky_gcs_bucket
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
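
For reference, this site can also be built directly from Python. The snippet below is a minimal sketch, assuming the project lives at /usr/app/great_expectations as elsewhere in this article; it is not part of the original setup.

from great_expectations.data_context import DataContext

# Minimal sketch: render the GCS-backed Data Docs site defined above and
# print where each configured site was published.
context = DataContext(context_root_dir="/usr/app/great_expectations")
context.build_data_docs(site_names=["gs_site"])

for site in context.get_docs_sites_urls():
    print(site["site_name"], site["site_url"])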

(2) Remove the unnecessary expectations from the suite, leaving only the PK check and the NULL checks. The resulting suite JSON is below, followed by a sketch of adding the same checks via the validator API.

{
  "data_asset_type": null,
  "expectation_suite_name": "customer_suite",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": {
        "column": "customer_id"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "customer_id"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "customer_name"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "gender_cd"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "gender"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "birth_day"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "age"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "postal_cd"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "address"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "application_store_cd"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "application_date"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "status_cd"
      },
      "meta": {}
    }
  ],
  "ge_cloud_id": null,
  "meta": {
    "great_expectations_version": "0.14.12"
  }
}
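
The same kind of expectations could also be added programmatically instead of editing the JSON by hand. The sketch below assumes a BigQuery datasource and data connector whose names are not shown in this article, so treat it as illustrative only.

from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContext

# Illustrative sketch: add the PK and NULL checks through the validator API.
# The datasource_name and data_connector_name are assumptions, not taken from
# the original project configuration.
context = DataContext(context_root_dir="/usr/app/great_expectations")

batch_request = BatchRequest(
    datasource_name="my_bigquery_datasource",                    # assumed name
    data_connector_name="default_inferred_data_connector_name",  # assumed name
    data_asset_name="sasakky_data_infra_dataset.customer",
)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customer_suite",
)

# The same checks as the first two entries in the JSON suite above
validator.expect_column_values_to_be_unique("customer_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.save_expectation_suite(discard_failed_expectations=False)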

Setting up dbt

Create a view that extracts only male customers from the customer table.

select
    *
from
    {{ source('sasakky_data_infra_dataset', 'customer') }}
where
    gender_cd = "0" -- gender_cd = 0 means men

Register the customer table as a source, and write tests to check that the transformation into customers_men was executed correctly.

schema.yml

version: 2

sources:
  - name: sasakky_data_infra_dataset
    tables:
      - name: customer

models:
  - name: customers_men
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: gender_cd
        description: sex code (0:men)
        tests:
          - accepted_values:
              values: ['0']
      - name: gender
        description: sex name
        tests:
          - accepted_values:
              values: ['男性'] # '男性' means "male"

Setting up Airflow

Once the Airflow web server is up and running, create a connection for BigQuery (the DAG below refers to it as sasakky_bigquery).

The DAG file was written as follows. To handle dbt from Airflow, I used a library called airflow-dbt.

dag.py

from datetime import timedelta
import os

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook
from airflow import AirflowException
from airflow_dbt.operators.dbt_operator import (
    DbtRunOperator,
    DbtTestOperator
)
from airflow.utils.dates import days_ago

default_args = {
    'start_date': days_ago(0),
    'retries': 0,
}


def validate(**context):
    from great_expectations.data_context import DataContext

    # Read the service account key path from the Airflow connection and expose it to the BigQuery client
    conn = BaseHook.get_connection('sasakky_bigquery')
    connection_json = conn.extra_dejson
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = connection_json['extra__google_cloud_platform__key_path']

    # Run the Great Expectations checkpoint against the customer table
    data_context: DataContext = DataContext(context_root_dir="/usr/app/great_expectations")
    result = data_context.run_checkpoint(
        checkpoint_name="checkpoint_customer",
        batch_request=None,
        run_name=None,
    )

    # Rebuild the Data Docs (uploaded to GCS) and fail the task if validation failed
    data_context.build_data_docs()
    if not result["success"]:
        raise AirflowException("Validation of the data is not successful")


with DAG(dag_id='etl', default_args=default_args, schedule_interval='@daily') as dag:
    ge_check = PythonOperator(
        task_id='validate',
        python_callable=validate,
        provide_context=True,
    )

    dbt_test = DbtTestOperator(
        task_id='dbt_test',
        retries=0,
        profiles_dir='/usr/app/dbt',
        dbt_bin='/usr/local/bin/dbt',
        dir='/usr/app/dbt'
    )

    dbt_run = DbtRunOperator(
        task_id='dbt_run',
        profiles_dir='/usr/app/dbt',
        dbt_bin='/usr/local/bin/dbt',
        dir='/usr/app/dbt'
    )

    ge_check >> dbt_run >> dbt_test

Execution

Trigger and execute the DAG in Airflow.

The process completed successfully all the way through, and checking GCS shows that the Data Docs were uploaded.

Next, let's check the behavior when the Great Expectations validation fails.
Set the birth_day column to null and run the DAG again.

The dbt tasks are not executed because the validation failed.
Data Docs shows the run as FAILED.
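
If you want the Airflow logs to show which expectations failed rather than just the overall result, the checkpoint result can be inspected. The snippet below is a minimal sketch using the same checkpoint name as in dag.py; it is not part of the original pipeline.

from great_expectations.data_context import DataContext

# Minimal sketch: list the expectations that failed in the latest checkpoint run.
context = DataContext(context_root_dir="/usr/app/great_expectations")
result = context.run_checkpoint(checkpoint_name="checkpoint_customer")

for suite_result in result.list_validation_results():
    for expectation_result in suite_result.results:
        if not expectation_result.success:
            config = expectation_result.expectation_config
            print(config.expectation_type, config.kwargs.get("column"))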

Consideration

We were able to run data pipeline tests with Great Expectations and dbt.
In production, the pipeline could be scheduled with Airflow so that the latest validation results are generated periodically and anomalies in the data are detected.

Tests on data sources can also be run with dbt, so I am wondering how to divide responsibilities between the two tools.
As mentioned in the video, tests on the actual data (in this case, NULL checks, primary key checks, whether values fall within an expected range, and so on) would be done with Great Expectations, and once that is ensured, tests on the data transformation logic (in this case, whether only male customers are correctly extracted into customers_men) would be done with dbt.

Since each tool has its own strengths and weaknesses, I would like to look more deeply into testing on the dbt side.

Sasakky

Data Engineer, Data Architect and Data Analyst in D2C Startup in Tokyo