Apache Airflow At Palo Alto Networks

Navaneeth
8 min read · Jun 12, 2020

As part of the Cortex Data Lake platform team at Palo Alto Networks, we are building a batch processing pipeline that runs application jobs written in Spark, PySpark, Hadoop, MapReduce, etc. at scale and at regular intervals.

For this purpose, we chose Apache Airflow as our workflow engine to schedule these application jobs at scale.

Self Managed Airflow Solution

When we started exploring Apache Airflow on Google Cloud, we experimented with a few methods for running Airflow on GKE. Along the way, we identified the need for a self-managed solution. Let’s start with a little background on the components involved and how we arrived at our current solution.

Apache Airflow

Apache Airflow is a platform created by the community to programmatically author, schedule, and monitor workflows. Because Airflow pipelines are configured as code, they are dynamic in nature. Airflow lets us define our own operators, extend its libraries to suit our environment, and scale to a larger number of jobs by using a message queue to orchestrate any number of workers.

Why Self Managed Airflow Solution?

While experimenting with different ways of running Airflow on Google Cloud, we identified several requirements that led us to explore a self-managed Airflow solution:

  • An easy and quick deployment process to spin up an Airflow pipeline when needed
  • Control over the version of the Airflow image, which gives us the flexibility to adopt custom operators and new features as they become available
  • Seamless interaction with our other internal services, most of which already run on GKE, by deploying Airflow as a service alongside them
  • The ability to tune Airflow configuration parameters to support a large number of DAGs, and to manage them per deployment based on load
  • The flexibility to add custom libraries on top of the base Airflow image for better management and monitoring (e.g., Prometheus for monitoring, GCSFUSE for GCS integration)
  • Scaling worker pods based on load, which keeps the solution cost-effective
  • Better management of read and write states, and the ability to restore a normal state after a crash

Airflow Deployment and Scaling

Because of the benefits listed above, we build our own custom Airflow Docker images on top of open-source base images and deploy the Airflow services as Kubernetes pods on GKE using a Spinnaker pipeline.

Scaling Self Managed Airflow Solution

Although managing our own Airflow deployment has several benefits, scaling it to a large number of DAGs and tasks was the more critical and difficult problem to solve for our continued success.

Here are the main components involved in our scaling work:

  • Airflow Scheduler: schedules and monitors tasks on an array of workers while following the specified dependencies
  • Airflow Webserver: provides a web interface and a custom-built REST API plugin for visualizing and interacting with the backend
  • Airflow Worker: consumes tasks from the queue and executes them
  • GCSFUSE: a Google tool used to mount GCS buckets onto the Airflow Kubernetes pods for the DAGs, logs, and plugins required for execution
  • Dataproc: the Google Dataproc cluster where the Spark and PySpark applications are executed
  • Memorystore: Google Memorystore (Redis), acting as the queue that holds tasks waiting to be processed
  • Postgresql: the PostgreSQL database used as the backend metastore, storing all DAG runs
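
To make "following the specified dependencies" concrete, here is a small sketch of dependency-ordered execution. It is not Airflow's scheduler; it uses Python's standard-library graphlib module purely as an illustration, and the task names are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}


def execution_waves(deps):
    """Group tasks into waves; a task appears only after all its upstream tasks."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


waves = execution_waves(dependencies)
print(waves)  # [['extract'], ['transform', 'validate'], ['load']]
```

Tasks within one wave have no dependencies on each other, which is what allows a scheduler to hand them to different workers at the same time.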

Single Worker Pod Architecture

  • An Airflow deployment has only one scheduler at any given time
  • The Airflow scheduler, worker, and web server are deployed as Kubernetes pods on GKE
  • Redis Memorystore is used for queuing and Postgres as the backend metastore
  • GCSFUSE mounts a given DAG (Directed Acyclic Graph) bucket onto all the Airflow pods

Scaling Configuration for Single Worker Pod Setup

  • Initially, we deployed Airflow with a single worker, i.e., 1 scheduler pod, 1 web server pod, and 1 worker pod
  • The biggest achievement during this phase was abstracting the Airflow configuration and mounting it into the pods as a ConfigMap, which gave us more control when scaling to multiple worker pods
  • We started with Airflow version 1.10.2 and moved to 1.10.4 to get its metrics capabilities
  • We used this setup to functionally stabilize the system and to run an end-to-end solution for our internal projects that run PySpark jobs for data processing
  • The major hurdle during this phase was configuring GCSFUSE so that pods could read from and write to GCS buckets, which added overhead to processing times
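
As a rough illustration of keeping the Airflow configuration outside the image, the sketch below builds a Kubernetes ConfigMap manifest that carries an airflow.cfg file. The ConfigMap name, the namespace, and the sample config text are hypothetical; the post does not show the actual manifest we use.

```python
import json


def airflow_configmap(name, namespace, airflow_cfg_text):
    """Build a Kubernetes ConfigMap manifest holding a single airflow.cfg file."""
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name, "namespace": namespace},
        # Each key under "data" becomes a file when the ConfigMap is mounted as a volume.
        "data": {"airflow.cfg": airflow_cfg_text},
    }


# Hypothetical, shortened config text for illustration only.
cfg_text = "[core]\nparallelism = 32\ndag_concurrency = 16\n"
manifest = airflow_configmap("airflow-config", "airflow", cfg_text)

# kubectl accepts JSON as well as YAML manifests.
print(json.dumps(manifest, indent=2))
```

Because every pod mounts the same ConfigMap, a tuning change is made in one place rather than by rebuilding the image for each component.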

Configuration:

We tested the single-worker setup with the following configuration:

  • Postgresql db - 1 vCPU
  • Memorystore - 10 GB
  • parallelism = 32 (the maximum number of task instances that can run concurrently on Airflow)
  • dag_concurrency = 16 (the number of task instances allowed to run concurrently within a specific DAG)
  • worker_concurrency = 16 (how many tasks a single worker can process)
  • max_threads = 2 (scheduler threads used to schedule tasks)
  • db connections configuration = 5 default + 10 overflow connections
  • Tasks per DAG = 9
  • min_file_process_interval = 0 (after how many seconds new DAGs should be picked up from the filesystem; we wanted instant processing here)
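
One way to apply settings like these is through Airflow's AIRFLOW__{SECTION}__{KEY} environment variables. The sketch below flattens the single-worker values into that form. The section placement follows the Airflow 1.10.x airflow.cfg layout and assumes the Celery executor; mapping "5 default + 10 overflow" to sql_alchemy_pool_size and sql_alchemy_max_overflow is an assumption, since the exact keys are not named above.

```python
# Single-worker values from the list above, grouped by airflow.cfg section.
# Assumption: the two sql_alchemy_* keys stand in for "5 default + 10 overflow".
SINGLE_WORKER_SETTINGS = {
    "core": {
        "parallelism": 32,
        "dag_concurrency": 16,
        "sql_alchemy_pool_size": 5,
        "sql_alchemy_max_overflow": 10,
    },
    "celery": {"worker_concurrency": 16},
    "scheduler": {"max_threads": 2, "min_file_process_interval": 0},
}


def to_env_vars(settings):
    """Flatten {section: {key: value}} into AIRFLOW__SECTION__KEY -> value strings."""
    env = {}
    for section, options in settings.items():
        for key, value in options.items():
            env[f"AIRFLOW__{section.upper()}__{key.upper()}"] = str(value)
    return env


env_vars = to_env_vars(SINGLE_WORKER_SETTINGS)
for name in sorted(env_vars):
    print(f"{name}={env_vars[name]}")
```

The same dictionary could just as well be rendered into an airflow.cfg file; environment variables are shown only because they are easy to attach to a pod spec.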

The memory and CPU graphs for the pods below are for a setup with 10 DAG runs.

Database connections:

Prometheus-based metrics for monitoring the queue size and scheduler delays:

Multi Worker Pod Architecture

  • A multi-worker Airflow setup has 1 scheduler but multiple workers, sized to the number of DAGs
  • Running Airflow version 1.10.4

Scaling Configuration for Multi Worker Pod setup

By this point in our scaling process, we were operating with the following configuration:

  • Number of DAGs actively running: 65, with an average of 12 tasks per DAG
  • Postgresql db - 4 vCPU
  • Memorystore - 30 GB
  • Scheduler - CPU = 5 vCPU and Memory = 4 GB (the issues we encountered and fixed while scaling the scheduler will be discussed in later posts)
  • Workers - Number of workers = 3, CPU = 4 vCPU and Memory = 7 GB (per pod)
  • Web server - CPU = 4 vCPU and Memory = 4 GB (a smaller configuration will do, depending on the number of web server workers; we use 4)
  • parallelism = 64 (the maximum number of task instances that can run concurrently on Airflow)
  • dag_concurrency = 64 (the number of task instances allowed to run concurrently within a specific DAG)
  • worker_concurrency = 30 (how many tasks a single worker can process)
  • max_threads = 3 (scheduler threads used to schedule tasks)
  • max_active_runs_per_dag = 16 (the maximum number of active DAG runs per DAG)
  • min_file_process_interval = 60 (after how many seconds new DAGs should be picked up from the filesystem)
  • dag_dir_list_interval = 30 (how often, in seconds, to scan the DAGs directory for new files)
  • Dataproc worker nodes = 16 (the maximum number of autoscaled instances we observed; we have yet to performance test this cluster and will cover it in later posts)
  • GCSFUSE - the caching interval is set to 1 hour and the cache capacity is left at the default of 4096 objects
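
A quick back-of-the-envelope check shows how these numbers interact. The sketch below is only arithmetic on the values listed above; it assumes every DAG runs once per hour and ignores task durations and dependencies, so it gives a lower bound rather than a measured result.

```python
import math

# Values taken from the multi-worker configuration above.
workers = 3
worker_concurrency = 30
parallelism = 64
active_dags = 65
avg_tasks_per_dag = 12

# Total task slots offered by the worker pods.
worker_slots = workers * worker_concurrency  # 90

# parallelism caps concurrent task instances across the whole installation,
# so the effective limit is the smaller of the two numbers.
effective_slots = min(parallelism, worker_slots)  # 64

# Task instances created per hour if each DAG runs once per hour.
tasks_per_hour = active_dags * avg_tasks_per_dag  # 780

# Minimum number of "full batches" needed to run them all.
min_batches = math.ceil(tasks_per_hour / effective_slots)  # 13

print(worker_slots, effective_slots, tasks_per_hour, min_batches)
```

With these settings the workers offer more slots (90) than parallelism allows (64), so parallelism is the binding limit on concurrency.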

Usage graphs and results for the 65-DAG setup, with each DAG running hourly:

Major Issues and findings

Worker Scaling - Identifying how to scale workers:

  • Based on community research and our own experience: if you regularly run a large number of small, lightweight tasks, spreading them across multiple lightweight workers is the better option
  • If you run a small number of relatively heavy tasks at a lower frequency, a heavyweight worker may be more efficient
  • We use the first option, multiple lightweight workers, for our workloads

Worker Scaling - Scaling the number of workers:

  • So far, we have found that each new worker requires adding 1 vCPU to the Postgresql instance (with fixed concurrency). The impact of scheduler pooling still needs to be factored into the overall db configuration.

GCSFUSE:

  • GCSFUSE caching had a major impact on scheduler and worker performance. We therefore tuned the caching interval to 1 hour, which dramatically improved DAG run performance. The longer cache lifetime is not a problem in our setup because we do not expect regular updates to DAGs.
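
For reference, a mount command with a 1-hour cache lifetime might be assembled as in the sketch below. The post does not state which gcsfuse options were used; the flag names --stat-cache-ttl, --type-cache-ttl, and --stat-cache-capacity are assumed from gcsfuse releases of that period and may differ in newer versions. The bucket name and mount path are hypothetical.

```python
import shlex


def gcsfuse_command(bucket, mount_point, cache_ttl="1h", stat_cache_capacity=4096):
    """Return the argv for mounting `bucket` at `mount_point` with longer cache TTLs."""
    return [
        "gcsfuse",
        "--stat-cache-ttl", cache_ttl,   # how long file metadata is cached
        "--type-cache-ttl", cache_ttl,   # how long file-vs-directory lookups are cached
        "--stat-cache-capacity", str(stat_cache_capacity),  # number of cached entries
        bucket,
        mount_point,
    ]


# Hypothetical bucket and mount path; the command is only printed, not executed.
argv = gcsfuse_command("example-dag-bucket", "/opt/airflow/dags")
print(shlex.join(argv))
```

A long TTL trades freshness for speed: a newly uploaded DAG file may take up to the TTL to become visible, which is acceptable only when DAGs change rarely.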

LOGS:

  • All the pods generate runtime logs and store them locally on the pod. We had to disable the GCSFUSE sync that wrote these files back to GCS because it caused abnormal behavior that could bring down the pods. GCSFUSE is not suitable for writing data back to a GCS bucket from multiple instances.

REST API plugin:

  • Another major issue was performance degradation on the workers when our custom REST API plugin was mounted onto them, caused by frequent loading of the DAG bag. We decoupled the plugin so that it is mounted only on the web server, not on the workers or the scheduler, by building a custom web server image with the plugin file mounted at the appropriate path. We also disabled writes back to GCS to avoid syncing these files to GCS buckets.

Conclusion

Airflow has become an important part of the data infrastructure at Palo Alto Networks, and we are aggressively scaling it to support thousands of DAGs. We hope this post helps anyone looking to explore Airflow and its deployment on Kubernetes. A big shoutout to our Batch Compute team (Wenfu Feng, Nimit Nagpal, Chandan Kumar, Tejdeep Kautharam, Punit Mutha, Benoy Antony) for their great success in building data processing solutions that empower our customers.