Pydantic and Airflow: Data Validation in Python and the Cloud

How to run custom Pydantic models in Apache Airflow to validate data formats and improve data quality.

DataFairy
Towards Data Engineering
6 min read · Nov 3, 2023


What is Pydantic?

Pydantic is the most widely used data validation library for Python.

When working with Pydantic, you create models that inherit from the Pydantic BaseModel. With these models you define your data structure. When you then load your data, it is validated against the model. If validation fails you get a ValidationError, or you can catch it and raise a custom exception instead.

from pydantic import BaseModel

# Define the model
class User(BaseModel):
    id: int
    name: str = "John Doe"


# Create an object and validate it immediately
user_jane = User(id='123', name="Jane Doe")
user_john = User(id='124')

print(user_jane)

user_fail = User(name='123')

# Output

id=123 name='Jane Doe'

pydantic.error_wrappers.ValidationError: 1 validation error for User
id
  field required (type=value_error.missing)
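
If you prefer to raise your own exception instead of letting the ValidationError bubble up, you can wrap the model construction. A minimal sketch; the DataQualityError name and the parse_user helper are just illustrations, not part of Pydantic:

from pydantic import BaseModel, ValidationError

class DataQualityError(Exception):
    """Hypothetical custom exception for failed validations."""

# Same User model as above
class User(BaseModel):
    id: int
    name: str = "John Doe"

def parse_user(raw: dict) -> User:
    try:
        return User(**raw)
    except ValidationError as exc:
        # Re-raise as our own exception type so callers don't need to know about Pydantic
        raise DataQualityError(f"Invalid user payload: {exc}") from exc

parse_user({"name": "123"})  # raises DataQualityError because id is missing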

Pydantic has many more features, and the latest release (v2) goes even beyond data validation based on custom models. It's definitely worth checking out other blogs and videos and diving into the documentation.
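
For reference, the same model written against the v2 API looks roughly like this (a sketch, assuming pydantic>=2 is installed; the id_positive check is only an illustration of the new decorator):

from pydantic import BaseModel, field_validator

class User(BaseModel):
    id: int
    name: str = "John Doe"

    @field_validator("id")
    @classmethod
    def id_positive(cls, value: int) -> int:
        # Example constraint, not part of the original model
        if value <= 0:
            raise ValueError("id must be positive")
        return value

# v2 renames parse_obj/dict to model_validate/model_dump
user = User.model_validate({"id": 123})
print(user.model_dump())  # {'id': 123, 'name': 'John Doe'}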

Code Examples

While diving into Pydantic I collected a few examples, including some for the new v2. I can't take credit for all of the code, but it might help you get started:

datafairy-azure/pydantic: Pydantic examples (github.com)

Pydantic support for Airflow

There are several ways to get started with Pydantic in Airflow. I decided to forgo the dependency hell I expected to find in Managed Airflow on Azure (Python 3.8 only, and instances crashing when requirements are removed in the portal). It's not really suited for experimentation. To be safe, I started out locally using the Astronomer CLI.

I was expecting that I could just add the Pydantic classes I had written and tested locally to my DAGs and all would be good. I guess that was too optimistic. I immediately got error messages telling me that my custom Request class could not be deserialized and that I was missing some kind of configuration.

The confusing thing was that my pipeline could create the Request objects when reading in the data, but could not turn them back into a dict when they were passed on to the next task. Airflow suggested adding a reference to the class in airflow.cfg, which Astronomer doesn't expose locally…

After a thorough search online and some discussion with ChatGPT I was still not quite at a solution. But by putting the pieces together I managed to get it running. To replicate my results, put your Pydantic models in a separate Python file and store it in the include folder.

include/request_model.py

import pydantic
from typing import List


class InputData(pydantic.BaseModel):
    columns: List[int]
    index: List[int]
    data: List[List[int]]

    @pydantic.validator("columns")
    @classmethod
    def columns_valid(cls, field_value: List[int]) -> List[int]:
        """Validator to check whether columns are valid"""
        if len(field_value) != 23:
            raise ValueError("Columns should be of length 23")

        for x in field_value:
            if x not in range(0, 23):
                raise ValueError("Columns should be in range 0-22")

        return field_value


class Request(pydantic.BaseModel):
    input_data: InputData

Furthermore, add the following line to your .env file:

AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES=['include.request_model.Request','include.request_model.InputData']
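
Inside the DAG file you can then import the models from that module. A minimal sketch, assuming the standard Astronomer project layout where the project root (and therefore the include folder) is on the Python path:

# dags/etl_pipeline.py (sketch)
from include.request_model import InputData, Request

# The models behave exactly as before; only their import path changes
request = Request(input_data=InputData(columns=list(range(23)), index=[0], data=[[0] * 23]))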

Another note: Pydantic has caused issues with Airflow in the past. The open-source Astronomer Airflow instance, which I currently use locally, doesn't support the latest version of Pydantic yet. That is why I have used the old v1 validator decorator in the code above.

You might be able to avoid all of the above by using an existing Python package: airflow-pydantic-dags · PyPI.

Since I use nested JSON in my example, this didn't work for me.

The pipeline

In the following I have created a pipeline that takes nested JSON input, loads it into a custom Pydantic model, and cleans and transforms the data. The data is then uploaded to blob storage.

This is the full pipeline:

import json
import pendulum as pm
import pydantic
from typing import List

from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.decorators import dag, task


class InputData(pydantic.BaseModel):
    columns: List[int]
    index: List[int]
    data: List[List[int]]

    @pydantic.validator("columns")
    @classmethod
    def columns_valid(cls, field_value: List[int]) -> List[int]:
        """Validator to check whether columns are valid"""
        if len(field_value) != 23:
            raise ValueError("Columns should be of length 23")

        for x in field_value:
            if x not in range(0, 23):
                raise ValueError("Columns should be in range 0-22")

        return field_value


class Request(pydantic.BaseModel):
    input_data: InputData


def clean_request(request: Request) -> dict:
    """
    Cleans the data by clipping negative values to zero. Returns a cleaned dictionary.
    """
    cleaned_dict = {}
    cleaned_dict["columns"] = request.input_data.columns
    cleaned_dict["index"] = request.input_data.index
    cleaned_dict["data"] = [[max(item, 0) for item in row] for row in request.input_data.data]
    return cleaned_dict


@dag(
    schedule="@daily",
    start_date=pm.datetime(2023, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
    },
    tags=["example"],
)
def etl_pipeline():
    """
    ### ETL pipeline
    """

    @task()
    def load_data() -> List:
        """
        Loads json requests using the Request model and returns a list of Request objects.
        """

        sample_request_1 = '''{
            "input_data": {
                "columns": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22],
                "index": [0, 1],
                "data": [
                    [20000,2,2,1,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0],
                    [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8]
                ]
            }
        }'''

        sample_request_2 = '''{
            "input_data": {
                "columns": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22],
                "index": [0, 1],
                "data": [
                    [20005, 7, 7, 6, 29, 7, 7, 4, 4, 3, 3, 3918, 3107, 694, 5, 5, 5, 5, 694, 5, 5, 5, 5],
                    [5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3]
                ]
            }
        }'''

        ordered_data = []

        for item in [sample_request_1, sample_request_2]:
            ordered_dict = json.loads(item)
            ordered_data.append(ordered_dict)

        requests: List[Request] = [Request(**item) for item in ordered_data]
        return requests

    @task()
    def clean_data(order_data: List[Request]) -> List[dict]:
        """
        Cleans the data and returns a list of dictionaries.
        """
        cleaned_requests = []

        for request in order_data:
            cleaned_dict = clean_request(request)
            cleaned_requests.append(cleaned_dict)

        return cleaned_requests

    @task()
    def prepare_requests(cleaned_requests: List[dict]) -> List[str]:
        """
        Uploads the requests to Azure Blob. Returns a list of request locations.
        """
        request_locations = []
        blob_connection = WasbHook(wasb_conn_id="connection_id_blob")
        for item in cleaned_requests:
            # Build the blob name once so the uploaded blob and the returned location match
            blob_name = f"request_sample_{pm.now().timestamp()}.json"
            blob_connection.load_string(json.dumps(item), 'azureml', blob_name)
            request_locations.append(blob_name)
        return request_locations

    raw_data = load_data()
    cleaned_data = clean_data(raw_data)
    prepare_requests(cleaned_data)


etl_pipeline()

The output of the first step, returning the Request items:

[
Request(
input_data=InputData(
columns=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22],
index=[0, 1],
data=[[20000, 2, 2, 1, 24, 2, 2, -1, -1, -2, -2, 3913, 3102, 689, 0, 0, 0, 0, 689, 0, 0, 0, 0], [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8]])
),
Request(
input_data=InputData(
columns=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22],
index=[0, 1],
data=[[20005, 7, 7, 6, 29, 7, 7, 4, 4, 3, 3, 3918, 3107, 694, 5, 5, 5, 5, 694, 5, 5, 5, 5], [5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3]])
)
]
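
To sanity-check the cleaning step without spinning up Airflow, you can call clean_request directly on a small payload. A quick local sketch that reuses the Request model and clean_request function from the pipeline above (the payload itself is made up):

# Local check: negative values in the data rows are clipped to zero
payload = {
    "input_data": {
        "columns": list(range(23)),
        "index": [0],
        "data": [[-1, 5] + [0] * 21],
    }
}

request = Request(**payload)
print(clean_request(request)["data"][0][:2])  # [0, 5]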

Pydantic vs Soda-Core

Using Pydantic for data validation by creating custom models feels very intuitive. It reminds me very much of writing Java or C# code where everything is based on models and inheritance.

But Pydantic is not the only way to validate your data. Let’s compare it to soda-core.

Soda-core:

  • various connectors available (PostgreSQL, DuckDB, Snowflake, …)
  • open-source
  • versatile as it works with different schemas
  • requires a different language for different connectors
  • not only Python
  • paid extension

Pydantic:

  • Python based
  • extremely fast with the Rust backend
  • v2 has features beyond data quality
  • open-source
  • fairly strict package requirements, which can lead to dependency conflicts
  • known compatibility issues with Airflow (see above)
  • models expect data that is already in dict/JSON form (see the sketch below)
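
That last point simply means that data which isn't already dict- or JSON-shaped, such as rows coming out of a CSV file or a database cursor, has to be mapped into dictionaries before Pydantic can validate it. A small sketch using the User model from the beginning of the article:

import csv
from io import StringIO

from pydantic import BaseModel


class User(BaseModel):
    id: int
    name: str = "John Doe"


csv_data = "id,name\n1,Jane Doe\n2,John Doe\n"

# csv.DictReader yields one dict per row, which is exactly what the model expects
users = [User(**row) for row in csv.DictReader(StringIO(csv_data))]
print(users)  # [User(id=1, name='Jane Doe'), User(id=2, name='John Doe')]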

Summary

In this article we looked at Pydantic as a data validation library. We created an Airflow pipeline using custom Pydantic models and ran it locally. Finally we looked at Pydantic vs Soda-Core for data quality applications.

Resources:

If you found this article useful, please follow me.
