Apache Airflow in Production at a Digital Bank

Anil Palwai · Published in Analytics Vidhya · Apr 30, 2020

Production Background: We have hundreds of data pipelines (mostly Apache Spark) in production that incrementally ingest raw data into an update-based data warehouse. These pipelines run periodically (hourly or daily) and must meet SLAs so that data reaches downstream teams on time. Upstream teams drop the raw input data into the landing location of the data lake at any point after the scheduled time. This is where Apache Airflow, an open-source workflow management platform, became handy for scheduling, monitoring, and re-running failed tasks in production.

Apache Airflow is built on Python, so we can easily extend it to our needs. To summarize: to execute our pipelines, we create a DAG by writing templated Python code and place it in the “DAG bag” ($AIRFLOW_HOME/dags directory on the master node). The Airflow scheduler then automatically picks up the DAG, parses it, and sends its tasks to a task queue to be executed independently on the Airflow worker nodes. The Airflow web server reads all the DAG metadata from the metadata database and displays it in the web UI.

Below are the building blocks of an Airflow DAG (a minimal sketch follows the list):

1. Import the required Python packages.

2. Create the DAG object by passing the DAG name, default arguments, start date, schedule interval (a cron-like schedule expression), email addresses for alerts, and so on.

3. Create tasks (i.e. operator instances) by passing parameters to the specific operator.

4. Set dependencies between tasks using the bitshift operators (“<<” or “>>”) or the Python methods set_upstream() and set_downstream().
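
A minimal sketch of these four steps, written against Airflow 1.x imports; the DAG name, schedule, and alert address below are hypothetical, not the bank’s actual pipeline:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# 1. Default arguments applied to every task in the DAG
default_args = {
    "owner": "data-engineering",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "email": ["alerts@example.com"],   # hypothetical alert address
    "email_on_failure": True,
}

# 2. DAG object with a name, start date, and a cron-like hourly schedule
dag = DAG(
    dag_id="example_ingest_dag",       # hypothetical DAG name
    default_args=default_args,
    start_date=datetime(2020, 4, 1),
    schedule_interval="0 * * * *",
)

# 3. Tasks, i.e. operator instances
extract = BashOperator(task_id="extract", bash_command="echo extract", dag=dag)
load = BashOperator(task_id="load", bash_command="echo load", dag=dag)

# 4. Dependencies via bitshift operators: extract runs before load
extract >> load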

Sample Airflow architecture in a production environment (the design is subject to the organization’s culture and standards).

Deployment Strategy: After creating the Airflow DAG, the developer maintains the DAG along with the source code of the data pipelines in the version control repository (Bitbucket in this case). CI/CD pipelines in Jenkins pull and build the source code from the repo, deploy the executable jars and their dependencies to all Airflow worker nodes, and copy the DAG files to the “DAG bag” location (usually $AIRFLOW_HOME/dags/) on all Airflow master nodes.

Utilization of Sensors: As mentioned in the pipeline background, the upstream team copies the raw data to be processed into the landing location (S3 in this case) at any time after the scheduled time. We therefore created sensor tasks with the S3KeySensor and S3PrefixSensor operators to continuously poke the S3 location; whenever files with a matching naming pattern appear in the input location, the sensor returns true and the actual Spark tasks in the DAG kick off.
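
For illustration, a hedged sketch of such a sensor gating the Spark task (the bucket name, key pattern, connection id, and the spark_ingest task are hypothetical; dag is the DAG object from the earlier sketch):

from airflow.sensors.s3_key_sensor import S3KeySensor

# Poke the landing prefix until a key matching the wildcard pattern appears
wait_for_raw_files = S3KeySensor(
    task_id="wait_for_raw_files",
    bucket_name="bank-datalake-landing",        # hypothetical bucket
    bucket_key="raw/accounts/{{ ds }}/*.csv",   # templated wildcard key
    wildcard_match=True,
    aws_conn_id="aws_default",
    poke_interval=300,       # re-check S3 every 5 minutes
    timeout=6 * 60 * 60,     # give up after 6 hours
    dag=dag,                 # DAG object from the earlier sketch
)

# The Spark ingestion task kicks off only after the sensor succeeds
wait_for_raw_files >> spark_ingest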

Worker node setup: Since the tasks of the same DAG execute independently, a task can land on any available worker node. To make this work properly, all worker nodes should be maintained with the same configuration, metadata, and directory structure.

For example, task “t2” is an instance of the BashOperator that calls a Bash script through the bash_command argument, as below.
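
The referenced snippet is not reproduced in the text; below is a sketch along the lines of the standard Airflow documentation example, using the path and task name from this section (dag is assumed from the earlier sketch):

from airflow.operators.bash_operator import BashOperator

# t2 calls a shell script through bash_command; the trailing space stops
# Airflow from treating the ".sh" path as a Jinja template file to render
t2 = BashOperator(
    task_id="t2",
    bash_command="/home/batcher/test.sh ",
    dag=dag,
)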

The task “t2” may execute on any one of the worker nodes, so the script file “test.sh” must be available on every worker node at the same location (“/home/batcher/test.sh”) and with read permissions.

XComs: As discussed, Airflow aims to run tasks independently on the worker nodes, so tasks do not share data or state by default. However, when a task needs some metadata or information from another task, Airflow provides XComs to pass information between tasks: the source task pushes the information to XCom by calling the xcom_push() method, and downstream tasks call xcom_pull() to retrieve it.
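
A hedged sketch of this pattern with PythonOperator (the task names and the pushed value are hypothetical; dag is assumed from the earlier sketch):

from airflow.operators.python_operator import PythonOperator

def push_row_count(**context):
    # Source task pushes a value into XCom under an explicit key
    context["ti"].xcom_push(key="row_count", value=42)

def pull_row_count(**context):
    # Downstream task pulls the value pushed by the "push_metadata" task
    row_count = context["ti"].xcom_pull(task_ids="push_metadata", key="row_count")
    print("rows ingested: %s" % row_count)

push_metadata = PythonOperator(
    task_id="push_metadata",
    python_callable=push_row_count,
    provide_context=True,    # Airflow 1.x: pass the context to the callable
    dag=dag,
)

pull_metadata = PythonOperator(
    task_id="pull_metadata",
    python_callable=pull_row_count,
    provide_context=True,
    dag=dag,
)

push_metadata >> pull_metadata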

Airflow worker and Spark limitations: To execute a given task on a worker node, all of that operator’s dependencies need to be present on the node. Since most of these pipelines are Spark based, we configured the Airflow worker nodes on Spark gateway nodes so that Airflow tasks can launch Spark jobs on the Spark cluster. The limitation here is that the Spark job should run in client mode, which lets Airflow capture the Spark logs and job status. If cluster mode is mandatory for launching the Spark job, Airflow will not be able to get the Spark logs and job status.
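
As a hedged illustration (not the exact production task), a Spark job launched in client mode from a worker that doubles as a Spark gateway node; the jar path, main class, and arguments are hypothetical:

from airflow.operators.bash_operator import BashOperator

# Client mode keeps the Spark driver on the Airflow worker, so the task log
# captures the driver output and the exit code reflects the job status
spark_ingest = BashOperator(
    task_id="spark_ingest",
    bash_command=(
        "spark-submit "
        "--master yarn "
        "--deploy-mode client "
        "--class com.example.ingest.IncrementalLoad "
        "/opt/pipelines/ingest-pipeline.jar {{ ds }}"
    ),
    dag=dag,    # DAG object from the earlier sketch
)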

Airflow CLI: Airflow has a very rich command-line interface that supports many types of operations on a DAG, starts services, and aids development and testing.

We can also test or run each task in a DAG independently with the following commands:

airflow test dag_id task_id execution_date
airflow run dag_id task_id execution_date
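
For example, with the hypothetical DAG and task names from the sketches above:

airflow test example_ingest_dag spark_ingest 2020-04-30
airflow run example_ingest_dag spark_ingest 2020-04-30

The test command runs the task in isolation, ignoring dependencies and without recording state in the metadata database, while run executes and records a regular task instance.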

Backfill: Airflow can re-run previously failed task instances within the specified backfill date range.

airflow backfill [-h] [-t TASK_REGEX] [-s START_DATE] [-e END_DATE] [-m] dag_id
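
For example, to backfill one week of the hypothetical DAG above:

airflow backfill -s 2020-04-01 -e 2020-04-07 example_ingest_dag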

Remote logging and metrics visualization: Users can specify a logs folder in airflow.cfg using the base_log_folder setting. By default, it is in the AIRFLOW_HOME directory.

Besides this, users can supply a remote location for storing logs and log backups in cloud storage. To enable this feature, airflow.cfg must be configured with “remote_logging = True”; Airflow can then store logs remotely in AWS S3, Google Cloud Storage, or Elasticsearch.

In the case of Elasticsearch, if we store the logs with the id “{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}” and json_fields = asctime, filename, lineno, levelname, message, then we can visualize the complete Airflow metrics in either Kibana or Grafana by querying Elasticsearch.
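
A hedged sketch of the relevant airflow.cfg settings (section and key names as in Airflow 1.10; the bucket and connection id are hypothetical):

[core]
# Ship task logs to S3 (bucket and connection id are hypothetical)
remote_logging = True
remote_base_log_folder = s3://bank-airflow-logs
remote_log_conn_id = aws_default

[elasticsearch]
# When shipping JSON logs to Elasticsearch instead, the log id and fields
# mentioned above come from these settings
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
json_format = True
json_fields = asctime, filename, lineno, levelname, message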

Challenges: The biggest challenge on the Airflow platform is maintaining the scheduler and web server daemons. If the web server daemon gets killed or crashes, the web UI becomes unavailable to users, but task scheduling and execution are not affected. We manage this by maintaining multiple masters with multiple web server daemons; as a workaround, the Airflow web server can also simply be restarted.

But if the scheduler gets killed or crashes, it stops scheduling tasks and creates chaos in the cluster. Note that we cannot maintain multiple schedulers in the same cluster; if we do, tasks will be duplicated. The chance of a scheduler failure is higher when the number of currently running DAGs is high, as the scheduler may become unable to query the Airflow metadata database (MariaDB in our case). To mitigate that issue, we used a database proxy (MaxScale in our case) to handle queries from the scheduler. The proxy load-balances the requests, sending write queries to the master node and read queries to the worker nodes of the Airflow metadata database.

For any queries, please reach out to me on LinkedIn: www.linkedin.com/in/anilpalwai


Anil Palwai

A data engineer specializing in solutions built on Hadoop, Apache Spark, Apache Kafka, and Airflow for on-prem, cloud, and hybrid environments.