Dynamic Task Mapping in Apache Airflow

Arjun Gadani
Simform Engineering
6 min read · Oct 19, 2023

Automating flexible workflow orchestration with Dynamic Task Mapping in Apache Airflow.

Before diving into Dynamic Task Mapping, let’s briefly understand the concept of tasks in Apache Airflow. A task represents a single unit of work within a DAG (Directed Acyclic Graph), and it can perform various operations, such as running Python scripts, executing SQL queries, or interacting with external systems.

What is Dynamic Task Mapping?

Dynamic Task Mapping, a feature introduced in Apache Airflow 2.3, creates multiple task instances at runtime from dynamic input. Because the number of instances is decided when the DAG runs rather than when it is written, you no longer have to define each task by hand, which makes workflows more flexible and scalable.

Why is it Important?

In traditional workflows, DAG authors need to anticipate and define all possible tasks in advance, which can be limiting when dealing with variable data or complex scenarios. Dynamic Task Mapping solves this problem by generating tasks dynamically based on the current data state. It also helps reduce code redundancy, making workflows more adaptable and efficient.

How Dynamic Task Mapping Works

Dynamic Task Mapping in Apache Airflow is about automating the creation of tasks on the fly, enhancing flexibility, and simplifying complex workflows. In this section, we’ll explain how it works.

Task Parameters and Expansion

Dynamic Task Mapping relies on the expand() method, which turns a single task definition into multiple task instances, each with different parameters. The values to map over can be a literal list or the output of an upstream task, so the number of instances is only determined at runtime.
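As a rough mental model (plain Python, not Airflow's implementation), expand() behaves like calling the task once per mapped value, while arguments passed through partial() stay fixed for every instance. The helper simulate_expand below is hypothetical and written only for illustration. It also shows that mapping over two arguments yields one instance per combination of values.

```python
from itertools import product


def simulate_expand(func, fixed=None, **mapped):
    """Call func once per combination of mapped values (hypothetical helper)."""
    fixed = fixed or {}
    names = list(mapped)
    results = []
    # Each combination stands in for one mapped task instance
    for combo in product(*(mapped[name] for name in names)):
        kwargs = dict(zip(names, combo))
        results.append(func(**fixed, **kwargs))
    return results


def add(x, y):
    return x + y


# Roughly analogous to add.partial(y=10).expand(x=[1, 2, 3]) in Airflow
single = simulate_expand(add, fixed={"y": 10}, x=[1, 2, 3])

# Roughly analogous to add.expand(x=[1, 2], y=[10, 20]): 2 x 2 = 4 instances
crossed = simulate_expand(add, x=[1, 2], y=[10, 20])

print(single)
print(len(crossed))
```

In a real DAG, Airflow schedules each of these calls as a separate task instance with its own state, logs, and retries.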

Example: Mapping Over a List

To illustrate Dynamic Task Mapping, suppose you have a list of marks that includes both numerical values and ‘N/A’ entries. The objective is to automatically create one instance of a mapped task, ‘apply_filter’, for each entry in the list, keeping only the numerical values. A single downstream task, ‘calculate_percentage’, then aggregates the filtered output.

Code Example

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

# Define a simple DAG that is only triggered manually (schedule=None)
with DAG(
    dag_id="example_dynamic_task_mapping_v1",
    start_date=datetime(2023, 5, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task
    def apply_filter(mark):
        # Keep integer marks; a None result is left out of the downstream input
        return mark if isinstance(mark, int) else None

    @task
    def calculate_percentage(marks):
        # marks holds the collected results of the mapped apply_filter instances
        marks = list(marks)
        print(marks)
        print(sum(marks) / len(marks))

    # One apply_filter task instance is created per list element
    filtered_values = apply_filter.expand(mark=[68, 96, 67, None, 'N/A', 88, 'N/A'])
    calculate_percentage(filtered_values)

Execution and Results

When executed, this DAG creates one mapped ‘apply_filter’ task instance for each value in the list mark = [68, 96, 67, None, ‘N/A’, 88, ‘N/A’]. Each instance checks whether its input is an integer; if it is, the instance returns that value; otherwise, it returns None. Airflow leaves out the None results when it passes the mapped task’s output downstream.

Afterward, the ‘calculate_percentage’ task computes and prints the average of the remaining marks, which serves as the percentage grade.
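To make the expected numbers concrete, here is a plain-Python walk-through of the same logic, outside Airflow. It is only a sketch: it assumes, as in Airflow 2.x, that None results from a mapped task are not passed on to the downstream task, and it models that step with an ordinary list comprehension.

```python
marks = [68, 96, 67, None, 'N/A', 88, 'N/A']


def apply_filter(mark):
    # Same check as the mapped task: keep integers, drop everything else
    return mark if isinstance(mark, int) else None


# One call per element, standing in for the mapped task instances
mapped_results = [apply_filter(mark) for mark in marks]

# Model of the None results being left out before the downstream task runs
collected = [result for result in mapped_results if result is not None]

average = sum(collected) / len(collected)
print(collected)  # [68, 96, 67, 88]
print(average)    # 79.75
```

Seven mapped instances run, but only four values reach the aggregation step.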

DAG Graph

Mapped Tasks

Calculated Output

Fetching File Data from an S3 Bucket and Dumping It to a DB

Let’s explore a more practical scenario: fetching files from an Amazon S3 bucket, performing some necessary transformation, and dumping that into a database. We want to leverage Dynamic Task Mapping to handle a variable number of files that arrive daily.

Code Example

import os
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Local directory where CSV files are stored temporarily.
# All tasks read and write this path, so they need a shared filesystem
# (or a single worker), and the directory must be writable by Airflow.
LOCAL_TMP_DIRECTORY = '/tmp_data/'

with DAG(dag_id="dynamic_task_s3_to_postgres_dag", start_date=datetime(2023, 5, 1), schedule=None, catchup=False) as dag:

    @task
    def load_files_s3():
        s3_hook = S3Hook(aws_conn_id='temp_s3_conn')  # Use your AWS connection ID

        # Specify your S3 bucket and directory containing CSV files
        s3_bucket_name = 'blog-temp-resources'
        s3_directory = 'data/'

        # Create the temporary directory if it doesn't already exist
        os.makedirs(LOCAL_TMP_DIRECTORY, exist_ok=True)

        # List the keys under the S3 directory
        s3_files = s3_hook.list_keys(bucket_name=s3_bucket_name, prefix=s3_directory)

        # Download each file, keeping its original file name
        # (preserve_file_name needs a recent Amazon provider release)
        for s3_file in s3_files:
            if s3_file.endswith('/'):
                continue  # Skip "folder" placeholder keys
            s3_hook.download_file(
                key=s3_file,
                bucket_name=s3_bucket_name,
                local_path=LOCAL_TMP_DIRECTORY,
                preserve_file_name=True,
                use_autogenerated_subdir=False,
            )

    @task
    def get_file_paths():
        files = os.listdir(LOCAL_TMP_DIRECTORY)
        # Filter for files with a .csv extension and create full file paths
        csv_files = [os.path.join(LOCAL_TMP_DIRECTORY, file) for file in files if file.endswith('.csv')]
        print('csv_files : ', csv_files)
        return csv_files

    # Transform one CSV file and load it into Postgres
    @task
    def transform_csv(file_path):
        postgres_hook = PostgresHook(postgres_conn_id='postgres_default')

        # Load the CSV file into a DataFrame, reading "Period" as text so it can be sliced
        df = pd.read_csv(file_path, dtype={'Period': str})

        # Split the "Period" column into "Year" and "Month"
        df['Year'] = df['Period'].str[:4].astype(int)
        df['Month'] = df['Period'].str[5:].astype(int)

        # Drop the original "Period" column
        df = df.drop(columns=['Period'])

        engine = postgres_hook.get_sqlalchemy_engine()
        df.to_sql('temp_biz_data', con=engine, if_exists='append', index=False)
        print('Data Dumped !!!')
        engine.dispose()

        # Remove the local copy once it has been loaded
        os.remove(file_path)
        print('File deleted : ', file_path)

    s3_to_local = load_files_s3()
    csv_list = get_file_paths()
    # expand() only accepts keyword arguments: one task instance per file path
    transform_and_load_files = transform_csv.expand(file_path=csv_list)

    # Set task dependencies
    s3_to_local >> csv_list >> transform_and_load_files

DAG Graph

Mapped Tasks

Execution and Results

In this DAG, the process begins with the triggering of the initial task in the pipeline, labeled ‘load_files_s3’. This task is responsible for downloading all files from a designated S3 location to a temporary storage location, which is created if it doesn’t already exist. Afterward, the ‘get_file_paths’ task comes into play, gathering the filenames of the downloaded files and preparing them for future dynamic task generation. Subsequently, a task is dynamically generated for each available file or input, executing the required transformations and depositing the resulting data into the database.
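The transformation step splits the ‘Period’ column by character position, so it is worth checking what that slicing does to a single value. The sketch below assumes periods look like "2011.06" (four-digit year, one separator character, two-digit month); that format is an assumption, not something stated above, and split_period is a hypothetical helper. It also shows why the column is safer read as text: a value parsed as a float loses its trailing zero.

```python
from typing import Tuple


def split_period(period: str) -> Tuple[int, int]:
    """Split a 'YYYY.MM'-style string into (year, month) by position."""
    year = int(period[:4])   # first four characters
    month = int(period[5:])  # everything after the separator
    return year, month


print(split_period("2011.06"))  # (2011, 6)
print(split_period("2011.10"))  # (2011, 10)

# If the value was parsed as a float first, October turns into "month 1"
as_float_text = str(float("2011.10"))
print(as_float_text)                # 2011.1
print(split_period(as_float_text))  # (2011, 1)
```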

Note: I have used S3Hook and PostgresHook for external connections.

Best Practices for Dynamic Task Mapping

In this section, we’ll explore essential best practices for effectively using Dynamic Task Mapping in your workflows. These practices ensure smooth task generation, independent task execution, and robust error handling.

Task Independence

Ensure that tasks generated through Dynamic Task Mapping are independent and can run in parallel. Avoid dependencies that might cause bottlenecks.

Monitoring and Error Handling

Implement robust monitoring and error handling mechanisms, as the dynamic nature of tasks can lead to varying execution patterns.

Scalability

Consider the scalability of your workflow when using Dynamic Task Mapping. Ensure that your infrastructure can handle the potential increase in task instances.
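One way to keep the number of mapped task instances under control is to map over batches of inputs instead of single items. Airflow limits how many instances one expand() call may create through the max_map_length setting in the [core] section (1024 by default), and the max_active_tis_per_dag task argument limits how many instances of a task run at the same time. The chunk helper and the file names below are hypothetical, shown as a minimal plain-Python sketch.

```python
def chunk(items, size):
    """Split items into consecutive batches of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


# Hypothetical input: seven file paths to process
file_paths = [f"/tmp_data/file_{i}.csv" for i in range(7)]

# Three batches instead of seven single-file task instances
batches = chunk(file_paths, 3)
for batch in batches:
    print(batch)
```

A task that accepts a list of paths could then be expanded over batches, trading some parallelism for fewer task instances to schedule.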

Conclusion

In this blog, we explored Dynamic Task Mapping in Apache Airflow, a feature that brings flexibility and adaptability to workflow orchestration. By dynamically generating tasks based on data output, you can create more efficient and scalable workflows. Experiment with Dynamic Task Mapping in your own workflows to harness its full potential and streamline your data pipelines.

Dynamic Task Mapping is a game-changer for Airflow users, and it opens up exciting possibilities for workflow automation.

For more updates on the latest tools and technologies, follow the Simform Engineering blog.
