A Metadata DAG Generator for Complex Extractions in Airflow

Rubén Rodriguez Cardos
SDG Group
Published in
11 min readOct 16, 2023

Airflow is, de facto, one of the most widely used orchestrators for ETL/ELT type processes, and although it is simple to use, when it comes to defining a process, a dag in Airflow, it can get complicated if you do not have much experience or are not a person with technical knowledge, this coupled with the growing trend of developing tools guided by metadata, which allows non-technical users to interact with complex systems in a simple way, makes the development of a rendering of complex airflow dags based on metadata a very interesting approach.

This article proposes the development of an airflow dag rendering engine based on adapters, which allow us to particularize for different types of extractions that are necessary based on a set of minimum metadata defined by the user. These adapters will be a combination of jinja templates, to generate the final dag code, and a python file with the necessary scripts.

Overview of our Dag Render Engine

Overview the proposal.

The dag render engine that we are proposing in the article will be structured as follows:

  • dag_templates: to store the needed jinja templates
  • dags_rendered: to store the dags rendered as results
  • metadata: to store yml files as input to the dag render engine
  • scripts: to store the needed scripts as auxiliar functions
  • dag_render.py: Our dag render engine
  • requirments.txt: Python libraries needed
Folders and scripts for our Dag Render Engine

In the image above we can see a summary of the directory structure of our project, the crossed out files are not needed. The venv folder will be created when we created a virtual environment for the development.

Another important thing to keep in mind is that the objective of this article is to expose the proposal for a dag rendering engine, how to implement it in a modular way and how to use it, so certain important aspects, for ease, have been omitted or not taken into account, such as credentials or hardcoded connection strings in the code, it is important that if it is to be used in any environment that requires a minimum of quality/security it is important to review and correct these points.

SETUP THE VIRTUAL ENVIRONMENT

To setup the virtual environment we will use the following commands:

To create the virtual environment

python -m venv venv

To activate the virtual environment, a windows environment is used in this case, in others OS the activation of the virtual environment could be different:

.\venv\Scripts\activate

To install the dependencies in the virtual environment:

pip install -r requirments.txt

Included below is the complete content of the requirements file to install all the necessary dependencies, both for the dag render engine and the development/testing of the auxiliary scripts/functions.

Jinja2
pyyaml
argparse
pandas
sqlalchemy
psycopg2-binary

When the pip install command has finished, we have our environments ready to start development.

AUXILIAR INFRASTRUCTURE

In addition to the development environment, we will also need some additional infrastructure, to test the development, for this we will make one of several docker containers:

- Airflow: For ease of use we will deploy aiflow in an airflow container as there is an official image and it is easy to use. For more information, the complete process of how to deploy airflow with docker consult: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

- Postgres: To store the data extracted from the dags, a container with an instance of postgres will be deployed, since there is an official image of postgres and the use is simple:

docker run — name postgres -p 5432:5432 -e POSTGRES_PASSWORD=postgres -d postgres

DAG RENDER ENGINE

Don’t forget your log

Logs is one of the most important aspects that any system should have, but it is also one of the first aspects that is often forgotten, so don’t forget to set up your logs.

import logging

logging.basicConfig(format='%(name)s - %(asctime)s - %(levelname)s - %(message)s', level=logging.DEBUG)

logging.info('=== Gettting metadata ===')

For more information about logging, check https://docs.python.org/3/howto/logging.html#logging-basic-tutorial

Parsing the arguments

The Dag Render Engine execution should be something similar to:

python dag_render.py -f ‘metadata/sample_dag.yml’

Therefore it is necessary to be able to include the necessary parameters, for simplicity, and as a basis, only one parameter will be included which indicates the yml file with the input metadata. The python code will be the following one:

import argparse



# Parse args
parser = argparse.ArgumentParser(description='Render an Airflow dag from a yml file')
parser.add_argument('-f', ' - file', type=str, help='Path to metadata file', required=True)
args = parser.parse_args()
main(args)

Once we have checked the parameter we can go further with the main method, conceptually this method is very simple, just load the metadata from the yml file and the dag will be rendered according to it.

def main(args):
# Load the metadata
logging.info(f'Metadata file is: {args.file}')
metadata = load_metadata(args.file)
# Render the metadata
render_dag(metadata)

Metadata model and how to load it

The first version of our metadata model will be like this:

config:
dag_name: "MEDIUM_METADATA_DAG"
schedule: "2 0 * * *"
output_dag_filename: 'example_dag.py'
tags: "Medium"

Once we have a first version of our metadata model we need to be able to load these metadata, i.e. our yml file into the dag render engine, so we define a function to load the file and return the data as a dictionary, commonly yml files are loaded as dictionaries in Python for ease of use. A complete metadata display is also included as a debug in the log.

def load_metadata(yml_path):
logging.info('=== Gettting metadata ===')
with open(yml_path, "r", encoding='utf-8') as stream:
try:
metadata = (yaml.safe_load(stream))
pretty_metadata = json.dumps(metadata, indent=4)
logging.debug("=== Pretty Metadata ===")
logging.debug(pretty_metadata)
logging.debug("=== === ===")
except yaml.YAMLError as ex:
logging.error(ex)
return metadata

Jinja comes in!

The following image show resume of how jinja works

Important! The code to be generated based on the jinja is python code, therefore the jinja templates must respect the indentation, even if they are in different files, this is a very common error when generating python code based on jinja, that the indentation has not been adjusted properly in the jinja template.

So once exposed, and put an example of how Jinja works, it is time to start working on our jinja template to create dags, for this case we will generate the head of the dag, in the sense that this template will be responsible for creating: the import of the necessary libraries, the configuration of the dag and several fixed tasks that we will use as junction points with the rest of the code generated with other jinja templates. Our jinja template for the header of the dag will be:

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import BranchPythonOperator

# Adapters, Our auxiliar function from adapter will be here (This will have sense later!)
from scripts.extractors import simple_pandas_extractor, simple_postgres_extractor, check_freshness_simple_postgres_extraction

with DAG(
"{{dag_name}}",
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),

},
description="A simple tutorial DAG",
schedule='{{schedule or '@daily'}}',
start_date={{start_date}},
catchup=False,
tags=["{{tags}}"],
) as dag:

start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

Creating the adapters

Once we have our dag header rendered it is time to add the different extractions, which we will generate based on the input metadata and existing adapters, append them to the code previously generated in the dag header, the advantage of this proposal is that once defined an adapter we can reuse it as many times as necessary without having to redevelop anything.

The adapters in this proposal are not a single entity, but are made up of a set of elements:

  • The Metadata Model: Each adapter will need a different metadata model, with a set of attributes.
  • The Jinja template: This jinja template is the python code that will be added to the output dag each time that this extraction is included.
  • The auxiliar functions: sometimes your adapter will need some functions to work, this functions will not be included on the dag because they are not really part of the dag, making the dag easier to read and maintain, all these functions will be included in a python script. It is also possible to create an .airflowignore file to make airflow ignore this file or directory, reducing airflow’s workload. In this case in a file called extractors.py in our directory scripts/ (scripts/extractors.py)
  • The Handler: A python handler that will take the metadata from the specific extraction and inject it into the jinja template, generating the resulting python code. The set of handlers will be inside of our dag render engine, in this case the dag_render.py file
Overview of the structure of the adapters

Simple pandas extraction

The idea of this adapter is to create a simple extraction in pandas, based on url, that is to say, a dataset is obtained based on a url, this dataset is loaded in memory with pandas, since pandas has the functionality to load a dataset from a url, and to store this dataset in a postgres table, through a sqlalchemy connection.

Metadata model

type: "simple_pandas_extraction"
url: "<URL>"
output_tablename: "<OUTPUT_TABLENAME>"

Jinja template

Create a file dag_templates/simple_pandas_extraction.jinja2 with the following content:

    with TaskGroup(group_id="{{task_group_id}}") as tg:

simple_pandas_extraction = PythonOperator(
task_id='{{python_task_id}}',
python_callable=simple_pandas_extractor,
op_kwargs= {'url': '{{url_to_extract}}', 'tablename':'{{output_tablename}}'},
)

start >> tg >> end

Auxiliar functions

Create the file scripts/extractors.py , If you have not created it yet, and put the following content:

def simple_pandas_extractor(url, tablename):

logging.info(' === Simple Pandas Extractor ===')
logging.info(f'Retriving data from: {url}')

data = pd.read_csv(url)
data['extraction_date'] = datetime.now()

logging.info('Connecting with db')
conn_string = 'postgresql+psycopg2://postgres:postgres@host.docker.internal/DEV_RAW'
db = create_engine(conn_string)
conn = db.connect()

logging.info(f'Storing extrated data into table: {tablename}')
data.to_sql(tablename, con=conn, if_exists="append", index=False)

Handler

Add the handler to the file dag_render.py, the handler will be the follwing code:

def render_simple_pandas_extraction(extraction, extraction_id, rendered_pipeline_filename):

environment = Environment(loader=FileSystemLoader(".\\dag_templates\\"))
template_file = "simple_pandas_extraction.jinja2"
template = environment.get_template(template_file)

# Create config dic to render
config = {}
config['task_group_id'] = 'simple_pandas_extraction_tg_'+str(extraction_id)
config['python_task_id'] = 'simple_pandas_extraction_python_task_'+ str(extraction_id)
config['url_to_extract'] = extraction['extraction']['url']
config['output_tablename'] = extraction['extraction']['output_tablename']

# Render the jinja template
with open(rendered_pipeline_filename, "a", encoding='utf-8') as template_filled:
template_filled.write(template.render(config))
logging.info(f'Rendered extraction in file: {rendered_pipeline_filename}')

Simple postgres extraction

The idea of this adapter is to create a simple extraction of a table in a Postgres database, the Postgres table is loaded in memory with pandas and sqlalchemy (as a datatset) , and to store this dataset in a postgres table, through a sqlalchemy connection. Bu t, in this case we added a previous step that check when was the last time that this table/extraction was done, in this table was done in , that’s why we introduce a new attribute in the metadata model, freshness_threshold, if this extraction was made less than the hours indicated in the freshness threshold attribute in the metadata, the extraction is skipped, otherwise the extraction is carried out.

Metadata model

type: "simple_postgres_extraction"
db: "OTHER_DATABASE"
schema: "public"
table: "organizations"
freshness_threshold: "12"
output_tablename: "companies"

Jinja template

Create a file dag_templates/simple_postgres_extraction.jinja2 with the following content:

simple_postgres_extraction.jinja2
with TaskGroup(group_id="{{task_group_id}}") as tg:

check_freshness = BranchPythonOperator(
task_id='branching',
python_callable=check_freshness_simple_postgres_extraction,
op_kwargs = {
'tablename': '{{output_tablename}}',
'freshness_threshold':'{{freshness_threshold}}',
'task_group_id':'{{task_group_id}}'
}
)

simple_postgres_extraction = PythonOperator(
task_id='{{task_group_id}}_{{python_task_id}}',
python_callable=simple_postgres_extractor,
op_kwargs = {
'db': '{{db}}',
'schema':'{{output_tablename}}',
'table':'{{table}}',
'output_tablename':'{{output_tablename}}'
}
)

do_not_extract = EmptyOperator(task_id="{{task_group_id}}_do_not_extract")

check_freshness >> [simple_postgres_extraction, do_not_extract]

start >> tg >> end

Auxiliar functions

Go the file scripts/extractors.py , and add the following content:

def simple_postgres_extractor(db, schema, table, output_tablename):

logging.info(' === Simple Postgres Extractor ===')
logging.info(f'Retriving data from table: {db}.{schema}.{table}')

logging.info('Connecting with db')

conn_string = 'postgresql+psycopg2://postgres:postgres@host.docker.internal/'+db
db = create_engine(conn_string)
conn = db.connect()

data = pd.read_sql_table(table, con=conn)
data['extraction_date'] = datetime.now()

conn_string = 'postgresql+psycopg2://postgres:postgres@host.docker.internal/DEV_RAW'
db = create_engine(conn_string)
conn = db.connect()

logging.info(f'Storing extrated data into table: {output_tablename}')
data.to_sql(output_tablename, con=conn, if_exists="append", index=False)


def check_freshness_simple_postgres_extraction(tablename, freshness_threshold, task_group_id):

# Return the next airflow task id
res = task_group_id+"."+task_group_id+'_simple_postgres_extraction'

logging.info(' === Simple Postgres Extractor Check Freshness ===')

conn_string = 'postgresql+psycopg2://postgres:postgres@host.docker.internal/DEV_RAW'
db = create_engine(conn_string)
conn = db.connect()

logging.info(f'Retriving data from table: {tablename}')
try:
data = pd.read_sql_table(tablename, con=conn)

extraction_date = data['extraction_date'].values[0]
extraction_date = pd.to_datetime(str(extraction_date)).replace(tzinfo=None)
diff_date = datetime.now() - extraction_date

if diff_date > timedelta(hours=int(freshness_threshold)):
logging.info('No fresh data, need to extract again')
else:
logging.info('Fresh data, no need to extract again')
res = task_group_id+"."+task_group_id+'_do_not_extract'

except Exception as ex:
logging.debug(ex)
logging.info(f'Table: {tablename} not found')
res = task_group_id+"."+task_group_id+'_do_not_extract'

finally:
return [res]

Handler

Add the handler to the file dag_render.py, the handler will be the follwing code:

def render_simple_postgres_extraction(extraction, extraction_id, rendered_pipeline_filename):

environment = Environment(loader=FileSystemLoader(".\\dag_templates\\"))
template_file = "simple_postgres_extraction.jinja2"
template = environment.get_template(template_file)

# Create config dic to render
config = {}
config['task_group_id'] = 'simple_postgres_extraction_tg_'+str(extraction_id)
config['python_task_id'] = 'simple_postgres_extraction_python_task_'+str(extraction_id)

config['db'] = extraction['extraction']['db']
config['output_tablename'] = extraction['extraction']['output_tablename']
config['table'] = extraction['extraction']['table']
config['output_tablename'] = extraction['extraction']['output_tablename']
config['freshness_threshold'] = extraction['extraction']['freshness_threshold']

# Render the jinja template
with open(rendered_pipeline_filename, "a", encoding='utf-8') as template_filled:
template_filled.write(template.render(config))
logging.info(f'Rendered extraction in file: {rendered_pipeline_filename}')

GENERATING OUR DAG

Once we have our dag rendering engine ready, with the necessary adapters, it is time to create the input yml to generate our dag, for this we will generate a simple yml with two extractions, although as previously exposed if it is necessary to include more extractions it would be as simple as adding them to the block of extractions of the input yml file. Our test input file, called sample_dag.yml, is as follows:

config:
dag_name: "MEDIUM_METADATA_DAG"
schedule: "2 0 * * *"
output_dag_filename: 'example_dag.py'
tags: "Medium"

extractions:

- extraction:
type: "simple_pandas_extraction"
url: "https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv"
output_tablename: "iris_dataset"

- extraction:
type: "simple_postgres_extraction"
db: "OTHER_DATABASE"
schema: "public"
table: "organizations"
freshness_threshold: "12"
output_tablename: "companies"

When we have the metadata input yml file we can run the dags rendering engine with the following command (this command may change slightly depending on the OS, in this case we are using a Windows OS):

python .\dag_render.py -f ‘metadata/sample_dag.yml’

And that’s it! With this we get a dag_rendered/example_dag.py file which is the result of rendering the input yml file and rendering it as an airflow dag.

To test the resulting dag it is necessary to:

· Copy the resulting file from the dag rendering engine, in this case the file dag_rendered/example_dag.py.

· The file with the auxiliary functions of the adapters, in this case the file scripts/extractors.py, it is necessary to create a scripts/ folder inside the directory dags/ that has been generated when our instance of airflow was created with docker.

Resulting Dag imported into airflow and executed.

CONCLUSIONS & FUTURE WORK

Once the dag has been generated and we have completed our Dag Render Engine we can draw some conclusions:

  • Once an extraction has been defined, it can be reused as many times as necessary.
  • By working with metadata we reduce the complexity, in addition to being able to be used by non-technical people.
  • By using metadata and adapters we are not bound to a technology, so we can add new extractions and tools simply by generating the corresponding adapter.

As future work to improve our Dag Render Engine, the following is proposed:

  • Include a Jinja template for the T step of ETL/ELT, where tools like dbt could be used.
  • Include a yml validator to check the input metadata.
  • Use Airflow Variables to not use hardcoded connection strings
  • Improve the dag render engine code and handle each adapter with its own class or interface.
  • Include your own extractions!

You can find the complete code of the project here:

I hope you found this article interesting, thank you for reading it.

--

--