Cover image for the article, showcasing the article title and the logos of Dagster and dlt beneath it.

Unit Tests with Dagster and dlt

Edson Nogueira
5 min read · Oct 14, 2024

TL;DR: A practical guide to bringing Software Engineering best practices to data pipelines, improving data quality and time-to-value.

In a recent series of articles, culminating with a Dagster OSS deployment guide on AWS, it was highlighted how using Dagster can completely change the value perception of a Data Platform for stakeholders.

However, that is not the only benefit of using Dagster. By design, it also makes it easier to implement best practices from Software Engineering into data workflows, in particular unit tests.

It is very unfortunate that even experienced data engineers report that testing is rare across data projects, since properly scoped tests enable a faster development lifecycle and, as a consequence, lower time-to-value for data teams. Testing also helps improve data quality, which seems to be the Achilles' heel of the vast majority of businesses.

In this article, we discuss a concrete example of how to adopt Software Engineering best practices in data workflows with Dagster and dlt.

Data Platform Engineering

To understand the motivation behind the tech stack decisions and workflow that will follow, we have to agree on a definition of what good data platform engineering means.

A recent seminar given by Pedram Navid, which is strongly recommended, presents a view that closely resembles that of the author of this article. Essentially, good Data Platform Engineering should have the following traits:

  • Software Engineering at its root: tests, version control, documentation, environments, (you got it) …
  • Enables other engineers and stakeholders: allows seamless collaboration between engineers and showcases value for stakeholders.

Based on the above definition, a no-brainer tech stack is:

  • Dagster: in the author’s view, it is a framework that makes those seemingly intractable problems become easily solvable ones by design.
  • dlt: lightweight data ingestion engine, with a seamless integration with Dagster and out-of-the-box capabilities for most use cases.
  • dbt: if there is a data tool famous for incorporating SWE best practices by design it is this one, right?

Core Repo Structure

For our discussion, we will focus on the following repo structure:

Screenshot displaying how the relevant files for the discussion of the article are organized in the repo.

Setting Up the Pipeline

Let's consider a specific use case: moving data from a BigQuery table into a Postgres database. This can be addressed via dlt using the sql_database source:

# dlt_sources/sql_database_pipeline.py

# flake8: noqa
from typing import Any

import dlt
import os

import dlt.common
from dlt.sources.sql_database import sql_table, Table
from dlt.common.libs.sql_alchemy import Select


# dev_mode is switched on only for local runs (DAGSTER_DEPLOYMENT unset or "local")
internal_pipeline = dlt.pipeline(
    pipeline_name="internal_reporting",
    destination="postgres",
    dataset_name="internal_tools",
    dev_mode=os.getenv("DAGSTER_DEPLOYMENT", "local") == "local",
    progress="log",
)


@dlt.source
def employee_360(
    bigquery_credentials: str = dlt.secrets.value,
) -> dlt.sources.DltResource:
    """Returns a dlt resource for fetching employee data from the
    `employee` bigquery table using the given
    bigquery credentials.

    Uses a query adapter callback to modify the SELECT statement to be executed.
    """

    def _query_adapter(select: Select[Any], table: Table) -> Select[Any]:
        if table.name == "employee":
            # `table` is a SQLAlchemy Table:
            # drop the placeholder rows and cap the number of rows returned
            return select.where(table.columns.email != "cutoff").limit(10000)
        # leave the query for any other table untouched
        return select

    return sql_table(
        credentials=bigquery_credentials,
        table="employee",
        schema="internal_reporting",
        reflection_level="full_with_precision",
        query_adapter_callback=_query_adapter,
    )


if __name__ == "__main__":
    internal_pipeline.run(employee_360())

To set up the BIGQUERY_CREDENTIALS environment variable, we need a Service Account JSON file and a project name. A utility to create a valid string for this variable is the following:

# generate_bigquery_credentials/main.py

import base64
import json
import typer

app = typer.Typer()


@app.command()
def generate_bigquery_credentials(project_name: str, sa_json_path: str) -> None:
    """Takes in the name of a Bigquery project, a string with the absolute path
    to a Service Account JSON file, and prints a `bigquery_credentials` string for usage
    in the dlt sql_database source.
    """
    with open(sa_json_path) as fp:
        bq_sa_json = json.load(fp)

    # re-serialize the key file and base64-encode it so it fits in a connection string
    encoded_bytes = base64.b64encode(bytes(json.dumps(bq_sa_json), "utf-8"))

    encoded_str = str(encoded_bytes, "utf-8")

    bq_credentials = f"bigquery://{project_name}?credentials_base64={encoded_str}"

    print(f"\nBIGQUERY_CREDENTIALS={bq_credentials}")


if __name__ == "__main__":
    app()
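As a quick sanity check, the string printed by the utility can be decoded back into the original Service Account JSON. The sketch below uses only the standard library; `build_credentials` and `decode_credentials` are hypothetical helper names, and the sample key contents are made up for illustration.

```python
import base64
import json


def build_credentials(project_name: str, sa_info: dict) -> str:
    """Mirror the utility above: base64-encode the JSON and embed it in the URL."""
    encoded = base64.b64encode(json.dumps(sa_info).encode("utf-8")).decode("utf-8")
    return f"bigquery://{project_name}?credentials_base64={encoded}"


def decode_credentials(url: str) -> dict:
    """Recover the Service Account JSON from a credentials string."""
    # Split manually instead of using a query-string parser, which would
    # turn any "+" in the base64 payload into a space.
    _, _, encoded = url.partition("credentials_base64=")
    return json.loads(base64.b64decode(encoded))


# Made-up key contents, for illustration only.
sample = {"type": "service_account", "project_id": "my-project"}
url = build_credentials("my-project", sample)
assert decode_credentials(url) == sample
```

If the round trip fails, the problem is in how the string was generated or copied, not in the pipeline itself.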

Setting Up Tests

At this point, you might wonder: “So what? Will I need to set up a sandbox BigQuery for that and incur costs every time I run pytest?”

Not really. The whole point of any successful testing strategy is to find the appropriate scope for the tests. Take the example above, where the pipeline has two main business requirements:

  • Uses a placeholder value (“cutoff”) to identify which records should not be loaded;
  • Limits the query results.

Because dlt decouples source and destination connections from the pipeline logic, we can simply create fixtures that emulate those external systems, validate our logic against them, and keep that validation in place throughout ongoing development.

# internal_data_platform_tests/test_assets.py

import pytest
import duckdb
import dlt
from dlt_sources.sql_database_pipeline import employee_360
from pathlib import Path


@pytest.fixture(scope="module")
def mock_atf_bigquery(tmp_path_factory) -> Path:
    """Builds a DuckDB file that stands in for the BigQuery source."""
    tmp_dir = tmp_path_factory.mktemp("data")
    source_path = tmp_dir / "source.duckdb"
    with duckdb.connect(str(source_path)) as con:
        con.sql("CREATE SCHEMA internal_reporting;")
        con.sql(
            """
            CREATE TABLE internal_reporting.employee (id INTEGER, email VARCHAR);
            """
        )
        # 10005 valid rows plus 5 placeholder rows, so that both the
        # filter and the limit are exercised
        data = (
            [(i, "test@example.com") for i in range(5)]
            + [(i, "cutoff") for i in range(5, 10)]
            + [(i, "test@example.com") for i in range(10, 10010)]
        )
        bulk_insert_stmt = (
            "INSERT INTO internal_reporting.employee VALUES "
            + ", ".join(map(str, data))
            + ";"
        )
        con.sql(bulk_insert_stmt)
    print(source_path)
    return source_path


def test_atf_internal_pipeline_logic(mock_atf_bigquery):
    dest_path = mock_atf_bigquery.parent.joinpath("destination.duckdb")
    mock_pipeline = dlt.pipeline(
        pipeline_name="test_internal_reporting",
        destination=dlt.destinations.duckdb(str(dest_path)),
        dataset_name="internal_tools",
        dev_mode=True,
    )
    mock_pipeline.run(employee_360(f"duckdb:///{mock_atf_bigquery}"))
    with mock_pipeline.sql_client() as client:
        # execute_sql returns all rows of the result;
        # no placeholder rows may reach the destination
        assert not client.execute_sql(
            "SELECT * FROM employee WHERE email = 'cutoff'"
        )
        # the limit caps the load at 10000 rows
        assert len(client.execute_sql("SELECT * FROM employee")) == 10000
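The expected count of 10000 follows directly from the fixture data: 10010 rows are inserted, 5 of them carry the “cutoff” placeholder, so 10005 rows pass the filter and the limit is what caps the result. A minimal pure-Python sketch of that arithmetic, where `expected_rows` is a hypothetical helper and not part of the pipeline:

```python
def expected_rows(emails: list[str], placeholder: str = "cutoff", limit: int = 10_000) -> int:
    """Number of rows that should land in the destination for the given source emails."""
    eligible = [email for email in emails if email != placeholder]
    return min(len(eligible), limit)


# Same shape as the fixture: 5 valid rows, 5 placeholder rows, 10000 valid rows.
emails = ["test@example.com"] * 5 + ["cutoff"] * 5 + ["test@example.com"] * 10_000

assert len(emails) == 10_010
assert expected_rows(emails) == 10_000
```

Because more than 10000 rows survive the filter, the count assertion exercises both requirements at once; a fixture with fewer eligible rows would only test the filter.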

Finally…

Screenshot displaying a successful run of pytest for the internal reporting pipeline.

This way we can be confident that our logic is valid, reducing the number of issues that would otherwise only be caught by expensive computations in staging/prod environments.

Screenshot showing successful materialization of a previously tested asset.

Conclusions

There have been many arguments that “Data Engineering is not Software Engineering”, and rightfully so; the author shares this point of view, but it is indisputable that the two intersect in many places. In particular, best practices such as testing and documentation should be cornerstones of data pipeline development.

We have presented a concrete example of how to implement properly scoped unit tests using Dagster and dlt. The core idea is: fixture your sources and destinations as needed and validate your logic while developing, not after expensive batch runs in staging environments.

In the next article, we will emphasize the documentation aspect of Data Platform Engineering, and how we developed an efficient framework to ensure every data asset is as valuable to stakeholders as it is to developers. Stay tuned!

Acknowledgements

The author deeply thanks the Internal Data Platform team at Artefact Latam, especially Leonardo Brandão and Douglas Fonseca, for the openness to explore new ideas and unknown tools; it has been delightful to work with you guys!

Another Artefactor who deserves special thanks is Ligia Bozzi, for the encouragement and support for technical writing.

Written by Edson Nogueira

I am a Data Engineer @ Artefact and Ph.D. in Physics.
