Data Versioning for ML in Airflow

Chick-fil-A Team · Published in chick-fil-atech · Dec 14, 2022

by Mac Macoy

In a recent post, we shared why we chose Apache Airflow to orchestrate Machine Learning pipelines at Chick-fil-A. In this post we’ll dive deeper into how we enhanced Airflow to support data versioning for Machine Learning.

Airflow at its Core

Airflow at its core is a workflow orchestration tool, and it’s amazing at that. There are hundreds of open-source integrations that make it easy to build a workflow without much code.

Although a large portion of Airflow users build data pipelines, Airflow doesn’t natively support moving large amounts of data between tasks. It supports XComs, but they’re not meant to transfer datasets. A solution is to pass references to cloud object store paths (e.g. S3) between tasks.

An example:

from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator

# 1: Define S3 path
athena_output_location = "s3://my_bucket/source_data/athena_task/"

athena_task = AthenaOperator(
    task_id="athena_task",
    query="SELECT * FROM locations",
    database="enterprise",
    # 2: Define where to write query output
    output_location=athena_output_location
)
sagemaker_training_task = SageMakerTrainingOperator(
    task_id="sagemaker_training_task",
    config={
        "InputDataConfig": {
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    # 3: Define where to read training input
                    "S3Uri": athena_output_location,
                    ...
                },
                ...
            }
        },
        ...
    }
)
# The training task runs after the query and reads the query's output path
athena_task >> sagemaker_training_task

This gets the job done, but we need a few more enhancements for ML pipelines.

Data must be versioned

Engineers and data scientists know the importance of versioning their code with tools like Git and GitHub. When developing models, your data is just as important as your code, if not more so.

Knowing the exact dataset a model was trained on is important for explaining and reproducing your model.

Stakeholders, customers, or governments may ask you to explain why your model made a decision. Knowing your training data’s makeup can help you answer them.

At any point you may need to rerun a training pipeline. For example, imagine that a training job failed over the weekend. You find the issue and want to rerun the training job on Monday. The training data should remain in its state from the weekend. Any updates to the source dataset since the scheduled run should not be used because they could taint the model.

Data Versioning in Airflow

Before we adopted Airflow, some of our data scientists used DVC (Data Version Control) to orchestrate their model pipelines. As the name suggests, DVC supports data versioning.

To enable data versioning in Airflow, we use Airflow’s template variables and macros to parameterize S3 paths based on the current DAG run and task. Amending the earlier example:

athena_output_location = "s3://my_bucket/{{ run_id }}/source_data/{{ task.task_id }}/"

This ensures each DAG run has its own S3 path. Users can easily find a specific pipeline run’s data in S3 by looking for its run ID in the S3 path.
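To make the effect concrete, here is a minimal sketch that renders the templated path for two runs. Airflow does this rendering with Jinja when the task executes; the `render` helper below is a hypothetical stand-in that only handles simple `{{ name }}` and `{{ a.b }}` placeholders, and the run IDs are made-up examples.

```python
import re

# Hypothetical stand-in for Airflow's Jinja rendering; handles only
# simple "{{ name }}" and dotted "{{ a.b }}" placeholders.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template, context):
    def lookup(match):
        value = context
        for part in match.group(1).split("."):
            # Walk dicts by key and other objects by attribute.
            value = value[part] if isinstance(value, dict) else getattr(value, part)
        return str(value)

    return _PLACEHOLDER.sub(lookup, template)


TEMPLATE = "s3://my_bucket/{{ run_id }}/source_data/{{ task.task_id }}/"

# Made-up run IDs in the style Airflow generates for scheduled runs.
saturday = render(TEMPLATE, {"run_id": "scheduled__2022-12-10T00:00:00+00:00",
                             "task": {"task_id": "athena_task"}})
sunday = render(TEMPLATE, {"run_id": "scheduled__2022-12-11T00:00:00+00:00",
                           "task": {"task_id": "athena_task"}})

print(saturday)
print(sunday)
```

Because clearing and rerunning a failed task inside an existing DAG run keeps that run's ID, a Monday rerun of Saturday's training task would still resolve to Saturday's prefix.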

Data scientists don’t need to manage S3 paths

Parameterizing S3 paths with Airflow Macros enables data versioning, but we want to abstract away the need for our data scientists to continually generate and pass around paths.

What if you didn’t have to reference S3 paths at all, and instead only needed to reference each dataset by a simple, fitting name? That’s why we created Data Conduits.

A Data Conduit is a simple abstraction around a data store such as an S3 path. It has a name and is referenced by its name throughout the DAG.

Here’s an example definition of a Data Conduit. You pass the name and the S3 bucket to store the data in.

menu_items = S3Conduit(
    dc_name="menu_items",
    bucket="my-bucket"
)

Internally, S3Conduit builds an S3 URL templated by the DAG run ID:

class S3Conduit:

    def __init__(self, dc_name, bucket, url=None):
        self.dc_name = dc_name
        self.bucket = bucket
        if url is None:
            self.url = "s3://" + self.bucket + "/{{ run_id }}/" + self.dc_name + "/"
        else:
            self.url = url

Here, the menu_items data conduit path will be s3://my-bucket/{{ run_id }}/menu_items/.
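As a rough illustration of both branches of the constructor, the sketch below repeats the S3Conduit class so it runs on its own and resolves the URL for one run. The `render_url` helper, the run ID, and the explicit `url` value are assumptions made for this example; in a real DAG, Airflow's Jinja rendering would fill in `run_id`.

```python
class S3Conduit:
    """Copy of the S3Conduit class above so this sketch is self-contained."""

    def __init__(self, dc_name, bucket, url=None):
        self.dc_name = dc_name
        self.bucket = bucket
        if url is None:
            self.url = "s3://" + self.bucket + "/{{ run_id }}/" + self.dc_name + "/"
        else:
            self.url = url


def render_url(conduit, run_id):
    # Hypothetical helper: substitutes only the run_id placeholder, standing
    # in for the Jinja rendering Airflow applies to templated fields.
    return conduit.url.replace("{{ run_id }}", run_id)


menu_items = S3Conduit(dc_name="menu_items", bucket="my-bucket")
# Passing url explicitly bypasses the run-scoped default (path is made up).
reference = S3Conduit(dc_name="reference", bucket="my-bucket",
                      url="s3://my-bucket/static/reference/")

print(render_url(menu_items, "manual__2022-12-14"))
print(render_url(reference, "manual__2022-12-14"))
```

The post does not say when the `url` argument is used; a fixed location that should not change per run is one plausible case.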

Data Conduits in Use

We created Chick-fil-A versions of our most used Airflow operators. Each one of them inherits from its open-source version to avoid re-inventing the wheel and to benefit from future open-source improvements.

We added a few enhancements to each of our Airflow operators, and one enhancement is supporting Data Conduits.

Each of our operators can be categorized as a data source, data sink, or data transformer. Viewed as a graph, the operators are the nodes and Data Conduits are the edges that connect them.
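The graph view can be sketched with a small data structure. The `Node` class and `edges` function below are hypothetical and exist only for illustration; the task and conduit names are borrowed from the examples in this post, and the sketch assumes every node reads or writes at least one conduit.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for an operator, described only by its conduits."""
    name: str
    inputs: list = field(default_factory=list)   # conduit names read
    outputs: list = field(default_factory=list)  # conduit names written

    @property
    def role(self):
        # Assumes the node has at least one input or output conduit.
        if self.inputs and self.outputs:
            return "transformer"
        return "source" if self.outputs else "sink"


def edges(nodes):
    """Return (producer, conduit, consumer) triples, one per graph edge."""
    producers = {dc: n.name for n in nodes for dc in n.outputs}
    return [(producers[dc], dc, n.name)
            for n in nodes for dc in n.inputs if dc in producers]


pipeline = [
    Node("query_menu_items", outputs=["menu_items"]),
    Node("query_customer_orders", outputs=["customer_orders"]),
    Node("feature_engineering",
         inputs=["menu_items", "customer_orders"], outputs=["training_set"]),
    Node("training_set_to_redshift", inputs=["training_set"]),
]

for node in pipeline:
    print(node.name, "->", node.role)
for edge in edges(pipeline):
    print(edge)
```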

The CFARedshiftToS3Operator is a data source that unloads data from Redshift to S3.

query_menu_items = CFARedshiftToS3Operator(
    query="""
        SELECT *
        FROM menu.menu_items
        WHERE business_date = date '{{ ds }}'
    """,
    destination=S3Conduit(
        dc_name="menu_items",
        bucket="my-bucket"
    ),
    ...
)

Notice the destination is defined by an S3Conduit. The operator will unload the data to the templated S3 path defined in the S3Conduit.

The CFAProcessingOperator is a data transformer. It runs a Python or R script in a Docker container using SageMaker Processing, and it automatically mounts all upstream Data Conduits into the container. Data scientists can therefore read each dataset from a fixed local file path in their script instead of managing S3 paths that change with every run.

Here’s an example CFAProcessingOperator:

feature_engineering = CFAProcessingOperator(
    script="src/scripts/feature_engineering.py",
    image="12345678.dkr.ecr.us-east-1.amazonaws.com/my-image:tag",
    outputs=[
        S3Conduit(
            dc_name="training_set",
            bucket="my-bucket"
        )
    ]
)

[query_menu_items, query_customer_orders] >> feature_engineering

Since query_menu_items and query_customer_orders are upstream tasks of feature_engineering, the menu_items and customer_orders Data Conduits will be available to the feature_engineering.py script in the Docker container via a Docker mount.

menu_items_df      = pd.read_parquet("/opt/ml/processing/input/menu_items/")
customer_orders_df = pd.read_parquet("/opt/ml/processing/input/customer_orders/")
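One way local paths like these could be derived is to translate each upstream conduit into an entry of the `ProcessingInputs` list that SageMaker's CreateProcessingJob API accepts. This post does not show the internals of CFAProcessingOperator, so the `processing_inputs` helper below is a hypothetical sketch, not the actual implementation.

```python
INPUT_ROOT = "/opt/ml/processing/input"


def processing_inputs(upstream_conduits):
    """Build CreateProcessingJob-style ProcessingInputs from conduits.

    upstream_conduits maps a conduit name to its (templated) S3 URL.
    Hypothetical helper; not the actual CFAProcessingOperator code.
    """
    return [
        {
            "InputName": name,
            "S3Input": {
                "S3Uri": url,
                # The script always reads from this fixed path.
                "LocalPath": f"{INPUT_ROOT}/{name}/",
                "S3DataType": "S3Prefix",
                "S3InputMode": "File",
            },
        }
        for name, url in upstream_conduits.items()
    ]


inputs = processing_inputs({
    "menu_items": "s3://my-bucket/{{ run_id }}/menu_items/",
    "customer_orders": "s3://my-bucket/{{ run_id }}/customer_orders/",
})
for item in inputs:
    print(item["S3Input"]["S3Uri"], "->", item["S3Input"]["LocalPath"])
```

The S3 side changes with every run while the local side depends only on the conduit name, which is what lets the script hard-code its read paths.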

The feature engineering output data can be written to the output data mount as well.

training_df.to_parquet("/opt/ml/processing/output/training_set/")

SageMaker Processing takes care of copying the output data from the local path to the S3 path defined by the output S3Conduit training_set.

Finally, to complete the example, we can write the training_set back to Redshift using the CFAS3ToRedshiftOperator.

training_set_to_redshift = CFAS3ToRedshiftOperator(
    dc_input="training_set",
    schema="menu",
    table="training_set",
    ...
)

[query_menu_items, query_customer_orders] >> feature_engineering
feature_engineering >> training_set_to_redshift
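Referencing an input by name implies some lookup from the name to the upstream conduit's S3 path. The sketch below shows one way that could work, followed by building a Redshift COPY statement from the resolved path. Both helpers, the shape of `upstream_outputs`, the run ID, and the IAM role ARN are assumptions for illustration; the actual CFAS3ToRedshiftOperator logic is not shown in this post.

```python
def resolve_conduit(dc_name, upstream_outputs):
    """Find the upstream conduit URL registered under dc_name.

    upstream_outputs maps upstream task IDs to {conduit name: S3 URL}.
    Hypothetical helper; the real lookup is not shown in this post.
    """
    matches = [outputs[dc_name]
               for outputs in upstream_outputs.values() if dc_name in outputs]
    if len(matches) != 1:
        raise ValueError(f"expected exactly one upstream conduit named "
                         f"{dc_name!r}, found {len(matches)}")
    return matches[0]


def copy_statement(schema, table, s3_url, iam_role):
    # Redshift COPY from Parquet files under an S3 prefix.
    return (f"COPY {schema}.{table} FROM '{s3_url}' "
            f"IAM_ROLE '{iam_role}' FORMAT AS PARQUET;")


# Made-up example: the rendered output of the feature_engineering task.
upstream = {
    "feature_engineering": {
        "training_set": "s3://my-bucket/manual__2022-12-14/training_set/",
    },
}
url = resolve_conduit("training_set", upstream)
statement = copy_statement("menu", "training_set", url,
                           "arn:aws:iam::123456789012:role/example-role")
print(statement)
```

Failing loudly when the name is missing or ambiguous keeps a typo in `dc_input` from silently loading the wrong dataset.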

Summary

Enabling data versioning by default and seamless data movement between single-responsibility tasks are important components of an ML pipeline framework.

Because of the features we’ve added, our data scientists and engineers focus less on data versioning and data movement and more on delivering business value!

Would these patterns and enhancements to Airflow help your teams? How are you enabling data versioning and seamless data movement on your teams? Let us know in the comments!
