Airflow DAG Performance and Reliability

Manas Pant
Pasarpolis Product + Tech
9 min read · Jul 17, 2020

Our data warehouse fetches data from a variety of sources. The rate at which data is ingested varies between them, as does the volume of data ingested per pull, batch, or stream. We pull 100,000 rows per hour from several MySQL transactional tables. We read and write IoT files that carry over 5 million events per day. With each load that writes data into our BigQuery infrastructure, we ensure that its integrity is maintained and that it remains true to the source. We have built checks and balances using out-of-the-box Airflow features combined with our custom code to ensure our DAGs/ETLs run as expected.

We have fact-type tables and dimension-type tables in our data warehouse that are denormalized to the maximum possible extent. A fact table represents a single transaction/unit per row, e.g. the purchase of a policy of amount x. Dimension tables, in a traditional approach, are joined with a fact table to describe each transaction, e.g. who purchased the policy and which product it was. I would not use the terms fact table and dimension table strictly, because our warehouse is not implemented as a star or snowflake schema. Ours is a modern data warehouse built on query-as-a-service technology like BigQuery, which did not require us to build online analytical processing cubes or rigid normalized structures to squeeze out performance.

We have two types of DAGs -

  1. Full truncate and load, where we truncate the table before repopulating it from scratch in every run
  2. Incremental pull using a MERGE statement, where we join the source and target data sets so that only fresh rows are inserted and existing rows are updated
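For illustration, an incremental pull of this kind can be expressed as a single BigQuery MERGE executed through the stock BigQueryOperator, so the join runs inside BigQuery rather than on the Airflow workers. This is only a minimal sketch: the dataset, table, and column names are made up and do not reflect our actual schema.

```python
# Minimal sketch of an incremental-pull DAG. The MERGE runs entirely inside
# BigQuery; only the SQL text passes through the Airflow worker.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Illustrative schema: a staging table holds the latest increment pulled from
# the source, and the MERGE upserts it into the warehouse table.
MERGE_SQL = """
MERGE `warehouse.policies` AS target
USING `staging.policies_increment` AS source
ON target.policy_id = source.policy_id
WHEN MATCHED THEN
  UPDATE SET target.status = source.status,
             target.updated_at = source.updated_at
WHEN NOT MATCHED THEN
  INSERT (policy_id, status, created_at, updated_at)
  VALUES (source.policy_id, source.status, source.created_at, source.updated_at)
"""

with DAG(dag_id="policies_incremental_pull",
         schedule_interval="@hourly",
         start_date=datetime(2020, 7, 1),
         catchup=False) as dag:

    merge_policies = BigQueryOperator(
        task_id="merge_policies",
        sql=MERGE_SQL,
        use_legacy_sql=False,  # MERGE is a Standard SQL statement
    )
```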

These DAGs may create detail tables or summary tables at the target. We chose which of the two approaches to use for each table based on these guidelines -

  1. Fact-type tables are often incremental pulls because they are big in size
  2. Dimension-type tables are often smaller, and go through a lower volume of changes over time, and therefore are full-truncate-load operations
  3. Summary tables are often implemented as incremental pull operations
  4. Detail tables which are created as a result of joins between multiple large-sized tables are often full-truncate-load operations

Our ETL tests could be classified into the following -

Type of Testing — Applicability

  • Row count check — If there’s no aggregation between source and target, the row count should match (see the sketch after this list)
  • Expectation based data points check — We identify 10 random data points, aggregated and non-aggregated, which we study over time to establish their expected range of values. An observation outside that range is flagged as an anomaly worth reviewing.
  • Performance Testing — Log run times, define a baseline, and then improve over time as the same jobs run on recurring schedules. The rate of performance improvement has to be greater than the rate of increase of data at the source.
  • NULL checks — Are we getting abnormal counts of NULLs for certain dimensions and metrics? Baseline counts are identified per dimension before a scheduled check is put in place.
  • Ensure no duplicates — The grain of data is defined at each layer using a combination of dimensions to ensure that each row is unique.
  • Data type validation — Ensure that for every new table created the datatype of fields matches that of the source. Further, check if the data received from the source on scheduled runs matches the expectation to prevent data loss. E.g. Choosing DATETIME format to store information that is in TIMESTAMP format.
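As an example of the first check, a row-count comparison between a MySQL source table and its BigQuery target can be a small Python callable run by a PythonOperator. This is a sketch under assumptions: the connection IDs and table names are placeholders, and the real checks in our pipeline carry more context than this.

```python
# Hypothetical row-count check: raising an exception fails the task, which in
# turn flags the DAG run. Connection IDs and table names are placeholders.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.hooks.mysql_hook import MySqlHook


def check_row_counts(**context):
    # Count rows at the source (MySQL) ...
    source_count = MySqlHook(mysql_conn_id="transactional_db").get_first(
        "SELECT COUNT(*) FROM policies")[0]

    # ... and at the target (BigQuery).
    bq = BigQueryHook(bigquery_conn_id="bigquery_default", use_legacy_sql=False)
    target_count = bq.get_first("SELECT COUNT(*) FROM `warehouse.policies`")[0]

    if source_count != target_count:
        raise ValueError("Row count mismatch: source=%s, target=%s"
                         % (source_count, target_count))


# Wired into a DAG as, for example:
# PythonOperator(task_id="row_count_check", python_callable=check_row_counts,
#                provide_context=True, dag=dag)
```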

Airflow has all the basics that we needed to get visibility into the day-to-day running of our DAGs.

  • Airflow internal ad-hoc querying: Airflow UI provides us with a web interface where we can run ad-hoc queries to analyze Tasks' performance. It also provides us with an interface to create various charts to visualize the analysis.
  • Admin Console: Admin Console provides us a platform where we visualize the implemented DAGs and their status.
    - Toggle Button: Button to enable or disable particular DAG
    - DAG: Specify the name of the DAG deployed
    - Schedule: This column describes the scheduled interval of the DAG
    - Owner: Owner of the particular DAG
    - Recent Tasks: Each circle in this column represents one of the possible task states, and the number within the circle is the count of recent tasks in that particular state
    - Last Run: This column holds the timestamp of the latest DAG Run
    - DAG Runs: Each circle in this column represents a DAG Run state (success, running, or failed), and the number in the circle is the count of DAG Runs in that particular state
    - Links: This column holds various links related to the DAG, e.g. Trigger DAG, Tree View, Graph View, Code View, Logs, etc.
  • DAGs Graph View: Graph View gives us the complete picture of the execution flow of the tasks and their current state for a particular DAG instance. We can click on any of the listed tasks to see its details, like what code was executed and, if it failed, what error it threw.
  • Choosing the apt run: We can get the details of the execution of tasks and their state for a specific DAG run using the filter provided in the Graph View.

DAG Run names indicate whether the DAG was triggered manually or by the scheduler, along with the timestamp at which it was triggered.

In addition to what is available out of the box, we customized certain things for simplicity from an infrastructure review point-of-view.

  • CI/CD integration in Gitlab: We implemented Airflow through GCP Composer, which stores the DAG scripts in a dedicated Google Cloud Storage bucket. Any modification to the DAG scripts stored in that bucket is reflected in Airflow. We created a script that pulls changes from the DAG repository on Gitlab and pushes them to the Google Cloud Storage bucket. The script runs on a Gitlab runner, and the CI/CD rules on Gitlab are configured to execute it whenever a new tag is pushed.
  • ETL Task Logger: We implemented a custom operator that the ETL implementer places immediately after a read or write operation. This in-house implementation records DAG variables such as the count of imported and exported rows and the time taken in each task node (a rough sketch follows this list). We configured a Pulse on Metabase, which is essentially a review report for the previous day. The Pulses feature in Metabase lets us automatically send regular updates over email and Slack. We have performance and row count checks built into such reports.
  • Slack Notifications for exceptions: DAG instances can be initiated with a callback method to handle any failures, through the ‘on_failure_callback’ argument. This argument is propagated to all task instances defined within a DAG. Once configured, if any exception occurs while executing a task, the callback method is triggered with the task’s context as an argument. The task context holds all task-related information, including the log URL. We used this to implement a method that pushes notifications to Slack (sketched after this list).
  • Variance based report: We have configured reports that are based on identified data points. E.g. we expect the daily unit sales of each product to be within 2 standard deviations of the last 7 days’ average. The report flags products that fall outside that range, indicating that something could have gone wrong in the ETL. These reports surface symptoms of underlying problems as they happen.
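Our ETL Task Logger itself is in-house code, but the general shape of such a custom operator looks roughly like the sketch below: it counts the rows the preceding task just wrote and exposes that number for downstream reporting. The class name, constructor argument, and use of XCom are illustrative assumptions.

```python
# Rough sketch of an ETL-logger style custom operator (not our production code).
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class EtlTaskLogger(BaseOperator):
    """Placed right after a read/write task to record how many rows it left
    in the target table."""

    @apply_defaults
    def __init__(self, target_table, *args, **kwargs):
        super(EtlTaskLogger, self).__init__(*args, **kwargs)
        self.target_table = target_table  # e.g. "warehouse.policies"

    def execute(self, context):
        hook = BigQueryHook(use_legacy_sql=False)
        row_count = hook.get_first(
            "SELECT COUNT(*) FROM `%s`" % self.target_table)[0]
        self.log.info("DAG %s, task %s: %s rows in %s",
                      context["dag"].dag_id, self.task_id,
                      row_count, self.target_table)
        # Returning the value pushes it to XCom so a downstream task can
        # persist it for the daily Metabase Pulse report.
        return row_count
```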
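The wiring for the Slack alerts looks roughly like the following; the webhook URL and message format are placeholders. The key point is that Airflow hands the failing task’s context, including the log URL, to the callback.

```python
# Sketch of failure alerting via on_failure_callback. The webhook URL and the
# message format are placeholders.
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # placeholder


def notify_slack_on_failure(context):
    task_instance = context["task_instance"]
    message = (
        "Task failed\n"
        "DAG: {dag}\nTask: {task}\nExecution date: {ds}\nLogs: {log_url}".format(
            dag=task_instance.dag_id,
            task=task_instance.task_id,
            ds=context["execution_date"],
            log_url=task_instance.log_url,
        )
    )
    requests.post(SLACK_WEBHOOK_URL, json={"text": message})


# Passed through default_args so it propagates to every task in the DAG:
default_args = {
    "owner": "data-engineering",  # illustrative owner
    "on_failure_callback": notify_slack_on_failure,
}
```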

Our infrastructure runs on GCP. We take advantage of the visibility into our solution that its tools give us.

CPU and memory monitoring at Composer level

GCP Composer provides us with a dashboard to monitor Composer resources and DAG runs. CPU and memory utilization can easily be monitored at each node level. Note that these charts don’t include the CPU and memory utilization of the App Engine instance used for the Airflow UI or of the Cloud SQL instance. We could use Stackdriver logs for that but have not had the need to.

Mistakes that we made and corrected over time

Our Airflow performance was poor in the first couple of iterations of our data warehouse pipeline. We found our DAGs to be slow, often running into errors or ending up in an unknown state. We identified the root causes over the first few weeks:

  1. Underpowered nodes in the cluster — We initially set up n1-standard-1 machines in our cluster. We found that our Kubernetes pods were getting evicted because memory requirements exceeded the available memory. The default Airflow configuration ran more parallel tasks in a DAG than our pods could handle, causing them to fail. As a temporary fix, we reduced the number of parallel tasks (see the sketch below), which increased the overall completion time of a DAG. As a long-term fix, we increased the hardware configuration of the Composer instance. We are now running a 3-node cluster of n1-standard-2 machines. This leaves some headroom for us to handle hundreds of DAGs.
  2. Performed processing/compute in Python — We wrote Python functions in our DAGs that iterated through millions of rows of data. They read content from CSV files on cloud storage, transformed the data, then wrote the output back to CSV files on cloud storage. The code executed on the Airflow cluster’s worker nodes. We changed that. We now perform all our computations directly in BigQuery using Standard SQL. It is significantly more performant, and it saves CPU and memory on the Airflow cluster. Airflow cluster resources are now better utilized, moving only code and code references between our services.
  3. Intermediate writes and reads of big datasets — We updated our warehouse design and changed the way we fetch data to reduce the number of steps needed to get the data reporting/analysis-ready. Moving from an ETL to an ELT philosophy and using BigQuery external connections helped us reduce the time taken to execute recurring scheduled jobs by 50%.

More than one of these issues often occurred in parallel, which made the problems difficult to debug.
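As a rough illustration of the temporary fix mentioned in the first point, per-DAG task parallelism can be capped through the DAG’s own arguments; the limits below are arbitrary examples, and cluster-wide settings such as parallelism in airflow.cfg can be tuned alongside them.

```python
# Illustrative only: limiting how many tasks of this DAG run at once so that
# small worker pods are not overcommitted. The numbers are arbitrary.
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="iot_events_load",      # made-up DAG name
    schedule_interval="@hourly",
    start_date=datetime(2020, 7, 1),
    catchup=False,
    concurrency=4,                 # at most 4 task instances of this DAG at a time
    max_active_runs=1,             # never overlap two runs of this DAG
)
```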

These are the known GCP/Cloud Composer/API limitations that we work around -

  1. There is no authorization feature available on Composer 1.10.3
  2. There is no option to upgrade the Composer version on an existing setup; we have to create a new Composer setup with the latest available version
  3. We had to patch a few BigQuery operator classes because of a bug in the specific version of Apache Airflow that Composer supported

Guidelines

  1. Offload compute from the Airflow cluster to the database, cloud functions, or a dedicated service by using available operators or custom-created ones
  2. Maintain a dedicated repository for the Airflow code to perform continuous delivery to the Composer instance upon creation of new tag
  3. Define and follow DAG, function, and operator naming conventions
  4. Fill in the description of DAGs so that it shows up in the tooltip in the admin panel
  5. Configure a preventive check in the current DAG to ensure that the independent/base DAG has run; only if the latest data is available upstream should the current DAG run (see the sketch below)
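One way to implement the last guideline is Airflow’s ExternalTaskSensor, which makes the dependent DAG wait until a task in the base DAG has succeeded for the same schedule. The DAG and task ids below are made up, and the timeouts are arbitrary.

```python
# Illustrative upstream-availability check: the summary DAG's first task waits
# for the base DAG's final task to succeed for the same execution date.
from airflow.sensors.external_task_sensor import ExternalTaskSensor

wait_for_base_load = ExternalTaskSensor(
    task_id="wait_for_base_load",
    external_dag_id="policies_incremental_pull",  # made-up base DAG id
    external_task_id="merge_policies",            # made-up task id
    poke_interval=5 * 60,   # check every five minutes
    timeout=60 * 60,        # give up after an hour
    dag=dag,                # the current (dependent) DAG object
)

# Downstream work runs only once the upstream data is confirmed available:
# wait_for_base_load >> build_summary_table
```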
