We built a system to forecast demand in Walmart stores, comprising data engineering and ML pipelines. Ensuring the batch jobs in our pipeline run on schedule is critical to the business: a delayed or missed run can have an impact on the order of millions of dollars. This blog post explains what we did, beyond the capabilities of our scheduler, to detect missed and delayed job runs.
We use Apache Airflow (open source) for composing and scheduling data pipelines. This blog post requires a basic idea of Apache Airflow and data pipelines in general. The focus is not on Airflow’s features but on how we built an audit system on top of it.
We have taken care of the following aspects of Airflow to make sure the scheduler is available 24/7 and jobs are triggered:
- Airflow infra monitoring (scheduler daemon, Celery worker, RabbitMQ, metastore)
- Airflow HA (to be detailed in a separate blog post)
- Airflow SLA feature
Even so, we had situations where Airflow did not trigger our ETL jobs (Spark jobs in our case) while its web UI wrongly showed the corresponding task in “running” status. There is no way to know about this unless the on-call engineer verifies it manually; Airflow’s SLA feature did not help here, as Airflow itself was behaving erroneously. This could be due to a bad configuration in the Airflow infra (less likely), unstable moving parts around Airflow, or a bug in Airflow itself. Irrespective of the root cause, we must know when a job is delayed so that the on-call engineer can take manual corrective action immediately. We implemented an auditor that runs independently of Airflow and alerts when any Spark job in a pipeline doesn’t start on time.
Approach for Auditing
The idea:
- Estimate ETL task start times using run history from the Airflow backend.
- Every Spark batch job (our ETL execution unit, a.k.a. an Airflow task) registers itself in a database table with a unique id; this tells the auditor that the job needs to be audited. The job also sends a heartbeat whenever it runs.
- Alert if a task’s heartbeat is not received on time per the estimate.

We assume the Airflow history is sane, i.e., the majority of previous runs happened on the usual schedule. One may wonder why we estimate from history when the DAG’s scheduling information could be used directly, but it is not enough: the schedule gives the expected start time of the entire DAG as a single unit, not of the individual tasks inside it.
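The registration-and-heartbeat step can be sketched with an in-memory map standing in for the audit_registration table (the column names come from this post; the task id and timestamps below are purely illustrative):

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class AuditHeartbeat {
    // In-memory stand-in for the audit_registration table: task_id -> last_updated.
    // A real job would upsert a row against the database instead.
    static final Map<String, Instant> auditRegistration = new HashMap<>();

    // Called at the start of every run: registers the task on its first run
    // and refreshes its heartbeat on every subsequent run.
    public static void heartbeat(String taskId, Instant now) {
        auditRegistration.put(taskId, now);
    }
}
```

In production this amounts to a single upsert that each Spark job issues against the audit table at start-up.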
Hit the ground running
We use two tables:
- task_instance — maintained by Airflow
- audit_registration — updated by every Spark job on every run.
audit_registration: task_id, is_registered, last_updated
task_instance: task_id, start_date, dag_id, and so on.
The auditor is implemented as a standalone Java program with the following components:
- Auditor daemon thread: This stays alive all the time and triggers another thread once every X (configurable) hours to run components 2 and 3 below.
- Interval Estimator: This looks at task_instance and, for each concat(dag_id, task_id), calculates the interval estimate, which is the mathematical mode of the intervals between consecutive start_times. If no interval repeats at least twice (so a mode cannot be established), it uses the median instead.
- Current Run Checker: For every task_id in audit_registration, if last_updated does not fall between the current time and the current time minus the estimated interval, it sends an alert.
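The Interval Estimator and Current Run Checker reduce to a small amount of logic. A minimal sketch under the assumptions above (start times as epoch minutes, no handling of seasonality or missing days; class and method names are illustrative):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Auditor {
    // Interval Estimator: mode of the intervals (in minutes) between
    // consecutive start times; falls back to the median when no interval
    // repeats at least twice.
    public static long estimateIntervalMinutes(List<Long> startsEpochMinutes) {
        List<Long> starts = new ArrayList<>(startsEpochMinutes);
        Collections.sort(starts);
        if (starts.size() < 2) {
            throw new IllegalArgumentException("need at least two historical runs");
        }
        List<Long> intervals = new ArrayList<>();
        for (int i = 1; i < starts.size(); i++) {
            intervals.add(starts.get(i) - starts.get(i - 1));
        }
        Map<Long, Integer> freq = new HashMap<>();
        long mode = -1;
        int best = 0;
        for (long iv : intervals) {
            int count = freq.merge(iv, 1, Integer::sum);
            if (count > best) {
                best = count;
                mode = iv;
            }
        }
        if (best >= 2) {
            return mode;
        }
        Collections.sort(intervals);
        return intervals.get(intervals.size() / 2); // upper median for even counts
    }

    // Current Run Checker: the heartbeat is late when last_updated falls
    // outside the window [now - estimatedInterval, now].
    public static boolean isDelayed(Instant lastUpdated, Instant now, Duration estimatedInterval) {
        return lastUpdated.isBefore(now.minus(estimatedInterval));
    }
}
```

For a job that has been starting once a day, the estimator returns 1440 minutes, and the checker flags the job as soon as its heartbeat is more than a day old.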
The auditor runs in fail-fast mode, i.e., the auditor’s Java process fails as soon as it sees something wrong: it cannot complete the thread execution (components 2 and 3) within X hours, it is unable to reach the Airflow backend, or the alerting service is not reachable. The dead process then triggers an alert to Ops via our process monitoring.
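The fail-fast behavior can be sketched as a thin wrapper around one audit cycle (a hypothetical shape, not our exact code; in the real daemon a non-zero return maps to System.exit so the process dies and monitoring pages Ops):

```java
public class FailFastRunner {
    // Run one audit cycle; on any failure, surface a non-zero exit code so
    // external process monitoring notices the dead process and alerts Ops.
    public static int runOnce(Runnable auditCycle) {
        try {
            auditCycle.run();
            return 0;
        } catch (RuntimeException e) {
            System.err.println("auditor failing fast: " + e.getMessage());
            return 1; // real daemon would call System.exit(1) here
        }
    }
}
```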
Finer details & further Improvements
- Intervals are rounded to the nearest half hour.
- Alerts about delays of the same job are throttled to one per day.
- This can be further refined to consider patterns such as seasonality (e.g., daily data pulls taking longer on Mondays due to busy source systems) and outliers (e.g., sporadic slowness of pulls). Jobs that don’t run on a particular day of the week currently result in false alarms; the algorithm can be improved to learn such patterns from history.
- Spark jobs can also register heartbeats against the fully qualified dag_id + task_id instead of just task_id, with cooperation from Airflow to obtain the dag_id at run time. This matters because a given Spark job can be reused in multiple DAGs.
- New jobs won’t have enough history to estimate start times; we just send a warning notifying of the inadequate history.
- Rescheduled DAGs will produce false positives/negatives for the next N runs due to stale history; allow sufficient time for correct history to build up.
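The rounding and throttling behaviors above are simple to sketch (a minimal illustration; the one-alert-per-day state would live in the database rather than in memory):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class AlertPolicy {
    // Round an estimated interval to the nearest half hour (minimum 30 min).
    public static Duration roundToHalfHour(Duration d) {
        long halfHours = Math.max(1, Math.round(d.toMinutes() / 30.0));
        return Duration.ofMinutes(halfHours * 30);
    }

    private final Map<String, Instant> lastAlert = new HashMap<>();

    // Throttle: at most one alert per task per day.
    public boolean shouldAlert(String taskId, Instant now) {
        Instant prev = lastAlert.get(taskId);
        if (prev != null && Duration.between(prev, now).compareTo(Duration.ofDays(1)) < 0) {
            return false;
        }
        lastAlert.put(taskId, now);
        return true;
    }
}
```

So an observed 73-minute interval is treated as one hour, and repeated delay alerts for the same task within 24 hours are suppressed.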
This has saved us from missing SLAs multiple times and is critical to our ETL pipelines today. It in turn allowed us to safely depend on the relatively new but feature-rich Apache Airflow as it continues to mature. The same approach can be used with any other scheduler by adapting it to that scheduler’s backend data model.