Writing data product pipelines with Airflow

A framework to make your data organization transition from simple DAGs to trustworthy data products.

Ronald Ángel
Miro Engineering
9 min read · Jan 26, 2023

Photo by Gareth David on Unsplash

At Miro, our goal is to deliver reliable data products that enable efficient, high-trust, data-driven decision making. A data product is any data asset built to be consumed. It’s designed to create value for consumers or to allow them to create value from data insights.

Thus, a product must meet the following requirements:

  • Trustworthy: end users must be able to depend on it.
  • Production grade: must follow a clear review and promotion process.
  • Self-describing: using the product should not require hand-holding.
  • Discoverable: end users should be able to find it / know it exists.
  • Addressable: it must have an independent namespace.
  • Interoperable: it must be able to be used with other products.
  • Secure: data must be protected in the product (least access rules).
  • Self-contained/complete: given the expected inputs, it should be able to stand alone.

After setting this goal, we started a process to assess whether the current pipelines met this definition, and to design a framework to let us create complete data products, instead of the simple data transfer processes that had been built before. The result is the concept of product-ready pipelines.

The product-ready pipeline framework

We distinguish legacy pipelines (which move data from A to B) from product-ready pipelines based on the ability of data producers to deliver value to data consumers. This value is measured against the following six principles:

👷 Accountability: there is a clear owner team, responsible for ensuring product delivery.

🎯 Impact: fulfills a business goal, is discoverable and there are identified data consumers using it.

🔸 Single Purpose: serves a single business goal at a fine level of granularity, which makes producer-consumer collaboration and governance easier.

Clear Expectations: contains an agreement document between producers and consumers, which includes:

  • Schema and versioning management.
  • Service Level Agreements based on three main data reliability components:
      • Product pipeline freshness: data arrives on time, as determined by the needs of the consumer. During a given period, delays cannot exceed the agreed error budget.
      • Completeness: data volume matches what is expected, compared to the sources.
      • Correctness: data is internally correct, based on the consumers’ and business expectations of schema, distribution, and lineage.

🔔 Observability: any error has a definition of priority based on incident policies. Moreover, notifications with complete information are sent to the producers through communication channels, dashboards, and the incident management system.

✔️ Tested: pipelines are tested using end-to-end DataOps practices, including:

  • Clear unit test and integration test strategies.
  • Coding standards are deployed and enforced through a CI/CD process.
  • There are clear reviews and delivery processes.

If a pipeline is missing any of these principles, reconsider whether it is worth keeping it running on schedule.

What does an example pipeline look like?

Product pipeline example schema:

metadata {
    goal
    producers
    consumers
    SLAs and data contracts
    priority
    notification_channels
}

Import product example:
start > import > quality_on_source > transformation > quality_on_transformation > end

Export product example:
start > sensor_to_import > prepare_transformed_data > quality_on_data_to_export > export > end

* From Airflow 2.4, sensor_to_import could be based on datasets instead of common time-based sensors.
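The import and export flows above can be written down as plain ordered stage lists. The following is a minimal sketch, not Miro's implementation: the helper names `to_edges` and `has_quality_gate_before` are hypothetical, and only the stage names come from the flows above. It shows the property both flows share, namely that a quality check runs right before data is handed over.

```python
from typing import List, Sequence, Tuple

# Stage names copied from the import and export flows above.
IMPORT_STAGES = (
    "start",
    "import",
    "quality_on_source",
    "transformation",
    "quality_on_transformation",
    "end",
)
EXPORT_STAGES = (
    "start",
    "sensor_to_import",
    "prepare_transformed_data",
    "quality_on_data_to_export",
    "export",
    "end",
)


def to_edges(stages: Sequence[str]) -> List[Tuple[str, str]]:
    """Turn an ordered stage list into (upstream, downstream) pairs."""
    return list(zip(stages, stages[1:]))


def has_quality_gate_before(stages: Sequence[str], target: str) -> bool:
    """Return True if the stage right before `target` is a quality check.

    Assumption: quality stages are recognizable by a "quality_" prefix.
    """
    index = list(stages).index(target)
    return index > 0 and stages[index - 1].startswith("quality_")
```

In this sketch, the import flow is gated before `end` and the export flow is gated before `export`, so neither flow publishes unchecked data.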

Assessing DAGs

Considering the principles of the framework definition, our DAGs suffered from these problems:

  • Most DAGs were monolithic (not oriented to a business use case) data transfer processes with a coarse level of granularity, limited (or no) data quality validation, and no data preparation for consumption.
  • Some standards and conventions were missing or loosely defined.
  • Pipelines were hard to maintain.
  • There was too much boilerplate code.
  • There was no defined governance to guarantee standards.
  • Documentation and discoverability of products were limited.

To tackle these problems, the framework in place should guarantee that:

The orchestrator executes only product pipelines where there’s a clear definition of value and ownership. These pipelines are trustworthy by having robust quality guarantees, and there is always an agreed contract between data producers and consumers. This process is available as comprehensive metadata.

The proposed framework connects the orchestrator with all the data platform components. The pipelines are governed by defined standards and contain SLAs; processes rely on dedicated frameworks for ingestion, transformation (based on dbt), and data quality. Finally, the framework is connected to the catalog, which acts as the central point of discovery.

Product pipelines general architecture.

Data product pipelines tooling

To allow engineers to build products easily, and validate that a pipeline has all the expected product characteristics, we added the following components on top of Airflow.

A custom DAG decorator

The current Airflow DAG decorator doesn’t completely match the product specification explained above. Therefore, we enriched the existing decorator with a component called DataProductConfig, which contains a set of required fields like:

  • A standardized product name.
  • A data contract definition.
  • A priority level compatible with the OpsGenie incident system.
  • A notification type: Slack or OpsGenie.
  • An ownership definition.
  • A runbook for incident resolution.

An example DAG would be:

from datetime import datetime
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator  # newer Airflow releases provide EmptyOperator as the replacement
from lib.data_products.data_product_pipeline import DataProductConfig, data_product_dag

# Product metadata required by the custom decorator.
product_config = DataProductConfig(
    product_name="example_product_import",
    sla_code="SLA-EXAMPLE-00",
    priority_level="P5",
    owner_team="data-products",
    notification_type="opsgenie",
    runbook_link="https://<link-to-the-runbook>",
)

@data_product_dag(
    product_config=product_config,
    start_date=datetime(2022, 10, 15),
    schedule_interval="@daily",
    tags=["example", "ingestion"],
)
def example_product_oriented_pipeline():
    """Example product oriented pipeline to import and validate an example data asset.

    This is an example of a product-oriented pipeline. This comment is stored automatically to dag.doc_md.
    Key aspects:
    - In the decorator the DAG ID is set equal to function name and the docs equal to this comment.
    - Utils is abstracted as part of this creation. [Using Opsgenie default].
    - Predefined parameters also set by default. They can always be overwritten.
    - The loading logic link is [LINK].
    - The transforming logic link is [LINK].
    - The quality checks link is [LINK].
    """
    @task
    def start() -> str:
        return "started"

    @task(retries=2)
    def load(response: str) -> str:
        return f"loaded {response}"

    @task
    def quality_on_source(response: str):
        return f"confirmed {response}"

    @task
    def transform(response: str) -> str:
        return f"transformed {response}"

    @task
    def quality_on_transform(response: str):
        return f"confirmed {response}"

    @task
    def done(response: str) -> str:
        return f"end {response}"

    # Final marker task; its task_id is what the SLA contract refers to as check_task.
    end = DummyOperator(task_id="End")

    # Chain the tasks: each one receives the previous task's return value.
    done(quality_on_transform(transform(quality_on_source(load(start()))))) >> end

dag = example_product_oriented_pipeline()

This definition brings the following benefits:

  • Reduces most of the boilerplate through several abstractions, including ones to run dockerized processes, alerting, data quality checks, dbt runners, and so on.
  • Increases the pipeline’s reliability by guaranteeing data contracts between data producers and consumers.
  • Establishes a clear process to manage alerting, incident resolution, and pipeline ownership.
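The example docstring mentions that the DAG ID comes from the function name, the docs come from the docstring, and predefined parameters can be overwritten. The sketch below illustrates that idea in plain Python, without Airflow. It is not the real `data_product_dag`: the name `data_product_dag_sketch`, the default values in `PREDEFINED_DAG_KWARGS`, and the `resolved_dag_kwargs` attribute are all assumptions, and the sketch only records the arguments instead of building a DAG.

```python
import inspect
from typing import Any, Callable, Dict

# Hypothetical defaults; the article only says that predefined parameters exist.
PREDEFINED_DAG_KWARGS: Dict[str, Any] = {"catchup": False, "max_active_runs": 1}


def data_product_dag_sketch(product_config: Dict[str, str], **overrides: Any) -> Callable:
    """Return a decorator that derives DAG arguments from the wrapped function."""

    def decorator(func: Callable) -> Callable:
        resolved: Dict[str, Any] = dict(PREDEFINED_DAG_KWARGS)
        resolved["dag_id"] = func.__name__  # DAG ID equals the function name
        resolved["doc_md"] = inspect.cleandoc(func.__doc__ or "")  # docs from the docstring
        resolved["default_args"] = {"owner": product_config["owner_team"]}
        resolved.update(overrides)  # explicit arguments always win over defaults
        # A real implementation would pass these to Airflow; here we only store them.
        func.resolved_dag_kwargs = resolved
        return func

    return decorator
```

Keeping the defaults in one place is what removes the boilerplate: a producer only writes the arguments that differ from the standard.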

Data contracts definition

Establishing contracts for data products makes it easier for everyone to define what data they need, and when they need it; at the same time, data producers can enforce mechanisms to measure and guarantee that expectation.

SLA definitions measure and assess the agreed contract between a producer and a consumer. To understand what’s included in a contract, it’s important to understand the following definitions, which are based on Google’s Site Reliability Engineering (SRE) practices:

SLAs: agreement between data producers and data consumers. This establishes a promise between the parties.

SLOs: objectives that make the promise feasible.

SLIs: indicators to measure the promise.

Error budget: allowance for violations within a period of time.

Each data pipeline that a producer in the organization creates includes a business promise to data consumers. The promise should state at least the following points:

  • Business goal of the pipeline.
  • Freshness: “How often is the data updated?”
  • Completeness/Volume: delivered data counts are accurate compared to the sources.
  • Correctness: schema and distribution of records based on business rules.
  • Dependencies with upstream SLAs (represented in a graph).

You can use the following YAML definition to describe the business promise:

---
code: SLA-EXAMPLE-00
description: DESCRIPTION OF SLA
goal: WHAT DO YOU WANT TO ACHIEVE WITH THIS SLA
promise_start_date: 2022-09-07
links: OPTIONAL EXTRA INFORMATION, FOR EXAMPLE A LINK
status: proposed
domains:
  - DOMAIN RESPONSIBLE FOR THE SLA
dag_id: example_product_import
dag_owner: NAME OF THE TEAM RESPONSIBLE FOR THIS PIPELINE, SHOULD MATCH OPSGENIE IN CASE OF INCIDENTS ENABLE
dag_run_frequency: daily
slos:
  product_pipeline_freshness:
    daily_max_delivery_hour_utc: 7 # Latest delivery time for data freshness
    check_task: End
    evaluation:
      period: PERIOD OF THIS EVALUATION
      max_delay_budget_in_hours: 8 # Max. allowed data delivery delay for this pipeline during the specified period
  quality_checks:
    check_task:
      - End
    evaluation:
      period: PERIOD OF THIS EVALUATION
      number_quality_errors_allowed: 1 # Max number of quality failures
      allow_missing_percentage_threshold: 1 # Tolerance level for missing records during validation
  time_for_resolution:
    max_time_to_resolution_when_delayed_in_hours: 8 # Max. allowed data delivery delay for this pipeline during a single day
promised_datasets: # List of promised datasets with this contract
  - dataset_1
  - dataset_2
  - dataset_3
notification:
  priority: PRIORITY LEVEL OF THE NOTIFICATION, SHOULD MATCH OPSGENIE
  slack_channels:
    - channel_1
    - channel_2
  create_opsgenie_incident: CREATE INCIDENT IN OPSGENIE OR NOT

After defining a contract, a service evaluates the pipeline/data status against the SLA promise, delivers the results, and notifies producers and consumers about the data availability. This increases the level of transparency in the data organization, as well as the level of trust between the parties.
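As an illustration of what such an evaluation could compute, here is a minimal sketch of a freshness check against an error budget and a completeness check against a tolerance. The function names are hypothetical, and two interpretations are assumptions rather than facts from the contract: the delay budget is treated as the sum of daily delays over the evaluation period, and the missing-records threshold is treated as a percentage (1 means 1%).

```python
from typing import Iterable


def freshness_delay_hours(
    delivery_hours_utc: Iterable[float], daily_max_delivery_hour_utc: float
) -> float:
    """Sum the hours by which each daily delivery missed the promised hour."""
    return sum(
        max(0.0, hour - daily_max_delivery_hour_utc) for hour in delivery_hours_utc
    )


def freshness_within_budget(
    delivery_hours_utc: Iterable[float],
    daily_max_delivery_hour_utc: float,
    max_delay_budget_in_hours: float,
) -> bool:
    """True while the accumulated delay stays inside the error budget."""
    total = freshness_delay_hours(delivery_hours_utc, daily_max_delivery_hour_utc)
    return total <= max_delay_budget_in_hours


def missing_percentage(source_count: int, delivered_count: int) -> float:
    """Percentage of source records that did not arrive in the product."""
    if source_count <= 0:
        return 0.0  # nothing was expected, so nothing can be missing
    missing = max(0, source_count - delivered_count)
    return 100.0 * missing / source_count


def completeness_ok(
    source_count: int, delivered_count: int, allow_missing_percentage_threshold: float
) -> bool:
    """True if the share of missing records is within the agreed tolerance."""
    return (
        missing_percentage(source_count, delivered_count)
        <= allow_missing_percentage_threshold
    )
```

For example, with a promised delivery hour of 7 UTC and a budget of 8 hours, deliveries at 6.5, 7.0, 9.0, and 12.5 accumulate 7.5 hours of delay and stay within budget; one more delivery at 8.0 would exceed it.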

Pipelines governance layer

The abstractions described above are the building blocks for a component that validates whether a DAG is properly defined as a product. This is the job of a custom DAG linter, which validates the pipelines against a set of predefined rules.

DAG linter components.

This enforces conventions, and it ensures that the proposed abstractions are used before committing and deploying.
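The article does not list the linter's actual rules, so the following is only a sketch of how a rule-based check could be organized. Everything here is hypothetical: the dictionary representation of a DAG, the three rules, and the "quality_" task prefix convention. Each rule returns a message when violated and `None` otherwise.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence

DagSpec = Dict[str, Any]  # assumed, simplified view of a parsed DAG
Rule = Callable[[DagSpec], Optional[str]]

REQUIRED_CONFIG_FIELDS = (
    "product_name",
    "sla_code",
    "priority_level",
    "owner_team",
    "notification_type",
    "runbook_link",
)


def rule_complete_product_config(dag: DagSpec) -> Optional[str]:
    """Every required product field must be present and non-empty."""
    config = dag.get("product_config") or {}
    missing = [name for name in REQUIRED_CONFIG_FIELDS if not config.get(name)]
    return f"product_config is missing: {', '.join(missing)}" if missing else None


def rule_has_documentation(dag: DagSpec) -> Optional[str]:
    """The DAG must carry documentation (doc_md)."""
    return None if (dag.get("doc_md") or "").strip() else "DAG has no documentation"


def rule_has_quality_task(dag: DagSpec) -> Optional[str]:
    """At least one task must be a quality check (assumed 'quality_' prefix)."""
    task_ids = dag.get("task_ids", [])
    if any(task_id.startswith("quality_") for task_id in task_ids):
        return None
    return "DAG has no quality check task"


DEFAULT_RULES: Sequence[Rule] = (
    rule_complete_product_config,
    rule_has_documentation,
    rule_has_quality_task,
)


def lint_dag(dag: DagSpec, rules: Sequence[Rule] = DEFAULT_RULES) -> List[str]:
    """Run every rule and return the list of violation messages."""
    violations = []
    for rule in rules:
        message = rule(dag)
        if message is not None:
            violations.append(message)
    return violations
```

A check like this could run as a pre-commit hook or a CI step, failing the build when `lint_dag` returns any violation.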

Pipelines metadata producer

The next step is to make the new product pipeline discoverable in the data discovery tool (DataHub, which deserves a separate article). Therefore, it is important to define clearly which tags to attach to a DAG, and which tags can be autogenerated with the DataProductConfig. The following graph shows the tag creation and classification mechanisms.

Tags definition for product pipelines.

Using this definition, the data product DAG abstraction validates not only the metadata completeness, but also automatically creates several tags for the pipelines to classify them properly, and to make them easily discoverable in the catalog.

Example DAG auto-created tags.
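A minimal sketch of how tags could be derived from the product configuration is shown below. The tag format (`owner:`, `priority:`, `sla:` prefixes and a generic `data-product` tag) and the function name `build_product_tags` are assumptions for illustration; the actual tag scheme is the one defined in the figures above.

```python
from typing import Dict, Iterable, List


def build_product_tags(
    config: Dict[str, str], manual_tags: Iterable[str] = ()
) -> List[str]:
    """Combine manually attached tags with tags generated from the config."""
    generated = [
        "data-product",  # hypothetical marker tag for catalog filtering
        f"owner:{config['owner_team']}",
        f"priority:{config['priority_level']}",
        f"sla:{config['sla_code']}",
    ]
    # dict.fromkeys keeps first-seen order and drops duplicates.
    return list(dict.fromkeys([*manual_tags, *generated]))
```

Generating these tags from the same configuration that drives alerting keeps the catalog view and the operational metadata from drifting apart.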

Putting the architectural components together

Finally, this is how all the components work together:

Data product pipeline architecture.

A data product pipeline should be trustworthy and create value for consumers. The framework should also ensure that producers follow the defined standards and that the product is available in a structured way in the catalog.

Part of the current and future work focuses on making it easier and more consistent to create and use new pipelines, as well as defining a standard way for different teams to get data in/out of the analytics space. The strategy includes educating teams and providing templates for pipeline metadata, pipeline execution, quality checks, dbt macros, and so on.

Takeaways

Moving from common pipelines to data products has brought the following benefits to Miro’s data organization:

  • Efficiency: for consumers, it’s easier and faster to discover the data assets they’re looking for; for producers, it’s easier and more efficient to write pipelines and contracts, so that they can save time and focus on what’s really important.
  • Transparency: producers and consumers now speak the same language, and they share the same metrics regarding pipelines. Along with a clear ownership definition, it increases the level of trust and makes improvement processes possible.
  • Reliability: the focus on quality increases the opportunity to make better-informed business decisions.
  • Teamwork: moving closer to self-serve is making us a more mature data organization.
