Apache Airflow — Crash overview

M Haseeb Asif
Big Data Processing
10 min readMar 26, 2020

Data pipelines are data processing functions or different tasks put together in a pipelined fashion. For example, you get a notification based on your activity on the e-commerce website, someone posts on the Facebook group and it generates notifications for certain people, checking transactions for fraudulent activity.

Airflow is widely used to develop data pipelines or workflow automation or any series of steps to perform operations (processing the data). It helps to automate tasks of the data processing pipeline. It doesn’t have any of its data storage, other than metadata and neither does any processing but executes programs irrespective of the language. You will use python to write pipeline but each stage of pipeline can execute programs from different languages or frameworks. Possibilities are endless.

“Airflow is a platform to programmatically author, schedule and monitor workflows. Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed. When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.”

Airflow has all the workflows as DAGs (Directed Acyclic Graph). It’s a directed graph with nodes and edges with no cycles in it. A DAG consists of different operators where each operator’s instance is an individual task or execution unit. All of the tasks collectively make a DAG or pipeline.

Airflow internals

Airflow has five major components as shown in the diagram. The scheduler is at the heart of the program and puts different tasks in the queue for workers. The database is used to store configuration and leveraged by a web interface for a user to perform control operations.

Components of Airflow
  • Scheduler orchestrates the execution of jobs on a trigger or schedule. The Scheduler chooses how to prioritize the running and execution of tasks within the system. You can learn more about the Scheduler from the official Apache Airflow documentation.
  • Work Queue is used by the scheduler in most Airflow installations to deliver tasks that need to be run to the Workers.
  • Worker processes execute the operations defined in each DAG. In most Airflow installations, workers pull from the work queue when it is ready to process a task. When the worker completes the execution of the task, it will attempt to process more work from the work queue until there is no further work remaining. When new work in the queue arrives, the worker will begin to process it.
  • Database saves credentials, connections, history, and configuration. The database often referred to as the metadata database, also stores the state of all tasks in the system. Airflow components interact with the database with the Python ORM, SQLAlchemy.
  • Web Interface provides a control dashboard for users and maintainers. The web interface allows users to perform tasks such as stopping and starting DAGs, retrying failed tasks, configuring credentials, The web interface is built using the Flask web-development microframework.

DAG Execution Order

  • The Airflow Scheduler starts DAGs based on time or external triggers.
  • Once a DAG is started, the Scheduler looks at the steps within the DAG and determines which steps can run by looking at their dependencies.
  • The Scheduler places runnable steps in the queue.
  • Workers pick up those tasks and run them.
  • Once the worker has finished running the step, the final status of the task is recorded and additional tasks are placed by the scheduler until all tasks are complete.
  • Once all tasks have been completed, the DAG is complete.
How airflow works

In the following example, we have a DAG with three tasks. Once the scheduler realized that it’s time to execute the dag, it will push the task to the queue in the dependency order and workers will pick up the different tasks from the queue. Once executed, the next task will be picked up by the worker until all the tasks are complete and then DAG is marked as complete.

Creating a simple DAG

Creating a DAG is easy. It requires a name and start date only. Following is a simple DAG with a name, a description, a start date, and an interval. If your start date is in the past, Airflow will run your DAG as many times as there are schedule intervals between that start date and the current date. Furthermore, unless you specify an optional end date, Airflow will continue to run your DAGs until you disable or delete the DAG.

from airflow import DAG
myDag = DAG(
'myFirstDag',
description='Let's Play with airflow in Corona Lockdown',
start_date=datetime(2020, 3, 26),
schedule_interval='@daily')

DAG will have tasks usually defined by the operator’s instances. They are the atomic step that makes up a DAG. Some of them are PythonOperator, PostgresOperator, RedshiftToS3Operator, S3ToRedshiftOperator, BashOperator, SimpleHttpOperator, Sensor. Following is an example of a python operator to create a hello world task.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def hello_world():
print(“Hello World”)
myDag = DAG(...)
task = PythonOperator(
task_id=’hello_world’,
python_callable=hello_world,
dag=myDag)

Now we will discuss each of the basic components required for the DAG in more detail.

Schedules

Pipelines are often driven by schedules which determine what data should be analyzed and when. It helps to scope the data to specific ranges. Some of the reasons why schedules are helpful are

  • Pipeline schedules can reduce the amount of data that needs to be processed in a given run. It helps scope the job to only run the data for the time period since the data pipeline last ran. In a naive analysis, with no scope, we would analyze all of the data at all times.
  • Using schedules to select only data relevant to the time period of the given pipeline execution can help improve the quality and accuracy of the analyses performed by our pipeline.
  • Running pipelines on a schedule will decrease the time it takes the pipeline to run.
  • An analysis of a larger scope can leverage already-completed work. For. e.g., if the aggregates for all months prior to now have already been done by a scheduled job, then we only need to perform the aggregation for the current month and add it to the existing totals.

Determining the appropriate time period for a schedule is based on a number of factors that you need to consider as the pipeline designer.

  1. What is the size of data, on average, for a time period? If an entire year worth of data is only a few kb or MB, then perhaps its fine to load the entire dataset. If an hour’s worth of data is hundreds of MB or even in the GBS then likely you will need to schedule your pipeline more frequently.
  2. How frequently is data arriving, and how often does the analysis need to be performed? If our data is coming every hour, that will be a driving factor in determining the schedule. Alternatively, if we have to load hundreds of thousands of tiny records, even if they don’t add up to much in terms of MB or GB, the file access alone will slow down our analysis and we’ll likely want to run it more often.
  3. What’s the frequency of related datasets? A good rule of thumb is that the frequency of a pipeline’s schedule should be determined by the dataset in our pipeline which requires the most frequent analysis. This isn’t universally the case, but it’s a good starting assumption. For example, if our data is updated every hour, we’ll probably want to run our analysis every hour.

Schedules are optional and may be defined with cron strings or Airflow Presets. Default schedule is daily. Airflow provides the following presets:

  • @once - Run a DAG once and then never again
  • @hourly - Run the DAG every hour
  • @daily - Run the DAG every day
  • @weekly - Run the DAG every week
  • @monthly - Run the DAG every month
  • @yearly- Run the DAG every year
  • None - Only run the DAG when the user initiates it

Airflow will begin running pipelines on the start date selected. Whenever the start date of a DAG is in the past, and the time difference between the start date and now includes more than one schedule intervals, Airflow will automatically schedule and execute a DAG run to satisfy each one of those intervals. This feature is useful in almost all enterprise settings, where companies have established years of data that may need to be retroactively analyzed.

Airflow pipelines can also have end dates. You can use an end_date with your pipeline to let Airflow know when to stop running the pipeline. End_dates can also be useful when you want to perform an overhaul or redesign of an existing pipeline. Update the old pipeline with an end_date and then have the new pipeline start on the end date of the old pipeline.

Following example DAG will run 12 times for each month of 2018. max_active_runs can be used to set the no of parallel instances if we want to control the parallel no. of tasks.

dag = DAG(
‘coronaDAG’,
start_date=datetime.datetime(2018, 1, 1, 0, 0, 0, 0),
end_date=datetime.datetime(2018, 12, 1, 0, 0, 0, 0),
schedule_interval=’@monthly’,
max_active_runs=1
)

Task Dependencies

In Airflow DAGs:

  • Nodes = Tasks
  • Edges = Ordering, and dependencies between tasks

Task dependencies can be described programmatically in Airflow using >> and <<

  • a >> b means a comes before b
  • a << b means a comes after b
hello_world_task = PythonOperator(task_id=’hello_world’, ...)
goodbye_world_task = PythonOperator(task_id=’goodbye_world’, ...)
...
# Use >> to denote that goodbye_world_task depends on hello_world_task
hello_world_task >> goodbye_world_task

This allows connecting list of tasks as well e.g. task1 >>[task2, task3]. Internally these operators map to upstream and downstream methods so tasks dependencies can also be set with “set_downstream” and “set_upstream”

  • a.set_downstream(b) means a comes before b
  • a.set_upstream(b) means a comes after b
hello_world_task = PythonOperator(task_id=’hello_world’, ...)
goodbye_world_task = PythonOperator(task_id=’goodbye_world’, ...)
...
hello_world_task.set_downstream(goodbye_world_task)

Airflow Hooks

Airflow needs to know how to connect to your environment or other systems to move data around or do pipeline operations. Information such as hostname, port, login and passwords to other systems and services is handled via connections. Airflow allows users to manage the connection and configuration of different connected systems through UI. Airflow lets you change these settings or configuration without changing your code and you can update the configuration from the UI.

Connections can be accessed in code via hooks. Hooks provide a reusable interface to external systems and databases. With hooks, you don’t have to worry about how and where to store these connection strings and secrets in your code.

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
def load():
# Create a PostgresHook option using the `demo` connection
db_hook = PostgresHook(‘demo’)
df = db_hook.get_pandas_df('SELECT * FROM rides')
print(f'Successfully used PostgresHook to return {len(df)} records')
load_task = PythonOperator(task_id=’load’, python_callable=hello_world, ...)

Airflow comes with many Hooks that can integrate with common systems. Some of the common ones are HttpHook, PostgresHook, MySqlHook, SlackHook, PrestoHook.

For example, if we want to read from the AWS s3bucket, first we will have to create a connection and maybe variable using the following steps.
1. Open your browser to localhost:8080 and open Admin->Variables and Click “Create”
2. Set “Key” equal to “s3_bucket” and set “Val” equal to “myBucket”
3. Set “Key” equal to “s3_prefix” and set “Val” equal to “myPrefix”
4. Open Admin->Connections and click Create
5. Set “Conn Id” to “myCredentials”, “Conn Type” to “Amazon Web Services”
6. Set “Login” to your aws_access_key_id and “Password” to your aws_secret_key and hit save

Once we set up our connection and variables, we will use s3hook with our connection to list the keys for our bucket.

hook = S3Hook(aws_conn_id='aws_credentials')
bucket = Variable.get('s3_bucket')
prefix = Variable.get('s3_prefix')
logging.info(f"Listing Keys from {bucket}/{prefix}")
keys = hook.list_keys(bucket, prefix=prefix)
for key in keys:
logging.info(f"- s3://{bucket}/{key}")

Airflow leverage the templating to allow users to provide runtime variables for different tasks based on their execution context. For example, the execution date is important to segment or partition data before processing. Airflow provides a huge variety of different variables as stated in docs. We need to set the context to true and access the variables in our task or callable functions as follows.

def hello_date(*args, **kwargs):
print(f“Hello {kwargs[‘execution_date’]}”)
task = PythonOperator(python_callable=hello_date,
provide_context=True, ...)

Data Lineage

The data lineage of a dataset describes the discrete steps involved in the creation, movement, and calculation of that dataset. In general, data lineage has important implications for a business. Each department or business unit’s success is tied to data and to the flow of data between departments. For e.g., sales departments rely on data to make sales forecasts, while at the same time the finance department would need to track sales and revenue. Each of these departments and roles depend on data and knowing where to find the data. Data flow and data lineage tools enable data engineers and architects to track the flow of this large web of data. Airflow has a very intuitive UI to visualize the data lineage of any data pipeline. We can use Graph view and tree view to view the pipeline lineage and it’s execution statuses.

Data Partitioning

Pipelines designed to work with partitioned data to fail more gracefully. Smaller datasets, smaller time periods, and related concepts are easier to debug than big datasets, large time periods, and unrelated concepts. Partitioning makes debugging and rerunning failed tasks much simpler. It also enables easier redos of work, reducing cost and time.

Another great thing about Airflow is that if your data is partitioned appropriately, your tasks will naturally have fewer dependencies on each other. Because of this, Airflow will be able to parallelize the execution of your DAGs to produce your results even faster.

They are a couple of ways we can partition our data. First, we can leverage the Airflow schedules. Not only schedules are great for reducing the amount of data our pipelines have to process, but they also help us guarantee that we can meet timing guarantees that our data consumers may need. Secondly, Logical partitioning or partitioning conceptually related to discrete segments and processing separately. With logical partitioning, unrelated things belong in separate steps. Consider your dependencies and separate processing around those boundaries. Also worth mentioning, the data location is another form of logical partitioning. For example, if our data is stored in a key-value store like Amazon’s S3 in a format such as: s3://<bucket>/<year>/<month>/<day> we could say that our data is logically partitioned by time. Lastly, size partitioning separates data for processing based on desired or required storage limits. This essentially sets the amount of data included in a data pipeline run. Size partitioning is critical to understand when working with large datasets, especially with Airflow.

References:
1. Notes from Apache Airflow course on Udacity
2. https://airflow.apache.org/docs/stable/
3. https://blog.godatadriven.com/zen-of-python-and-apache-airflow
4. https://towardsdatascience.com/getting-started-with-apache-airflow-df1aa77d7b1b

--

--

M Haseeb Asif
Big Data Processing

Technical writer, teacher and passionate data engineer. Love to talk, write and code with Apache Spark, Flink or anything related to data