Airflow — Custom Plugins

Saumalya Sarkar · Published in The Startup · May 30, 2020 · 5 min read

Hello Again!!!

In the previous blog, I discussed creating DAGs dynamically. In this article, let’s explore custom plugins.

Don’t get me wrong, Airflow already has a rich collection of plugins to accomplish specific tasks, e.g. BashOperator (to execute bash commands), PythonOperator (to run Python callables), S3KeySensor (to sense file availability on AWS S3), S3Hook (to implement several tasks related to AWS S3) and many, many more. But then again, in real life we all face situations where the built-in features just don’t fill the plate, and that is where customization comes in.

Just to brush up, let’s go through the three most widely used plugin types supported by Airflow: Hooks, Sensors and Operators.

Hooks: Hooks are basically Python classes that enable tasks to access external platforms, like AWS, Azure, GCP and many more.

Sensors: Sensors are Python classes which are used to create watcher tasks (in the most basic sense); for example, S3KeySensor is used to create an S3 file-watcher task. A sensor stays in the running state until a specific condition is met.

Operators: Operators are typically execution engines. Operators are used to create tasks that execute some process, based on the type of operator. For example, PythonOperator can be used to create a task that runs a specific Python callable.
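To make that concrete, here is a minimal, self-contained DAG, written with Airflow 1.10-style imports to match the era of this article, that uses the built-in PythonOperator; the DAG id, schedule and function name are arbitrary choices for illustration.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def _say_hello():
    # The callable that the PythonOperator task will run.
    print("Hello from a PythonOperator task!")


with DAG(
    dag_id="builtin_operator_demo",   # illustrative DAG id
    start_date=datetime(2020, 5, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    say_hello = PythonOperator(
        task_id="say_hello",
        python_callable=_say_hello,
    )
```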

Following a similar methodology, we will create our own Hook, Sensor and Operator. To make any custom plugin (sensor, operator) available to the Airflow scheduler, we must declare/inject our custom plugins in a specific format; we will come to that part once we are done writing the plugins. Let’s first create our custom hook.

File: https://bitbucket.org/saumalya75/airflowoncontainer/src/master/airflow_home/plugins/hooks/custom_s3_minio_hook.py

Info: Hooks are just straightforward Python classes that inherit from the Airflow-provided BaseHook or any existing hook class. In our example we are inheriting from the S3Hook class.

Requirement: Create a hook that will connect to AWS S3 or Minio (a boto3-compatible native object store) based on user input and replicate all the functionality of the provided S3Hook.

Brief Code Discussion: We have inherited S3Hook (built into Airflow) in our custom hook classes. Based on the provided conn_type, we create either a CustomMinioHook or a CustomS3Hook object. Both of these classes override the get_client_type and get_resource_type methods of the S3Hook class to connect to AWS S3 or Minio. The branching is handled in the object creator (__new__) of the CustomS3MinioHook class. That will be all. Nothing else: as promised, a simple Python module implementing platform APIs.
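Since the full implementation lives in the repository, the following is only a condensed sketch of the structure just described, not the exact file: __new__ of CustomS3MinioHook branches on conn_type and hands back either a Minio-flavoured or a plain S3 hook, both built on top of S3Hook. The Minio endpoint and credential parameters shown here are illustrative assumptions.

```python
import boto3
from airflow.hooks.S3_hook import S3Hook  # Airflow 1.10-style import


class CustomS3Hook(S3Hook):
    """Plain AWS S3 behaviour: simply reuse everything S3Hook already offers."""
    pass


class CustomMinioHook(S3Hook):
    """Talk to Minio through boto3 by pointing client/resource at a custom endpoint."""

    def __init__(self, minio_endpoint, access_key, secret_key, *args, **kwargs):
        # Assumed constructor arguments; the real hook may read these
        # from an Airflow connection instead.
        super().__init__(*args, **kwargs)
        self._endpoint = minio_endpoint
        self._access_key = access_key
        self._secret_key = secret_key

    def get_client_type(self, client_type, region_name=None):
        # Override: build the boto3 client against the Minio endpoint.
        return boto3.client(
            client_type,
            endpoint_url=self._endpoint,
            aws_access_key_id=self._access_key,
            aws_secret_access_key=self._secret_key,
            region_name=region_name,
        )

    def get_resource_type(self, resource_type, region_name=None):
        # Override: build the boto3 resource against the Minio endpoint.
        return boto3.resource(
            resource_type,
            endpoint_url=self._endpoint,
            aws_access_key_id=self._access_key,
            aws_secret_access_key=self._secret_key,
            region_name=region_name,
        )


class CustomS3MinioHook:
    """Factory: __new__ decides which concrete hook object to hand back."""

    def __new__(cls, conn_type, *args, **kwargs):
        if conn_type and conn_type.lower() == "minio":
            return CustomMinioHook(*args, **kwargs)
        return CustomS3Hook(*args, **kwargs)
```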

Next, the custom sensor. Please note that, unlike hooks, we have to follow a specific format to convert a simple Python class into an Airflow sensor. The rules are:

  1. A sensor class has to inherit from the Airflow-provided BaseSensorOperator class (under airflow.sensors),
  2. The @apply_defaults decorator needs to be applied to the constructor __init__,
  3. A method named poke has to be defined, which takes one positional argument, context. When a sensor task runs, Airflow calls the poke method of the sensor class. The sensor passes when True (or anything truthy) is returned from this method; otherwise the method is triggered again on the next poke.

Keeping these points in mind, let’s have a look at our example custom sensor:

File: https://bitbucket.org/saumalya75/airflowoncontainer/src/master/airflow_home/plugins/sensors/custom_s3_sensor.py

Info: We are looking at one sensor, S3SensorFromProvidedValue; in the actual file there is another sensor which is used to implement XCOM variables, and that one is discussed in the next article.

Requirement: Create a sensor which will wait for the arrival of a source file on AWS S3 or Minio, based on the provided input.

Brief Code Discussion: As you can see, we use the previously created hook to connect to the cloud server, and several methods from the hook to check for the file’s existence. First we create an object of the CustomS3MinioHook class; then, via that object, we communicate with the cloud server using methods like check_for_key, get_wildcard_key, check_for_wildcard_key etc. Also notice that we return True when these check/get methods return a truthy value, else we return False. Hence the sensor passes only when the expected file is present on the target server.
Note: The code in the actual file is a bit different due to the incorporation of XCOM variable functionality.
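For orientation, here is a simplified sketch of such a sensor following the three rules above; the constructor arguments, default values and hook import path are assumptions, and the XCOM-related logic of the real file is omitted.

```python
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

# Assumed import path: Airflow adds the plugins folder to the Python path,
# so the hook module is reachable from its hooks/ sub-package.
from hooks.custom_s3_minio_hook import CustomS3MinioHook


class S3SensorFromProvidedValue(BaseSensorOperator):

    @apply_defaults
    def __init__(self, bucket_name, bucket_key, conn_type="s3",
                 wildcard_match=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.bucket_key = bucket_key
        self.conn_type = conn_type
        self.wildcard_match = wildcard_match

    def poke(self, context):
        # Re-use the custom hook to talk to S3 or Minio (for Minio, the
        # endpoint/credential kwargs sketched earlier would also be passed).
        hook = CustomS3MinioHook(conn_type=self.conn_type)
        if self.wildcard_match:
            # check_for_wildcard_key / check_for_key are inherited from S3Hook.
            return hook.check_for_wildcard_key(self.bucket_key, self.bucket_name)
        return hook.check_for_key(self.bucket_key, self.bucket_name)
```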

On to the operator then. Similar to sensors, there are also some rules to create an Airflow operator class:

  1. An operator class has to inherit from the Airflow-provided BaseOperator class (typically imported from airflow.models),
  2. The @apply_defaults decorator needs to be applied to the constructor __init__,
  3. A method named execute has to be defined, which takes one positional argument, context. When an operator task runs, Airflow calls the execute method of the operator class.

Let’s jump into the code then:

File: https://bitbucket.org/saumalya75/airflowoncontainer/src/master/airflow_home/plugins/operators/custom_file_load_operator.py

Requirement: Create an operator which will read the source file and print it. The operator will also import and integrate one criminally simple Python module (we are not discussing Python module development here, maybe some other time) just to show how users can integrate and execute their own modules via an Airflow operator.

Brief Code Discussion: As you can see, we use the same hook to connect to the cloud server, and the get_key method from the hook to fetch the file.
Note: The code in the actual file is a bit different due to the incorporation of XCOM variable functionality.
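As with the sensor, a stripped-down sketch of such an operator is shown below; the argument names and the decode/print handling are assumptions, and the user-module integration and XCOM handling of the real file are left out.

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# Assumed import path, same as in the sensor sketch.
from hooks.custom_s3_minio_hook import CustomS3MinioHook


class CustomFileLoadOperator(BaseOperator):

    @apply_defaults
    def __init__(self, bucket_name, bucket_key, conn_type="s3", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.bucket_key = bucket_key
        self.conn_type = conn_type

    def execute(self, context):
        hook = CustomS3MinioHook(conn_type=self.conn_type)
        # get_key (inherited from S3Hook) returns a boto3 Object for the file.
        s3_object = hook.get_key(self.bucket_key, self.bucket_name)
        content = s3_object.get()["Body"].read().decode("utf-8")
        # A user-written parsing module could be called on `content` here;
        # this sketch simply prints the file.
        print(content)
        return True
```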

Once all the custom elements are developed, it’s time to inject them into Airflow as plugins. Once we do that, we will be able to import our custom sensors from airflow.sensors, just like the built-in sensors. The concept is similar for operators and any other Airflow-supported plugin type.

To implement this, we have to create a CustomPlugin class inheriting from the AirflowPlugin class. In that class we have to declare some specific class-level variables in list format, as follows:

File: https://bitbucket.org/saumalya75/airflowoncontainer/src/master/airflow_home/plugins/custom_plugin.py
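The file boils down to something like the sketch below; the plugin name and the exact import paths are assumptions based on the repository layout.

```python
from airflow.plugins_manager import AirflowPlugin

# Assumed import paths for the custom classes built above.
from hooks.custom_s3_minio_hook import CustomS3MinioHook
from sensors.custom_s3_sensor import S3SensorFromProvidedValue
from operators.custom_file_load_operator import CustomFileLoadOperator


class CustomPlugin(AirflowPlugin):
    # Name under which the plugin is registered.
    name = "custom_plugin"
    # Each list tells Airflow which classes to expose for that plugin type.
    hooks = [CustomS3MinioHook]
    sensors = [S3SensorFromProvidedValue]
    operators = [CustomFileLoadOperator]
```

With this in place, a DAG can typically import the sensor as from airflow.sensors.custom_plugin import S3SensorFromProvidedValue, exactly as if it were a built-in sensor.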

For all possible plugin types, check here.
Note: Please note that the plugin module (the file containing the plugin class) must be kept under the ${AIRFLOW_HOME}/plugins path, or else Airflow will not consider it.
Also, I have discussed dynamic DAG generation here and XCOM variable usage here.

Credit where it’s due:

Although I published the article, it was a joint venture with Rohith Karnati. He is a Python enthusiast, DevOps ninja and Airflow developer. Kudos to his efforts.

All related code is available in this BitBucket repository. Clone it, Fork it, Use it.

P.S.: Any suggestions will be highly appreciated. I am available at saumalya75@gmail.com and linkedin.com/in/saumalya-sarkar-b3712817b .
