Data Workflow Dependency Orchestration
By Karuna Saini (Engineer, Data Platform)
In the data world, data flows through a whole lot of steps in a pipeline. To maintain the consistency, accuracy and completeness of the data, it is absolutely necessary that each step executes only after the set of steps it depends upon. This is where the concept of DAGs (Directed Acyclic Graphs) comes in.
DAG for dummies
Every morning you wake up and dress up for the day. There is some set of clothing that you always wear before others. For example, a shirt before a tie, socks before shoes and so on!
shirt -> tie -> :)
tie -> shirt -> :/
Unless you are a fashion blogger who loves to experiment with your clothes in unconventional ways, you would not want to wear your tie before or below your shirt. That’s the “directed” part of a DAG. Not coming back to “wear-a-shirt” once you have already worn it covers the “acyclic” part of DAG. Combining the above two scenarios pretty much explains what a DAG is. It’s that simple. Something that we follow everyday in our lives subconsciously. :)
To summarise, a DAG is a directed graph without any cycles, i.e., while moving along the directed edges from one node to another, you will never encounter the same node again.
Coming to workflow management, a node in the graph becomes a task or a script, and the directed edges represent the order in which those tasks have to be executed for the final output to make complete sense.
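In code, a tiny version of the morning-routine DAG can be sketched like this (the clothing items and edges are illustrative; the ordering logic is Kahn's topological-sort algorithm):

```python
from collections import deque

# Directed edges: each pair (a, b) means "a must happen before b".
edges = [("shirt", "tie"), ("socks", "shoes"), ("trousers", "shoes")]
nodes = {n for e in edges for n in e}

# Kahn's algorithm: repeatedly pick a node with no unfinished prerequisites.
indegree = {n: 0 for n in nodes}
for a, b in edges:
    indegree[b] += 1

ready = deque(sorted(n for n in nodes if indegree[n] == 0))
order = []
while ready:
    n = ready.popleft()
    order.append(n)
    for a, b in edges:
        if a == n:
            indegree[b] -= 1
            if indegree[b] == 0:
                ready.append(b)

# A full ordering exists only because the graph is acyclic;
# a cycle would leave some nodes with a positive in-degree forever.
assert len(order) == len(nodes)
print(order)
```

Swap the clothing names for task names and this is exactly what a workflow scheduler has to compute before running anything.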
Workflow Setup @ UC
We are using Apache Airflow in a distributed environment for workflow management. The web server and scheduler run on the Airflow server node, and there are multiple worker nodes listening on different queues. Each task is pushed to a specific queue by the server and eventually gets picked up for execution by the worker listening on that queue.
- Server Node: The job scheduler and web server run on this node. The Airflow config file present here can be tweaked to optimise for our use cases. All the DAG scripts are also deployed on this node and synced with the worker nodes via NFS (Network File System).
- Database: We are using MySQL as the database for Airflow. It stores all the metadata related to DAGs and task runs.
- Queue: Redis is used as the queue between the server and worker nodes. Each service has its own assigned queue, and all the tasks related to a service are scheduled on its respective queue.
- Worker Nodes: Just like queues, each service also has its own worker node, where all the scripts of that service reside. The worker node is responsible for picking up tasks from its queue and executing them. We are using Celery workers here.
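As a sketch, the components above map to a handful of `airflow.cfg` settings. The option names below follow Airflow 1.10's CeleryExecutor configuration; hostnames, credentials and the queue name are placeholders, not our actual setup:

```ini
[core]
executor = CeleryExecutor
# MySQL stores all DAG/task-run metadata (host and credentials are placeholders)
sql_alchemy_conn = mysql://airflow:<password>@mysql-host:3306/airflow

[celery]
# Redis acts as the queue between the server and the worker nodes
broker_url = redis://redis-host:6379/0
result_backend = db+mysql://airflow:<password>@mysql-host:3306/airflow
# Tasks land on this queue unless a task specifies its own service queue
default_queue = default
```

Each Celery worker is then started against its service's queue (e.g. `airflow worker -q <service_queue>` in the Airflow 1.x CLI), so only tasks scheduled on that queue reach it.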
Once a DAG has been deployed on Airflow, all its details become available on the web server. You can enable/disable a DAG or even trigger a manual run from the UI. All the DAG runs can be accessed in the “details” tab, along with the run logs of each task.
The image above displays the graph view of our front-end events processing DAG. Two of the raw event jobs, “customer_events” and “customer_sessions”, run in parallel. Once both complete, a “start_cluster” job spins up an EMR cluster for the next job to run on. Next is a view job called “customer_event_session_master”: it takes the output of the raw event processing jobs as input, performs some processing on top of that data and makes it relevant for business use cases. At the end, a “stop_cluster” job terminates the cluster once the view job completes.
All of these tasks are standalone Python scripts which get executed on the worker nodes.
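The ordering described above can be sketched in plain Python. The task names are taken from the graph view; the "wave" computation below only illustrates which tasks may run in parallel and is not Airflow's actual scheduler:

```python
# Each task maps to the set of tasks it depends upon (empty set = no deps).
deps = {
    "customer_events": set(),
    "customer_sessions": set(),
    "start_cluster": {"customer_events", "customer_sessions"},
    "customer_event_session_master": {"start_cluster"},
    "stop_cluster": {"customer_event_session_master"},
}

waves, done = [], set()
while len(done) < len(deps):
    # Every task whose dependencies have all finished can run in parallel.
    wave = {t for t, d in deps.items() if t not in done and d <= done}
    waves.append(sorted(wave))
    done |= wave

print(waves)
```

The first wave contains both raw event jobs, and every later wave has exactly one task, matching the graph view of the pipeline.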
Standardisation deserves strong focus when developing anything meant to support widespread use cases and audiences. Keeping this in mind, the Core Platform team at UC developed a workflow framework on top of Airflow. At the time, the framework supported scheduling and running various Node.js scripts spread across micro-services and teams.
We extended this framework to support Python scripts along with dependency resolution, since DAGs are one major use case to solve for when it comes to the Data Platform.
The standardisation in the way workflow scripts are written led to specific DAG and task templates which all of our DAG code has in common. These templates are kept abstracted out and are used to programmatically generate the code for any new DAG.
All of the variables enclosed by angle brackets are replaced by required values in the DAG deployment flow to create a new DAG or update an existing one.
Python Task Template
Each DAG can have one or more such task templates, depending upon the number of tasks in the DAG. These task templates are appended to the DAG template to come up with the complete script of a DAG.
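A much-simplified sketch of the idea: angle-bracket variables in a DAG template and a task template are substituted with concrete values to produce the final script. The template contents and names here are hypothetical, not our actual templates:

```python
# Hypothetical, stripped-down templates; the real ones carry much more detail.
DAG_TEMPLATE = """\
dag = DAG('<dag_id>', schedule_interval='<schedule>')
"""

TASK_TEMPLATE = """\
<task_id> = PythonOperator(task_id='<task_id>', python_callable=<callable>, dag=dag)
"""

def render(template, variables):
    # Replace every <name> placeholder with its supplied value.
    for name, value in variables.items():
        template = template.replace("<%s>" % name, value)
    return template

# One DAG template plus one task template per task yields the full DAG script.
script = render(DAG_TEMPLATE, {"dag_id": "events_pipeline", "schedule": "0 2 * * *"})
for task in ({"task_id": "customer_events", "callable": "run_customer_events"},
             {"task_id": "customer_sessions", "callable": "run_customer_sessions"}):
    script += render(TASK_TEMPLATE, task)

print(script)
```

The deployment flow described earlier plays the role of `render` here: it receives the variable values as input and emits a complete, deployable DAG script.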
DAGs in Action
The whole list of DAGs can be viewed on the Airflow web server homepage, along with their schedule intervals and basic run stats. It also displays which execution stage the tasks in a DAG run are at, at any instant of time.
We extended the existing automation set up by the Core Platform team on top of Jenkins to deploy our DAGs, adding support for DAGs whose scripts live across different Git repos. The Jenkins job takes all the variables as input, substitutes their values into the templates, and creates and deploys the final DAG on Airflow.
We have integrated Airflow metrics with Prometheus and Grafana for efficient monitoring. The objective is to have a single dashboard that gives us sufficient information about the health of our Airflow server and DAG runs.
This is further extended with alerts on top of the metrics displayed on the dashboard. We have set up alerts on failures and on tasks getting stuck in queues, which helps us take proactive action against any unexpected event.
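For illustration, an alert on task failures might look like the Prometheus rule below. The metric name `airflow_ti_failures` assumes the common StatsD-to-Prometheus exporter mapping of Airflow's `ti_failures` counter; a different integration may expose a different name:

```yaml
groups:
  - name: airflow
    rules:
      - alert: AirflowTaskFailures
        # Metric name assumes the usual StatsD -> Prometheus exporter mapping.
        expr: increase(airflow_ti_failures[10m]) > 0
        for: 5m
        labels:
          severity: page
        annotations:
          summary: "Airflow task failures detected in the last 10 minutes"
```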
How does it help us?
Earlier, all our workflow scripts were scheduled via crontab. We had to check run durations and schedule the scripts accordingly for them to execute in the expected order. This was troublesome in the following ways:
- At times some scripts would take longer than expected to finish, leaving data incomplete at various parts of the pipeline.
- We had to keep enough buffer between the schedules of two dependent jobs to avoid the scenario in point 1, which meant less flexibility in scheduling.
- Even if the upstream job failed, the downstream job would still get triggered, wasting resources, since the output of the downstream job would anyway not be the expected one.
- For any change in a run schedule, or to trigger a job manually, we had to log in to the server where cron was set up.
The workflow setup on top of Airflow solved all of the above problems and gave us much more flexibility and control over the scheduling and maintenance of our job runs.
About the author –
Karuna likes to play with colours on canvas, and with data while working. She is a part of the Data Platform team, which ensures that everyone gets their data in the most standardised and scalable way.
Sounds like fun?
If you enjoyed this blog post, please clap 👏 (as many times as you like) and follow us (@UC Blogger). Help us build a community by sharing on your favourite social networks (Twitter, LinkedIn, Facebook, etc.).
If you are interested in finding out about opportunities, visit us at http://careers.urbancompany.com