Apache Airflow Cheatsheet
Apache Airflow is a system to programmatically author, schedule, and monitor data pipelines. It has been six years since version 1.0.0 of Apache Airflow was released: years of coding, fixing, and deploying it both on-premise and in the cloud at hundreds of companies around the world. Many difficulties have been overcome along the way, and this is an attempt to collect some of the conclusions in one place to help others save time and make using Airflow easier.
If you find that some statements are inaccurate, please do not hesitate to comment; they will be fixed as soon as possible.
General
- It’s easy to start your local Airflow environment by using the official Docker image: https://hub.docker.com/r/apache/airflow
- The Airflow API documentation is good for learning which levers you can use to integrate your external systems with Airflow: https://airflow.apache.org/docs/apache-airflow/stable/security/api.html
- It’s good to read `airflow.cfg` (with all the comments inside) to learn more about how you can configure your Airflow installation. And please, spend some time going through the project repo to understand how it’s built: https://github.com/apache/airflow
- Airflow (as well as my lovely Apache Superset) is built on top of the Flask and Flask App Builder (FAB) frameworks. This means you get a lot of customisable features, e.g. your own security manager (see the sketch after this list). You can read about it here: https://medium.com/geekculture/custom-security-manager-for-apache-superset-c91f413a8be7
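The article linked above covers Superset, but the same FAB mechanism applies to Airflow. Below is a minimal sketch, assuming an Airflow 2.x webserver configured through `webserver_config.py`; the class name and whatever logic you would put inside it are hypothetical.

```python
# webserver_config.py - a minimal sketch, assuming Airflow 2.x; the class name
# and any logic you put inside it are hypothetical.
from airflow.www.security import AirflowSecurityManager


class CustomSecurityManager(AirflowSecurityManager):
    """Placeholder for custom authentication / role-mapping logic."""

    # Override security manager methods here as needed.


# Tell Flask App Builder to use the custom class instead of the default one.
SECURITY_MANAGER_CLASS = CustomSecurityManager
```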
Scan files and DAG configuration
- If you want the scheduler to skip files in your DAG folder, just don’t use the words `airflow` and `dag` in those files. By default, the Airflow scheduler scans only files that contain those words in their text. If you want to change the default behavior, set `DAG_DISCOVERY_SAFE_MODE=False` in your `airflow.cfg`.
- Another way is to ignore unnecessary files via an `.airflowignore` file in the root of the DAGs folder. It works the same way as `.gitignore`.
- Be careful: there will be no errors if you have different DAGs with the same `dag_id`. The web UI will display one random DAG at a time.
- It might not be intuitive, but `start_date` is an optional parameter for a DAG. It is obligatory for each task, and for some reason it can even differ between tasks of the same DAG, which is counterintuitive.
- The default `schedule_interval` for a DAG is `timedelta(days=1)`.
- The `dagrun_timeout` parameter can help you if you have problems with never-ending DAG runs (see the minimal DAG sketch after this list).
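To make the parameters above concrete, here is a minimal sketch of a DAG definition, assuming Airflow 2.x; the `dag_id`, task, and timeout values are illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# A minimal sketch, assuming Airflow 2.x; the dag_id, task, and timeout values
# are illustrative. The file naturally contains the words "airflow" and "dag",
# so the scheduler will pick it up even with DAG_DISCOVERY_SAFE_MODE enabled.
with DAG(
    dag_id="example_unique_dag_id",       # must be unique across the DAGs folder
    start_date=datetime(2021, 1, 1),      # optional on the DAG, obligatory per task
    schedule_interval=timedelta(days=1),  # also the implicit default
    dagrun_timeout=timedelta(hours=2),    # fail never-ending DAG runs after 2 hours
) as dag:
    BashOperator(task_id="hello", bash_command="echo hello")
```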
DAG runs
- The `catchup` parameter is set to `True` by default. It means that each time you unpause the DAG, all the missed dagruns will start immediately. The best practice is to set it to `False` for each and every DAG. Even more, you might want to set `CATCHUP_BY_DEFAULT=False` in `airflow.cfg` to change the default behavior for all DAGs. Anyway, even with `catchup=False`, one dagrun will still be started right after unpausing.
- The first dagrun of a DAG is triggered at `start_date + schedule_interval`, and its `execution_date` is the `start_date` (the run covers the interval that has just ended).
- You can create timezone-aware DAGs using the `pendulum` module (see the sketch after this list). In versions ≥ `1.10.7`, if a timezone is specified for the DAG, cron expressions respect daylight saving time (DST), but timedelta schedules do not. In Airflow versions prior to `1.10.7` it was the other way around: timedelta objects respected DST, but cron expressions did not.
- Do not treat Airflow as a do-everything tool. It is a very powerful process orchestrator, or, as some people say, cron on steroids. Use it for triggering jobs that run outside your Airflow system. For example, if you want to load data from PostgreSQL or MySQL into ClickHouse and it is applicable, use ClickHouse's own mechanism for loading data from external tables instead of writing your own custom operator or combining `PostgresOperator` with `ClickHouseOperator`. Some people think that `DockerOperator` and `KubernetesPodOperator` should be the only two types of operators you use in a production environment.
- If you use an AWS environment and you need to start a job when a new file arrives in an S3 bucket, it is better to use AWS Lambda to trigger your dagrun instead of using sensors. Here is a useful article on how to use Lambdas: https://medium.com/swlh/aws-lambda-run-your-code-for-free-1c7fa6714ee9
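Here is a sketch of a timezone-aware DAG with catchup disabled, assuming Airflow 2.x; the `dag_id`, timezone, and schedule are illustrative.

```python
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

# A sketch of a timezone-aware DAG with catchup disabled, assuming Airflow 2.x;
# the dag_id, timezone, and schedule are illustrative.
local_tz = pendulum.timezone("Europe/Amsterdam")

with DAG(
    dag_id="example_tz_aware_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz=local_tz),
    schedule_interval="0 6 * * *",  # a cron schedule follows DST for tz-aware DAGs
    catchup=False,                  # do not backfill missed dagruns on unpause
) as dag:
    BashOperator(task_id="run_daily", bash_command="echo daily run")
```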
Variables and connections
- It is possible to hide the values of some variables. You can find which fields will be masked in `DEFAULT_SENSITIVE_VARIABLE_FIELDS` in `airflow.cfg`.
- You can get a `dict()` object from a JSON variable value by passing `deserialize_json=True` to `Variable.get()`. It can help decrease the number of connections to the Airflow database.
- You will benefit from putting `Variable.get()` statements inside function definitions instead of using them in the main DAG code. Otherwise, the scheduler will create database connections each time it scans the DAG folder for new DAGs.
- It is possible to decrease database connections without moving `Variable.get()` statements into function definitions by using environment variables. Environment variables whose names match `AIRFLOW_VAR_*` are checked before UI variables and stop any further variable lookup if they are present. Please be advised that such variables will not be visible in the UI.
- Another way is to use Jinja templating (see the sketch after this list). Templated fields of a task are rendered only when the task runs, not when the scheduler periodically scans the DAG folder. To read a variable in a templated field, use `op_args=["{{ var.json.varname.jsonkey }}"]`.
- You can create Airflow connections using environment variables as well, by using the `AIRFLOW_CONN_*` pattern in the name.
- Since version `1.10.10` it is possible to use custom secret backends like AWS Secrets Manager or GCP Secret Manager.
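Below is a sketch of the variable-access patterns above, assuming Airflow 2.x; the variable name `my_config`, its keys, and the example connection are hypothetical.

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

# A sketch of the patterns above, assuming Airflow 2.x; the variable name
# "my_config", its keys, and the connection below are hypothetical.
#
# Option 1: keep the variable (or connection) out of the database entirely by
# defining it as an environment variable (it will not be visible in the UI):
#   export AIRFLOW_VAR_MY_CONFIG='{"bucket": "my-bucket", "prefix": "raw/"}'
#   export AIRFLOW_CONN_MY_POSTGRES='postgres://user:pass@host:5432/db'


def process(templated_bucket):
    # Option 2: call Variable.get() inside the callable so the scheduler does not
    # hit the metadata database on every DAG-folder scan; deserialize_json returns a dict.
    config = Variable.get("my_config", deserialize_json=True)
    print(config["prefix"], templated_bucket)


with DAG(
    dag_id="example_variables_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    PythonOperator(
        task_id="process",
        python_callable=process,
        # Option 3: Jinja templating, rendered only when the task actually runs.
        op_args=["{{ var.json.my_config.bucket }}"],
    )
```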
Task execution parameters
- Instead of using XCom to transfer data between tasks, you are encouraged to transfer only small pieces of metadata (for example, a file path or a table name) through XCom.
- The `depends_on_past` task parameter prevents a task from being executed before the same task in the previous dagrun has been marked as either successful or failed.
- The `wait_for_downstream` task parameter allows a task to be executed only when the same task and its direct downstream tasks have completed in the previous dagrun.
- The `weight_rule='downstream'` task parameter (which is the default) calculates the absolute priority weight of a task by summing up the priorities of all its downstream tasks. For example, when you have 2 dagruns running and a task pool of size 1, execution of the dagruns goes in parallel (a task from one dagrun, then one from the other).
- The `weight_rule='upstream'` task parameter calculates the absolute priority weight of a task by summing up the priorities of all its upstream tasks. For example, when you have 2 dagruns running and a task pool of size 1, tasks from the second dagrun are executed only after the last task of the first dagrun has completed.
- The `execution_timeout` parameter can be useful with long-running tasks.
- `trigger_rule` is here for you if you want to change the triggering behavior of a task, which is `ALL_SUCCESS` by default. The full list of available values can be found here: https://github.com/apache/airflow/blob/main/airflow/utils/trigger_rule.py (see the sketch after this list).
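Here is a sketch of these task-level parameters on two tasks, assuming Airflow 2.x; the `dag_id`, task_ids, commands, and values are illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

# A sketch of the task-level parameters above, assuming Airflow 2.x; the dag_id,
# task_ids, commands, and values are illustrative.
with DAG(
    dag_id="example_task_params",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
) as dag:
    load = BashOperator(
        task_id="load",
        bash_command="echo load",
        depends_on_past=True,                     # wait for this task's run in the previous dagrun
        wait_for_downstream=True,                 # ...and for its direct downstream tasks as well
        weight_rule="downstream",                 # the default priority weighting
        execution_timeout=timedelta(minutes=30),  # fail the task if it runs longer than this
    )
    report = BashOperator(
        task_id="report",
        bash_command="echo report",
        trigger_rule=TriggerRule.ALL_DONE,        # run even if upstream failed (default is ALL_SUCCESS)
    )
    load >> report
```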
Parallel execution
- `parallelism` (32 by default): how many tasks can be executed at the same time in the whole Airflow instance (`airflow.cfg` parameter).
- `dag_concurrency` (16 by default): how many tasks can be executed at the same time within a single DAG (`airflow.cfg` parameter; the per-DAG counterpart is the `concurrency` argument).
- `max_active_runs_per_dag` (16 by default): how many runs of the same DAG can be executed at the same time (`airflow.cfg` parameter).
- `max_active_runs`: how many runs of this DAG can be executed at the same time (DAG parameter, see the sketch after this list).
- `max_queued_runs_per_dag`: the maximum number of queued dagruns for a single DAG; the scheduler will not create more DAG runs beyond this limit (`airflow.cfg` parameter).
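A sketch of the per-DAG counterparts in code, assuming Airflow 2.x; the numbers are illustrative, and the instance-wide limits stay in `airflow.cfg`.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A sketch of the per-DAG counterparts, assuming Airflow 2.x; the numbers are
# illustrative. The instance-wide limits (parallelism, dag_concurrency,
# max_active_runs_per_dag, max_queued_runs_per_dag) live in airflow.cfg.
with DAG(
    dag_id="example_concurrency_limits",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    concurrency=4,      # at most 4 tasks of this DAG running at the same time
    max_active_runs=1,  # at most 1 active run of this DAG at a time
) as dag:
    BashOperator(task_id="work", bash_command="echo work")
```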
Sensors
- `poke_interval`: how often the sensor checks the condition.
- A sensor with `mode='poke'` occupies a pool slot until the condition is met.
- A sensor with `mode='reschedule'` occupies a pool slot only while it checks the condition and releases it until the next poke.
- The `timeout` parameter (7 days by default) combined with `soft_fail=True` puts the sensor into the `skipped` state instead of `failed` once the timeout is reached.
- `exponential_backoff=True` replaces the fixed `poke_interval` with a progressively growing one (see the sketch after this list).
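Here is a sketch of these sensor parameters using a `FileSensor`, assuming Airflow 2.x; the `dag_id`, `fs_conn_id`, and `filepath` are hypothetical.

```python
from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

# A sketch of the sensor parameters above using a FileSensor, assuming Airflow 2.x;
# the dag_id, fs_conn_id, and filepath are hypothetical.
with DAG(
    dag_id="example_sensor",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
) as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        fs_conn_id="fs_default",
        filepath="/data/incoming/ready.flag",
        mode="reschedule",         # release the pool slot between pokes
        poke_interval=300,         # check the condition every 5 minutes
        exponential_backoff=True,  # grow the interval between pokes over time
        timeout=60 * 60 * 24,      # give up after 24 hours...
        soft_fail=True,            # ...and mark the task skipped instead of failed
    )
```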
DAG dependencies
- `ExternalTaskSensor` monitors a specific `task_id` and its state in another DAG.
- `TriggerDagRunOperator` triggers another DAG and can wait while it is being executed (see the sketch below).
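A sketch of both cross-DAG patterns, assuming Airflow 2.x; all dag_ids and task_ids are hypothetical.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

# A sketch of both cross-DAG patterns, assuming Airflow 2.x; all dag_ids and
# task_ids are hypothetical.
with DAG(
    dag_id="example_downstream_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
) as dag:
    # Wait for a specific task of another DAG (with the same execution date).
    wait_for_upstream = ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="upstream_dag",
        external_task_id="final_task",
        mode="reschedule",
    )

    # Trigger another DAG and block until that triggered run finishes.
    trigger_reporting = TriggerDagRunOperator(
        task_id="trigger_reporting",
        trigger_dag_id="reporting_dag",
        wait_for_completion=True,
    )

    wait_for_upstream >> trigger_reporting
```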