Orchestrate your data pipeline with Apache Airflow and Cloud Composer

Alex Ravaglia
Data Reply IT | DataTech
8 min read · Jun 28, 2023

Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. It can help you orchestrate your data pipelines and ETL jobs. You define and manage the workflow of your data processing pipeline in Python. The workflow is defined as a DAG (Directed Acyclic Graph), where nodes represent tasks and edges represent dependencies between tasks. Airflow can be used for any batch data pipeline and, thanks to its scalability, it particularly excels at orchestrating tasks with complex dependencies on multiple external systems. A web-based visual interface helps you manage the state of your workflows.

Let’s look at a data pipeline in Airflow

Figure 1: Example DAG (image source: Airflow)

Visually, the workflow is a DAG composed of tasks. Each task can be seen as a unit of computation, and the edges between tasks specify the dependencies. In Figure 1, the task “analyze” is activated only after the previous task “ingest” has completed. A task’s dependencies can also be conditional and depend on the output of the previous task: Figure 1 shows that the tasks “describe_integrity” and “save” both depend on the output of “check_integrity”.
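As a rough illustration of how dependencies constrain execution order, here is a minimal sketch that uses only Python’s standard library, not Airflow itself. The task names come from Figure 1; the edge from “analyze” to “check_integrity” is an assumption made for this sketch, and execution_order is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# The edge analyze -> check_integrity is an assumption for this sketch.
dependencies = {
    "ingest": set(),
    "analyze": {"ingest"},
    "check_integrity": {"analyze"},
    "describe_integrity": {"check_integrity"},
    "save": {"check_integrity"},
}


def execution_order(graph):
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(dependencies)
print(order)
```

Any order it prints places a task after all of its upstream tasks. “describe_integrity” and “save” can appear in either order, because neither depends on the other.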

Airflow Architecture

Figure 2: Airflow architecture (image source: Airflow)

Airflow has a modular architecture: it is composed of different components that interact with each other and allow the user to build and manage data pipelines. The principal components are:

  • Scheduler
  • Executor
  • Web Server
  • Metadata Database
  • DAG directory

Scheduler

The Scheduler orchestrates the DAGs and their tasks, dealing with their interdependencies and limiting the number of runs of each DAG. Each DAG gets a number of processes that it can run. The Scheduler decides which DAG should run on which pool and delegates the actual task execution to the Executor. It then monitors which tasks are running and which are up for execution or retry, and it records all state transitions in the metadata database. It is also responsible for ensuring that no single DAG overwhelms the entire system.

Executor

The executor is the component that actually runs the tasks. There are various types of executors; the most commonly used are:

  • SequentialExecutor: Runs only one task instance at a time. This type of executor is suited only for debugging or testing a DAG locally.
  • LocalExecutor: Runs tasks by spawning processes. The number of tasks is limited by the configuration of the machine running them and by the memory and CPU it has available. It is suitable only for running small workloads on a single machine.
  • CeleryExecutor: Celery is an asynchronous task queue/job queue based on distributed message passing. The Scheduler adds all tasks to the task queue. A Celery worker picks up a task from the queue and runs it. After the execution is complete, the worker reports the status of the task in the database.
  • KubernetesExecutor: Each task runs in a pod within a Kubernetes cluster, which helps isolate the environment for all the tasks. While Kubernetes takes care of the pod lifecycle, the Scheduler keeps polling Kubernetes for the task status.
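The division of labour between Scheduler and Executor can be illustrated with a toy model built on Python’s standard library. This is a sketch under simplifying assumptions, not how Airflow is implemented: run_dag and the task names are hypothetical, a thread pool stands in for the executor, and max_workers plays the role of a concurrency limit.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_dag(upstream, callables, max_workers=2):
    """Run every task after its upstream tasks, at most max_workers at once.

    upstream maps task_id -> set of task_ids it depends on.
    callables maps task_id -> zero-argument function to execute.
    Returns the task_ids in the order they finished.
    """
    sorter = TopologicalSorter(upstream)
    sorter.prepare()  # raises CycleError if the graph is not acyclic
    running = {}      # future -> task_id: the state this toy scheduler tracks
    finished = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Scheduler role: submit every task whose upstream tasks are done.
            for task_id in sorter.get_ready():
                running[pool.submit(callables[task_id])] = task_id
            # Executor role: the pool runs the tasks; wait for one to finish.
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                task_id = running.pop(future)
                future.result()  # re-raise any exception from the task
                finished.append(task_id)
                sorter.done(task_id)  # unlocks the downstream tasks
    return finished


upstream = {
    "extract": set(),
    "clean": {"extract"},
    "stats": {"extract"},
    "load": {"clean", "stats"},
}
log = []
callables = {name: (lambda n=name: log.append(n)) for name in upstream}
finished = run_dag(upstream, callables)
print(finished)
```

In this model, swapping the thread pool for a process pool, a message queue, or a cluster of pods changes where tasks run without changing the scheduling loop. That is roughly the idea behind having several interchangeable executors.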

Web Server

The web server provides the Airflow user interface. Through it, the user gets an overview of the overall health of each DAG and can inspect its different components (runs, logs, code, etc.). It is also the place to manage users, roles, and different configurations for the Airflow setup, such as environment variables.

Metadata Database

The database that stores metadata about DAGs, runs, configurations, variables, and connections. It also stores user information, roles, policies, and statistics from each run.

DAG directory

A folder of Python files containing the DAG definitions. It is read by the scheduler and the executor. To deploy a new pipeline, you write the code and place the script in this directory; you can then interact with it through the UI.

DAG definition and pipeline creation

A DAG definition is a Python file that instantiates a DAG object and its tasks. Each task is an instance of an “operator”, chosen according to the job it has to do. Finally, the dependencies between tasks are defined.

Operators

An Operator is conceptually a template for a predefined task: each task in a DAG is defined by instantiating an operator. Operators are Python classes that encapsulate the logic to do a unit of work. There are different types of operators depending on the task they perform. Frequently used ones include the BashOperator (runs a bash command), the PythonOperator (calls a Python function), and the EmailOperator (sends an email).

Many operators have also been developed by the community and are collected in packages (called providers) that can be installed separately. If you can’t find an operator that suits you, or want to customize one, you can create your own in Python by extending an existing operator or the BaseOperator class.
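The “operator as a template” idea can be sketched in plain Python. The classes below are stand-ins written for this illustration and are not Airflow’s classes. ToyBaseOperator, GreetOperator, and the context dictionary with its 'ds' key are assumptions of the sketch. In Airflow itself, a custom operator would extend BaseOperator and put its logic in execute(self, context).

```python
class ToyBaseOperator:
    """Stand-in for a base operator: holds a task_id, subclasses do the work."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError("subclasses implement the unit of work")


class GreetOperator(ToyBaseOperator):
    """Hypothetical custom operator: a reusable template with one parameter."""

    def __init__(self, task_id, name):
        super().__init__(task_id)
        self.name = name

    def execute(self, context):
        # The context carries run-level information; here only a date string.
        return f"Hello {self.name}, run date is {context['ds']}"


# Two distinct tasks instantiated from the same operator template.
greet_alice = GreetOperator(task_id="greet_alice", name="Alice")
greet_bob = GreetOperator(task_id="greet_bob", name="Bob")

print(greet_alice.execute({"ds": "2023-06-28"}))
print(greet_bob.execute({"ds": "2023-06-28"}))
```

The operator fixes what kind of work is done, and each instantiation fixes the parameters for one concrete task.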

Task and Dependencies

The tasks are executed in the specific order defined in the DAG. A task has dependencies on other tasks (those upstream of it), and other tasks depend on it (those downstream of it). Declaring these dependencies between tasks is what makes up the DAG structure (the edges of the directed acyclic graph). There are two main equivalent ways to declare individual task dependencies.

  • Use the >> and << operators
  • Use the methods: set_upstream() and set_downstream()

# With the >> operator (t0..t3 are tasks already defined in the DAG):
t0 >> t1 >> t2 >> t3

# With set_downstream():
t0.set_downstream(t1)
t1.set_downstream(t2)
t2.set_downstream(t3)

# Both forms produce the same DAG.

Here is a dependency in which two downstream tasks depend on the same upstream task.

# Dependencies
t0 >> t1 >> [t2, t3]

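After t1 completes, both t2 and t3 can run. Alternatively, if t1 is a BranchPythonOperator, you can implement conditional branching: its Python callable returns the task_id (or list of task_ids) to follow, and the other branches are skipped.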
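To see why the >> syntax and set_downstream() are equivalent, here is a toy model of the mechanism in plain Python. The Task class is hypothetical and much simpler than Airflow’s operators. It only shows how operator overloading can record edges and allow chaining.

```python
class Task:
    """Toy task that records its upstream and downstream neighbours."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        # Accept a single task or a list of tasks.
        others = other if isinstance(other, list) else [other]
        for task in others:
            self.downstream.add(task.task_id)
            task.upstream.add(self.task_id)

    def set_upstream(self, other):
        others = other if isinstance(other, list) else [other]
        for task in others:
            task.set_downstream(self)

    def __rshift__(self, other):
        # self >> other is the same as self.set_downstream(other).
        self.set_downstream(other)
        return other  # returning the right-hand side allows chaining

    def __lshift__(self, other):
        # self << other is the same as self.set_upstream(other).
        self.set_upstream(other)
        return other


t0, t1, t2, t3 = (Task(f"t{i}") for i in range(4))
t0 >> t1 >> [t2, t3]

print(sorted(t1.downstream))
print(sorted(t3.upstream))
```

Because each >> returns its right-hand operand, the chain is evaluated left to right, and the final list fans out from t1 to both t2 and t3.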

Task communication: XComs

XComs (short for “cross-communications”) are a mechanism that lets tasks talk to each other. Airflow uses the metadata database to store XComs. An XCom is identified by its key, together with the task_id and dag_id it came from. The value can be anything, as long as it is serializable. XComs are designed for small amounts of data. Using the xcom_push and xcom_pull methods on task instances, tasks can send values to and retrieve values from other tasks.
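The keying and serialization rules can be mimicked with a small in-memory store. This is a toy sketch, not Airflow’s API: ToyXComStore is hypothetical, a dictionary replaces the metadata database, JSON stands in for the serialization step, and the method signatures are simplified.

```python
import json


class ToyXComStore:
    """In-memory stand-in for the table where XComs are stored."""

    def __init__(self):
        self._rows = {}  # (dag_id, task_id, key) -> JSON string

    def xcom_push(self, dag_id, task_id, key, value):
        # json.dumps raises TypeError if the value is not serializable.
        self._rows[(dag_id, task_id, key)] = json.dumps(value)

    def xcom_pull(self, dag_id, task_id, key):
        raw = self._rows.get((dag_id, task_id, key))
        return None if raw is None else json.loads(raw)


store = ToyXComStore()

# An upstream task publishes a small result...
store.xcom_push("etl", "count_rows", "row_count", {"rows": 42})

# ...and a downstream task retrieves it by dag_id, task_id and key.
print(store.xcom_pull("etl", "count_rows", "row_count"))
```

Storing a serialized copy, rather than the object itself, is what makes this suitable only for small values such as counts, paths, or flags. Larger data is better kept in external storage, with only a reference passed between tasks.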

Schedule and DAG trigger

A DAG can be scheduled by time or triggered by events. You decide the logic that starts your pipeline and then choose the right mechanism to tell Airflow when to start executing it.

Scheduled DAG

Each DAG can be scheduled to run at a specific time: in the DAG definition, you pass a cron expression to the schedule parameter. For example, to run your DAG at 6:30 AM every day, you would use schedule='30 6 * * *'. CrontabGuru can help you write the right cron expression. An alternative is to use a timedelta object: for example, schedule=timedelta(minutes=30) runs the DAG every 30 minutes. You can also run a DAG continuously with schedule='@continuous': this creates one continuous DAG run, with a new run starting as soon as the previous one has completed.
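To make the two schedule styles concrete, the sketch below computes which wall-clock times a cron expression or a timedelta selects. It is a simplified stand-alone illustration, not Airflow’s scheduling code. The helper names are hypothetical, each cron field may only be '*' or a single integer, and all five fields must match.

```python
from datetime import datetime, timedelta


def cron_matches(expression, moment):
    """Check a datetime against a 5-field cron expression.

    Simplified: each field is '*' or a single integer, and every field
    must match (no ranges, lists or steps).
    """
    fields = expression.split()
    actual = [
        moment.minute,
        moment.hour,
        moment.day,
        moment.month,
        moment.isoweekday() % 7,  # cron counts Sunday as 0
    ]
    return all(f == "*" or int(f) == a for f, a in zip(fields, actual))


def next_cron_run(expression, after):
    """Return the first whole minute strictly after `after` that matches."""
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # search at most one year ahead
        if cron_matches(expression, candidate):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"no match within a year for {expression!r}")


def next_interval_runs(start, interval, count):
    """Return the next `count` times separated by a fixed timedelta."""
    return [start + interval * i for i in range(1, count + 1)]


now = datetime(2023, 6, 28, 7, 0)
print(next_cron_run("30 6 * * *", now))
print(next_interval_runs(now, timedelta(minutes=30), 3))
```

A cron expression pins runs to clock times (here, the next 6:30 AM falls on the following day), whereas a timedelta spaces runs evenly from a starting point.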

Dataset driven scheduling

Airflow can detect when a task in a DAG updates a data object, provided the task declares that dataset as one of its outlets. Using that awareness, other DAGs can be scheduled depending on updates to these datasets. To create a dataset-based schedule, you pass the datasets (Dataset objects, each identified by a URI) as a list to the schedule parameter.

Event-driven DAGs with Sensors

A Sensor is a special operator that waits for something to happen. It checks at a regular interval whether a condition is met. For example, it can wait for a file or, more generally, for an external event. Once the condition is met, the sensor succeeds and the downstream tasks run. A sensor can run in two different modes:

  • poke: It takes up a worker slot for its entire runtime.
  • reschedule: It takes up a worker slot only when it is checking, and sleeps for a set duration between checks.

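The trade-off between them is latency versus resource usage: poke mode reacts quickly but keeps a worker slot busy, so it suits short waits, while reschedule mode frees the slot between checks at the cost of rescheduling overhead, so it suits long waits with longer check intervals.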
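The polling behaviour of a sensor can be sketched as a simple loop. This is a toy model in plain Python, not an Airflow sensor: run_sensor and file_arrived are hypothetical names, the default values are arbitrary, and the sleep function is injectable so that the demo does not actually wait.

```python
import time


def run_sensor(condition, poke_interval=60.0, timeout=3600.0, sleep=time.sleep):
    """Call condition() every poke_interval seconds until it returns True.

    Returns the number of checks made. Raises TimeoutError once another
    wait would exceed the timeout.
    """
    waited = 0.0
    checks = 0
    while True:
        checks += 1
        if condition():
            return checks
        if waited + poke_interval > timeout:
            raise TimeoutError(f"condition not met after {checks} checks")
        sleep(poke_interval)
        waited += poke_interval


# Demo: a fake "file arrived" condition that becomes true on the third check.
state = {"checks": 0}


def file_arrived():
    state["checks"] += 1
    return state["checks"] >= 3


naps = []  # record the requested sleeps instead of really sleeping
print(run_sensor(file_arrived, poke_interval=5, timeout=60, sleep=naps.append))
print(naps)
```

In this model the loop occupies its caller for the whole wait, which corresponds to poke mode. Reschedule mode would instead return after each failed check and be invoked again later, releasing its slot in between.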

Airflow in Cloud Composer on Google Cloud Platform

Cloud Composer is a managed Apache Airflow service that helps you create Airflow environments quickly and use Airflow-native tools, such as the web interface and command-line tools. Cloud Composer is fully managed and integrates with other Google Cloud Platform services, for example BigQuery, Dataflow, Dataproc, Datastore, Cloud Storage, and Pub/Sub. Pricing is based on consumption: you pay for what you use.

ETL pipeline on Google Cloud Platform orchestrated with Airflow

The following is an example of an ETL job on Google Cloud Platform, orchestrated by Cloud Composer through Airflow.

Raw data arrives from external sources in a landing bucket in Google Cloud Storage. In Airflow you can set up a sensor (an event-based operator) that triggers the pipeline as soon as new files arrive in your bucket. Alternatively, you can wait until at least “X” MB or GB of data has arrived, or schedule the pipeline hourly or daily. There are many possible trigger logics; the right one depends on the specific use case and business requirements.
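The “wait until at least X MB have arrived” logic boils down to a size check that a sensor-like component could evaluate repeatedly. The sketch below illustrates it under a loud assumption: a local temporary directory plays the role of the landing bucket, and no Google Cloud Storage API is involved. should_trigger and the file names are hypothetical.

```python
import tempfile
from pathlib import Path


def should_trigger(landing_dir, min_bytes):
    """Return True once the files in landing_dir total at least min_bytes."""
    total = sum(
        path.stat().st_size
        for path in Path(landing_dir).iterdir()
        if path.is_file()
    )
    return total >= min_bytes


# Demo: a temporary directory stands in for the landing bucket.
with tempfile.TemporaryDirectory() as landing:
    (Path(landing) / "part-0001.csv").write_bytes(b"x" * 600)
    print(should_trigger(landing, min_bytes=1000))  # 600 bytes so far

    (Path(landing) / "part-0002.csv").write_bytes(b"x" * 600)
    print(should_trigger(landing, min_bytes=1000))  # 1200 bytes so far
```

Against a real bucket, the same decision would be made from the object sizes reported by the storage service rather than from the local file system.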

When the pipeline starts, Cloud Dataproc starts a Hadoop cluster and runs a Spark job. As an ETL step, this job could include data cleaning and preparation, and possibly aggregate data and compute some KPIs. The processed data is stored in another bucket. Finally, the data is loaded from Google Cloud Storage into BigQuery (for example, through Airflow’s GCSToBigQueryOperator). In this way, processed and aggregated information can be combined with the other information already in your system, making analytics and knowledge extraction possible.
