Optimize Your Data Pipeline with Apache Airflow and Great Expectations

Shiv Deshmukh
May 16, 2022

If you build or maintain applications that rely on data, there’s a decent chance you’re familiar with both Apache Airflow and Great Expectations.

Apache Airflow is an open source tool that helps DevOps teams build and manage workflows programmatically. It can help drive data pipelines by using standard Python features to define complex workflow requirements as Directed Acyclic Graphs, or DAGs, then execute those workflows automatically.

Great Expectations, which is also open source, is a data validation tool. Engineers can use it to define testing, documentation, and profiling requirements that data must satisfy before it moves further down the pipeline. It also flags data quality errors so that teams can remedy them before the next step.

Combining Apache Airflow with Great Expectations

You can use Airflow and Great Expectations separately. But wouldn’t it be great if you could integrate them in such a way that they work seamlessly in tandem, with Airflow defining data pipeline flows and Great Expectations automatically validating data as it moves according to those flows? Well, you can, and this blog shows you how.

By integrating Airflow and Great Expectations, you not only save time, but you can also validate data quality automatically within a database. You can find data errors faster, reduce the occurrence of data quality issues that may slow down your data pipeline, and minimize the manual effort that your engineers spend managing data quality and movement.

Keep reading for a step-by-step guide to using Airflow and Great Expectations in tandem.

Step 1: Set up Great Expectations

We’ll start by installing and configuring Great Expectations.

We'll use the simple MySQL sample database "world_x", which contains four tables (country, city, countryinfo, and countrylanguage) and is documented here:

https://dev.mysql.com/doc/world-x-setup/en/

Follow the instructions on that page to set it up on your local host, listening on port 3306.
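If you're starting from scratch, the setup looks roughly like the following. This is a minimal sketch: the download URL and file layout follow the MySQL page above (adjust paths if your archive extracts differently), and demo_user/demo_pass are simply the sample credentials assumed by the setup script in the next step.

# Download and extract the world_x sample database (file names per the MySQL docs page above)
wget https://downloads.mysql.com/docs/world_x-db.zip
unzip world_x-db.zip

# Load it into the local MySQL server listening on port 3306
mysql -u root -p < world_x-db/world_x.sql

# Create the demo user assumed by the setup scripts below (adjust the credentials to taste)
mysql -u root -p -e "CREATE USER 'demo_user'@'localhost' IDENTIFIED BY 'demo_pass'; GRANT ALL PRIVILEGES ON world_x.* TO 'demo_user'@'localhost';"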

With our MySQL database set up, let’s install Great Expectations. Here’s a Bash script (ge.sh) that will do everything you require.

#!/bin/bash

# Database Credentials
world_db_host="localhost"
port="3306"
username="demo_user"
password="demo_pass"
world_db="world_x"
printf "Setting up Great Expectations for UMP...\n\n"printf "Using following Database config:\n"
printf "HOST: %s\n" "$world_db_host"
printf "PORT: %s\n" "$port"
printf "USERNAME: %s\n" "$username"
printf "DATABASE: %s\n" "$world_db"
printf "Remove existing GE dirs, if exists...\n\n"
rm -rf great_expectations
printf "Setting up Python venv...\n\n"
python3 -m venv venv
source ./venv/bin/activate
printf "Installing required GE pip packages...\n\n"
python3 -m pip install --upgrade pip
pip3 install great_expectations
pip3 install pymysql
pip3 install sqlalchemy
pip3 install jupyterlab
pip3 install notebook
printf "\nGE is up with version:\n"
great_expectations --version
printf "Initializing GE and datasource...\n\n"
/usr/bin/expect -c '
spawn great_expectations --v3-api init
expect -re {OK to proceed?} {send "Y\r"}
send "great_expectations --v3-api datasource new\n"
expect -re ":"
send "2\r"
expect -re ":"
send "1\r"
'
ls -l
printf "Setting up MySQL Data Sources...\n\n"
python3 setup_datasources.py world_db_datasource $world_db_host $port $username $password $world_db
printf "Setting up Expectations Suites for world_x DB...\n\n"
python3 setup_expectations.py world_db_city_expectation world_db_datasource city ID,Name,CountryCode,District,Info
python3 setup_expectations.py world_db_country_expectation world_db_datasource country Code,Name,Capital,Code2,Covid_Hotspot
python3 setup_expectations.py world_db_countryinfo_expectation world_db_datasource countryinfo doc,_id,_json_schema
python3 setup_expectations.py world_db_countrylanguage_expectation world_db_datasource countrylanguage CountryCode,Language,IsOfficial,Percentage
printf "Setting up checkpoints for world_x DB...\n\n"
python3 setup_checkpoints.py world_db_city_checkpoint world_db_city_expectation world_db_datasource city
python3 setup_checkpoints.py world_db_country_checkpoint world_db_country_expectation world_db_datasource country
python3 setup_checkpoints.py world_db_countryinfo_checkpoint world_db_countryinfo_expectation world_db_datasource countryinfo
python3 setup_checkpoints.py world_db_countrylanguage_checkpoint world_db_countrylanguage_expectation world_db_datasource countrylanguage
printf "Setting up s3_site locations for GE...\n\n"
sed -i "" "/class_name: DefaultSiteIndexBuilder/r s3_site.txt" great_expectations/great_expectations.yml
sed -i "" "s/site_names: \[\]/site_names: \[\"local_site\"\, \"s3_site\"\]/g" great_expectations/checkpoints/*.yml
printf "Listing all checkpoints...\n\n"
great_expectations checkpoint list

Specifically, this script does the following:

  • Installs the required pip packages.
  • Initializes Great Expectations.
  • Creates a new MySQL datasource for the locally hosted "world_x" database using the setup_datasources.py script.
  • Creates an expectation suite for each table in the database using the setup_expectations.py script.
  • Creates a checkpoint for each table in the database using the setup_checkpoints.py script.

You’ll notice that the script references a configuration file called s3_site.txt. This file holds the S3 bucket name where you’ll store the Great Expectations data docs:

  s3_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: greatexpectations-s3
    site_index_builder:
      class_name: DefaultSiteIndexBuilder

Once setup is complete, you should see the following data_docs sites (local and s3) added in your great_expectations.yml file:

data_docs_sites:
  local_site:
    class_name: SiteBuilder
    show_how_to_buttons: true
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/data_docs/local_site/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
  s3_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: ump-greatexpectations-s3
    site_index_builder:
      class_name: DefaultSiteIndexBuilder

Note that this ge.sh script calls additional Python scripts to complete installation. One is setup_datasources.py, which looks like this:

import sys
import great_expectations as ge
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource, check_if_datasource_name_exists
context = ge.get_context()
print(sys.argv)
datasource_name = sys.argv[1]
host = sys.argv[2]
port = sys.argv[3]
username = sys.argv[4]
password = sys.argv[5]
database = sys.argv[6]
example_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  credentials:
    host: {host}
    port: '{port}'
    username: {username}
    password: {password}
    database: {database}
    drivername: mysql+pymysql
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
    name: whole_table"""
print(example_yaml)
context.test_yaml_config(yaml_config=example_yaml)
sanitize_yaml_and_save_datasource(context, example_yaml, overwrite_existing=False)
context.list_datasources()

Another is setup_expectations.py:

import sys

from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler
import great_expectations as ge
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context.types.resource_identifiers import ExpectationSuiteIdentifier
from great_expectations.exceptions import DataContextError

print(sys.argv)
expectation_suite_name = sys.argv[1]
datasource_name = sys.argv[2]
data_asset_name = sys.argv[3]
ignored_columns = sys.argv[4].split(',')
open_data_docs = sys.argv[5] if len(sys.argv) >= 6 else 'false'

context = ge.data_context.DataContext()

# Load the expectation suite if it already exists, otherwise create it
try:
    suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(
        f'Loaded ExpectationSuite "{suite.expectation_suite_name}" containing {len(suite.expectations)} expectations.')
except DataContextError:
    suite = context.create_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f'Created ExpectationSuite "{suite.expectation_suite_name}".')

print(context.get_expectation_suite(expectation_suite_name=expectation_suite_name))
context.save_expectation_suite(expectation_suite=suite, expectation_suite_name=expectation_suite_name)
suite_identifier = ExpectationSuiteIdentifier(expectation_suite_name=expectation_suite_name)
context.build_data_docs(resource_identifiers=[suite_identifier])

# Build a validator for the table (data asset) backing this suite
batch_request = {'datasource_name': datasource_name, 'data_connector_name': 'default_inferred_data_connector_name',
                 'data_asset_name': data_asset_name, 'limit': 1000}
validator = context.get_validator(
    batch_request=BatchRequest(**batch_request),
    expectation_suite_name=expectation_suite_name
)
column_names = [f'"{column_name}"' for column_name in validator.columns()]
print(f"Columns: {', '.join(column_names)}.")
validator.head(n_rows=5, fetch_all=False)

# Profile the table to generate a baseline set of expectations
profiler = UserConfigurableProfiler(
    profile_dataset=validator,
    excluded_expectations=None,
    ignored_columns=ignored_columns,
    not_null_only=False,
    primary_or_compound_key=False,
    semantic_types_dict=None,
    table_expectations_only=False,
    value_set_threshold="MANY",
)
suite = profiler.build_suite()

# Additional expectations: the profiler-ignored columns must still be non-null
for column in ignored_columns:
    validator.expect_column_values_to_not_be_null(column=column)

print(validator.get_expectation_suite(discard_failed_expectations=False))
validator.save_expectation_suite(discard_failed_expectations=False)

# Run a temporary checkpoint so the new suite is validated and documented immediately
checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S-" + expectation_suite_name,
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name
        }
    ]
}
checkpoint = SimpleCheckpoint(
    f"_tmp_checkpoint_{expectation_suite_name}",
    context,
    **checkpoint_config
)
checkpoint_result = checkpoint.run()
context.build_data_docs()
validation_result_identifier = checkpoint_result.list_validation_result_identifiers()  # [0]
if open_data_docs == 'true':
    context.open_data_docs()

Finally, we have setup_checkpoints.py:

from ruamel.yaml import YAML
import great_expectations as ge
from pprint import pprint
import sys

yaml = YAML()
context = ge.get_context()
print(sys.argv)
my_checkpoint_name = sys.argv[1]
expectation_suite_name = sys.argv[2]
datasource_name = sys.argv[3]
data_asset_name = sys.argv[4]
open_data_docs = sys.argv[5] if len(sys.argv) >= 6 else 'false'

# Checkpoint config: validate the latest batch of the data asset against its suite
yaml_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-{my_checkpoint_name}"
validations:
  - batch_request:
      datasource_name: {datasource_name}
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: {data_asset_name}
      data_connector_query:
        index: -1
    expectation_suite_name: {expectation_suite_name}
"""
print(yaml_config)

# Print out the names of your Data Sources, Data Connectors and Data Assets
pprint(context.get_available_data_asset_names())
context.list_expectation_suite_names()

my_checkpoint = context.test_yaml_config(yaml_config=yaml_config)
print(my_checkpoint.get_substituted_config().to_yaml_str())
context.add_checkpoint(**yaml.load(yaml_config))
context.run_checkpoint(checkpoint_name=my_checkpoint_name)
if open_data_docs == 'true':
    context.open_data_docs()

How to Execute

  • Copy all of the files (ge.sh, s3_site.txt, setup_datasources.py, setup_expectations.py, setup_checkpoints.py) into the same directory.
  • Execute the ge.sh script, which performs the required setup and creates a new Great Expectations Data Context to hold your project configuration, as shown below.
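For example, from the directory holding those files:

cd /path/to/your/ge-setup   # directory containing ge.sh and the helper scripts
chmod +x ge.sh
./ge.sh                     # creates ./great_expectations in this directory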

Great Expectations will create a new directory with the following structure:

great_expectations
|-- great_expectations.yml
|-- expectations
|-- checkpoints
|-- plugins
|-- .gitignore
|-- uncommitted
    |-- config_variables.yml
    |-- data_docs
    |-- validations

Note: Ensure the path to the great_expectations root directory above is exported in the environment variable "GE_ROOT_DIR" before proceeding with the Airflow setup.
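For example, from the directory where ge.sh created it (adjust the path for your machine):

export GE_ROOT_DIR="$(pwd)/great_expectations"
echo "$GE_ROOT_DIR"   # sanity check: should print the absolute path to the Data Context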

Step 2: Set up Apache Airflow

Now that Great Expectations is installed, you can set up Apache Airflow and configure DAGs to integrate Airflow with Great Expectations.

Copy the following two files, airflowinstall.sh and airflow_greatexpectations.py, into the same directory.

The first script (airflowinstall.sh) performs the basic Airflow setup on your machine:

#!/bin/bash

# Set Airflow home
export AIRFLOW_HOME=$(pwd)/air
export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
printf "%s\n" "$AIRFLOW_HOME"
printf "Installing required pip packages...\n\n"
pip3 install apache-airflow
pip3 install great_expectations "airflow-provider-great-expectations>=0.1.0"
pip3 install pymysql
printf "Moving DAG file to airflow dir...\n\n"
pwd
mkdir -p air/dags
cp airflow_greatexpectations.py air/dags/
ls -l air/dags/
printf "Initialize and Start Airflow...\n\n"
airflow dags list
# initialize the database
airflow db init
# provide your user login credentials and save
airflow users create \
--username demo_user \
--firstname airflowr \
--password demo_pass \
--lastname airflowr \
--role Admin \
--email demo123@gmail.com
# start the web server and scheduler
airflow webserver --port 8080 -D
airflow scheduler

The second file (airflow_greatexpectations.py) defines a DAG that executes the Great Expectations checkpoints using the GreatExpectationsOperator:

import datetime
import logging
import os

from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# Set the ENV variable GE_ROOT_DIR to the absolute path of the great_expectations directory on your machine
ge_root_dir = os.environ.get('GE_ROOT_DIR')
print("GE_ROOT_DIR:", ge_root_dir)


def on_failure_func(context):
    """Define custom failure notification behavior"""
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print("WARNING: These task instances failed, which can cause data inconsistencies:", task_instances)
    logging.error('WARNING: These task instances failed, which can cause data inconsistencies: {}'.format(task_instances))


with DAG(
    dag_id="great_expectations_dag",
    start_date=datetime.datetime(2022, 1, 19),
    on_failure_callback=on_failure_func,
    catchup=False,
    schedule_interval=None
) as dag:
    # One task per table; each task runs the corresponding checkpoint
    ge_world_db_city_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_city_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_city_checkpoint"
    )
    ge_world_db_country_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_country_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_country_checkpoint",
        trigger_rule="all_done"
    )
    ge_world_db_countryinfo_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_countryinfo_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_countryinfo_checkpoint",
        trigger_rule="all_done"
    )
    ge_world_db_countrylanguage_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_countrylanguage_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_countrylanguage_checkpoint",
        trigger_rule="all_done"
    )

    # Run the checkpoints sequentially; trigger_rule="all_done" keeps the chain going even if a validation fails
    ge_world_db_city_checkpoint_pass >> ge_world_db_country_checkpoint_pass >> ge_world_db_countryinfo_checkpoint_pass \
        >> ge_world_db_countrylanguage_checkpoint_pass

Now, run the airflowinstall.sh script to complete the Airflow setup.
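For example, run it in the same shell where GE_ROOT_DIR is exported, so the scheduler and webserver can pass it through to the DAG:

chmod +x airflowinstall.sh
./airflowinstall.sh

# In another terminal (with AIRFLOW_HOME pointing at ./air), confirm the DAG was picked up
airflow dags list | grep great_expectations_dag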

Step 3: View Apache Airflow & Great Expectations in action

With all the pieces in place, you can see your creation by opening http://localhost:8080/ in a browser.

Notice the Airflow dashboard and the great_expectations_dag DAG. You can trigger the DAG and confirm that all tasks run successfully.
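If you prefer the command line to the web UI, the Airflow 2.x CLI can do the same; a small sketch, using the DAG id from airflow_greatexpectations.py:

airflow dags unpause great_expectations_dag      # new DAGs are paused by default; unpausing lets the triggered run execute
airflow dags trigger great_expectations_dag
airflow dags list-runs -d great_expectations_dag # check the state of the triggered run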

Once the run completes, you'll find the Great Expectations results in the S3 bucket you defined earlier and/or on localhost, depending on the data_docs_sites entries in great_expectations.yml.
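For example, you might inspect both locations like this (the bucket name comes from s3_site.txt; replace it with your own bucket):

# Data Docs rendered to S3
aws s3 ls s3://greatexpectations-s3/ --recursive | head

# Local copy of the same Data Docs (macOS `open`; use xdg-open on Linux)
open great_expectations/uncommitted/data_docs/local_site/index.html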

You can also get detailed results for an individual expectation suite by clicking the row for a specific table in the Data Docs.

Congratulations! Your data pipeline is now built, and you can automatically validate and monitor scheduled workflows.

Conclusion

Integrating Apache Airflow and Great Expectations helps engineers double down on the value of both tools. The ability to automate data validation within an automated data workflow saves time and adds efficiency.

Learn more by checking out the Great Expectations and Apache Airflow documentation.
