Automate, Schedule and Monitor ETL Workflows through Airflow and Selenium, and Update DAGs through a Webhook

Kashish Bakshi
Arthashastra Intelligence
12 min read · Aug 5, 2020

This post helps build an Airflow setup that uses the Selenium plugin with Chrome WebDriver in headless mode to automate the monthly task of fetching RBI data and eventually loading it into Elasticsearch. We will even set up a webhook that automatically updates DAGs on our remote Airflow server on every Git push.

If your daily tasks involve the web, then using Selenium with Airflow can save you hours of work and improve the quality and consistency of your results.

Prerequisites

ETL Pipeline

ETL stands for Extract-Transform-Load. ETL isn’t just for Data Warehousing! ETL works on almost any type of data conversion. You read the source, translate the data for your target, and store the result.

By dividing a conversion into three steps, we isolate the input from the output, which:

  • Centralizes data formatting and validation.
  • Makes new input formats a breeze.
  • Makes new outputs just as easy.

ETL Pipeline takes your data files from extract to load. It reads an input source, translates the data, and writes it to an output destination.
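To make the pattern concrete, here is a minimal sketch in Python; the CSV source, the field-name clean-up, and the JSON target are illustrative assumptions, not part of the RBI pipeline we build later.

# Minimal ETL sketch: the CSV input, the field-name clean-up and the
# JSON output are stand-ins for your own formats.
import csv
import json

def extract(path):
    # Read the source into plain Python dicts.
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

def transform(rows):
    # Translate the data for the target, e.g. normalise field names.
    return [{key.strip().lower(): value for key, value in row.items()} for row in rows]

def load(records, path):
    # Store the result at the destination.
    with open(path, "w") as f:
        json.dump(records, f, indent=2)

if __name__ == "__main__":
    load(transform(extract("input.csv")), "output.json")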

Selenium

At its core, Selenium automates browsers. It is primarily used to automate web applications for testing purposes, but it is by no means restricted to that. A key component of Selenium is the WebDriver; the WebDriver API sends commands directly to the browser. The WebDriver used in this post is the one provided by the selenium/standalone-chrome Docker image.
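As a quick illustration, driving that container from Python looks roughly like the sketch below, assuming a selenium/standalone-chrome container is already running and exposing port 4444 on localhost (the RBI URL is just an example page to open):

# Sketch: connect to a running selenium/standalone-chrome container
# through the remote WebDriver and open a page in headless mode.
from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("--no-sandbox")

driver = webdriver.Remote(
    command_executor="http://localhost:4444/wd/hub",
    options=options,
)
try:
    driver.get("https://www.rbi.org.in")
    print(driver.title)
finally:
    driver.quit()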

Airflow

Apache Airflow is a workflow automation and scheduling system that can be used to author and manage data pipelines. Airflow models workflows as directed acyclic graphs (DAGs) of tasks. Its key components are the webserver, the scheduler, and the workers. The webserver serves the Airflow user interface, while the scheduler dispatches your tasks to an array of workers according to predefined instructions.

Docker

Docker is a tool designed to make it easier to create, deploy, and run applications by using containers. Containers allow a developer to package up an application with all of the parts it needs, such as libraries and other dependencies, and deploy it as one package. A great feature of Docker is the compose tool which is used to define and run multi-container Docker applications.

— — —

Preparing the base Environment

This will be our base GitHub repository and our point of reference for the Airflow setup. We will use the docker-compose file created by the GitHub user Puckel, which lets you get up and running with Airflow quickly.

Test that the environment runs locally using the docker-compose up command; the UI should be available at http://localhost:8080.

We will use the docker-compose-CeleryExecutor.yml file, which uses the Celery executor so that tasks can run concurrently by scaling out the number of workers.

Let's have a glance at the file.

As you can see, we will run a total of six services to set up our Airflow environment.

  1. Redis: the message broker for Celery
  2. Postgres: the persistent metadata database for Airflow
  3. Webserver: Airflow’s dashboard
  4. Flower: for monitoring and administrating Celery clusters
  5. Scheduler: the DAG scheduler for Airflow
  6. Worker: executes tasks as per predefined instructions

# NOTE :

Fernet Key :

A Fernet key is required in order to encrypt passwords within connections. The Fernet key must be a URL-safe base64-encoded 32-byte key.
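One way to generate such a key, assuming the cryptography package (which Airflow itself depends on) is available:

# Generate a Fernet key and print it; the puckel compose file reads it
# from the FERNET_KEY environment variable.
from cryptography.fernet import Fernet

print(Fernet.generate_key().decode())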

Roping in Airflow Plugin

A great feature of Airflow is plugins, an easy way to extend its existing feature set. To integrate a new plugin with the existing Airflow environment, simply move the plugin files into the plugins folder.

If you want to know more about the Plugin Directory Structure, refer to the README File of the following Github Repo.

We will use the Selenium plugin and modify our environment accordingly. Let’s start with what the Selenium plugin will do.

Our Selenium plugin will work as follows (a rough sketch of this flow follows the list):

  1. Start the Selenium Docker container in the host environment.
  2. Configure the remote Selenium WebDriver on the docker container.
  3. Send commands to the WebDriver to fulfill the task.
  4. Stop and remove the container.
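Roughly, that flow boils down to the sketch below. This is an illustration of the idea, not the plugin’s actual code: the container settings, the fixed wait, and the download path inside the container are assumptions.

# Illustrative sketch of the plugin's flow (not the plugin's actual code).
# Assumes the docker and selenium Python packages are installed and the
# host Docker socket is mounted into the worker.
import time

import docker
from selenium import webdriver

def run_selenium_task(script):
    # 1. Start the Selenium container in the host environment.
    client = docker.from_env()
    container = client.containers.run(
        "selenium/standalone-chrome",
        detach=True,
        ports={"4444/tcp": 4444},
        volumes={"downloads": {"bind": "/home/seluser/downloads", "mode": "rw"}},
    )
    try:
        time.sleep(10)  # crude wait for the Selenium server to come up
        # 2. Configure the remote WebDriver on the container.
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        driver = webdriver.Remote(
            command_executor="http://localhost:4444/wd/hub",
            options=options,
        )
        try:
            # 3. Send commands to the WebDriver to fulfil the task.
            script(driver)
        finally:
            driver.quit()
    finally:
        # 4. Stop and remove the container.
        container.stop()
        container.remove()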

Binding Docker Socket :

The Airflow worker needs to be able to create the Selenium container and subsequently send commands to execute the task. The easiest way to allow the worker to create containers is by exposing the host Docker socket to the worker by mounting it as a volume in the docker-compose file.

worker:
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

Set Permissions :

This can be achieved by creating a new Dockerfile called ‘Dockerfile-airflow’ which extends the puckel/docker-airflow base image as follows:

FROM puckel/docker-airflow:1.10.4

USER root
RUN groupadd --gid 999 docker \
    && usermod -aG docker airflow

USER airflow
  • The Dockerfile starts from the puckel/docker-airflow base image.
  • As the root user, it creates the docker group with the id 999 and adds the airflow user to that group.
  • It then switches back to the airflow user.

For a deep dive into user permissions in Linux, I recommend this playlist.

Create the new docker image:

docker build -t docker_airflow:modified -f Dockerfile-airflow .

The Airflow worker can now create Docker containers on the host; however, it still requires the Docker Python package. The installations below cover that, along with what the Selenium plugin, the DAG, and our RBI ETL script need.

# docker lets the worker manage the Selenium container; the rest are
# needed by the Selenium scripts, the DAG and the RBI ETL script
RUN pip3 install docker && \
    pip3 install selenium && \
    pip3 install bs4 && \
    pip3 install elasticsearch && \
    pip3 install lxml

Interacting with the remote container

The Selenium plugin sends commands to the docker container via the remote driver. The Selenium commands will be added to the environment as a mounted volume in the home directory.

volumes:
  # Selenium scripts
  - ./selenium_scripts:/usr/local/airflow/selenium_scripts

The commands will come from a custom Python module (selenium_scripts) which needs to be in the Python Path. This can be done in the Airflow Dockerfile.

ENV PYTHONPATH=$PYTHONPATH:${AIRFLOW_USER_HOME}
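For reference, a script inside selenium_scripts might look roughly like this; the module name, function name, and scraping steps are illustrative assumptions rather than the project’s actual code. The plugin hands in the remote WebDriver and the script does the site-specific work:

# selenium_scripts/rbi_fetch.py -- an illustrative sketch only.
# Uses bs4 and lxml (installed above) to parse the page the remote
# WebDriver has loaded.
from bs4 import BeautifulSoup

def fetch_rbi_links(driver):
    # Open the RBI home page and collect the links on it; the real
    # script would click through to the monthly release and download
    # the file into the shared 'downloads' volume.
    driver.get("https://www.rbi.org.in")
    soup = BeautifulSoup(driver.page_source, "lxml")
    return [a.get("href") for a in soup.find_all("a", href=True)]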

The last change to the Airflow environment is to enable the Selenium container and Airflow workers to share files and content. This is required when downloading content from the internet with Selenium and can be achieved with an external named volume.

docker volume create downloads

The environment is now complete.

Dockerfile for Airflow

Dockerfile for Selenium

Updated Docker-compose File:

Let’s look at the changes we made to the base Puckel file:

  1. We bumped some of the Docker images to their latest versions.
  2. Gave the webserver a hostname of airflow.aidatabases.in.
  3. In the volumes section, mapped the absolute paths of my working directory.

Step-by-step set-up:

  • Spawn an Amazon EC2 instance with a minimum of 8 GiB of memory and 2 vCPUs.
  • SSH into the EC2 instance and install Docker and Docker Compose.
  • Create Dockerfile-airflow and build it as described in the earlier section of the post.
docker build -t docker_airflow:modified -f Dockerfile-airflow .

# NOTE :

If you encounter a permissions error during the Docker build process:

Permissions Error

then add root privileges in the Dockerfile for Airflow.

After the build executes successfully,

  • Run
docker images
  • You will see the docker_airflow:modified image.
  • Clone your GitHub Airflow repo. You can fork mine and clone it to start with.
  • In order to clone, you will need to generate an SSH key on your VM and add the generated .pub file in the Deploy keys section of your GitHub repository.
  • Once done, you can clone the repo using the git clone command.
  • Create a docker-compose.yml file and copy in the contents of the updated docker-compose file.
  • Replace the absolute paths with your present working directory path.
  • Run
docker-compose up -d

You should see an output like this after a successful run.

successful Run
  • Run
docker-compose ps
  • If you get an output like the one below, your services are healthy, active, and running. If not, check the service logs.

Now, I have mapped my subdomain to the EC2 instance, so I can access my Airflow webserver at airflow.aidatabases.in.

Setting up WebHook

Webhook:

GitHub lets you configure webhooks for your repositories: when specified events happen, GitHub sends an HTTP request to a URL you choose. For example, you can use a webhook to be notified when someone creates a pull request or pushes new code.

Setting up WebHook

Sign in to your GitHub account and navigate to the repository you wish to monitor. Click on the Settings tab in the top menu bar on your repository’s page, then click Webhooks in the left navigation menu. Click Add Webhook in the right corner and enter your account password if prompted. You’ll see a page that looks like this:

  • In the Payload URL field, enter http://your_server_ip:8080. This is the address and port of the Node.js server we’ll write soon.
  • Change the Content type to application/json. The script we will write will expect JSON data and won’t be able to understand other data types.
  • For Secret, enter a secret password for this webhook. You’ll use this secret in your Node.js server to validate requests and make sure they came from GitHub.
  • For Which events would you like to trigger this webhook, select just the push event. We only need the push event since that is when code is updated and needs to be synced to our server.
  • Select the Active checkbox.
  • Review the fields and click Add webhook to create it.

Create SSH Key and Clone Github Repo into your Server

Let’s first generate an SSH key using ssh-keygen and add the generated public key to the Deploy keys section of our GitHub repo.

Refer to this.

You’ll see a page that looks like this:

My Deploy Key

Now clone your repo on the server.
Ensure that you are the ROOT user while cloning.

Creating the Webhook Script

Note: You must have the latest version of Node.js installed.

We’ll write a Node.js script that launches a web server on port 8080. The server will listen for requests from the webhook, verify the secret we specified, and pull the latest version of the code from GitHub.

Navigate to your home directory:

cd ~

Create a new directory for your webhook script called Webhook:

mkdir ~/Webhook

Then navigate to the new directory:

cd ~/Webhook

Create a new file called webhook.js inside of the Webhook directory.

sudo vi webhook.js

Add these two lines to the script webhook.js:

var secret = "your_secret_here";
var repo = "~/your_repo_path_here/";

The first line defines a variable to hold the secret you created which verifies that requests come from GitHub. The second line defines a variable that holds the full path to the repository you want to update on your local disk.

Next, add these lines which import the http and crypto libraries into the script. We’ll use these to create our web server and hash the secret so we can compare it with what we receive from GitHub:

let http = require('http');
let crypto = require('crypto');

Next, include the child_process library so you can execute shell commands from your script:

const exec = require('child_process').exec;

Next, add this code to define a new web server that handles GitHub webhook requests and pulls down the new version of the code if it’s an authentic request:

http.createServer(function (req, res) {
    req.on('data', function(chunk) {
        let sig = "sha1=" + crypto.createHmac('sha1', secret).update(chunk.toString()).digest('hex');
        if (req.headers['x-hub-signature'] == sig) {
            exec('cd ' + repo + ' && git pull');
        }
    });
    res.end();
}).listen(8080);

The http.createServer() function starts a web server on port 8080 that listens for incoming requests from GitHub. For security, we validate that the secret included in the request matches the one we specified when creating the webhook. GitHub passes an HMAC-SHA1 signature of the payload (keyed with the secret) in the x-hub-signature header, so we compute the same HMAC over the request body and compare it with what GitHub sends us.

If the request is authentic, we execute a shell command to update our local repository using git pull.

The completed script looks like this:

const secret = "your_secret_here";
const repo = "~/your_repo_path_here/";

const http = require('http');
const crypto = require('crypto');
const exec = require('child_process').exec;

http.createServer(function (req, res) {
    req.on('data', function(chunk) {
        let sig = "sha1=" + crypto.createHmac('sha1', secret).update(chunk.toString()).digest('hex');
        if (req.headers['x-hub-signature'] == sig) {
            exec('cd ' + repo + ' && git pull');
        }
    });
    res.end();
}).listen(8080);

Installing the Webhook as a Systemd Service

systemd is the service manager Ubuntu uses to control services. We will set up a service that starts our webhook script at boot, so we can manage it with systemd commands like any other service.

Start by creating a new service file:

sudo nano /etc/systemd/system/webhook.service

Add the following configuration to the service file.

[Unit]
Description=Webhook
After=network.target

[Service]
Environment=NODE_PORT=8080
Type=simple
User=root
ExecStart=/usr/bin/nodejs /home/ubuntu/Webhook/webhook.js
Restart=on-failure

[Install]
WantedBy=multi-user.target

Enable the new service so it starts when the system boots:

sudo systemctl enable webhook.service

Now start the service:

sudo systemctl start webhook

Ensure the service is started:

sudo systemctl status webhook

You’ll see the following output indicating that the service is active:

Output: webhook.service - Webhook
Loaded: loaded (/etc/systemd/system/webhook.service; enabled; vendor preset: enabled)
Active: active (running) since Tue 2020-08-04 06:25:56 UTC; 6s ago
Main PID: 28686 (nodejs)
Tasks: 6 (limit: 4915)
CGroup: /system.slice/webhook.service
└─28686 /usr/bin/nodejs /home/ubuntu/Webhook/webhook.js

You are now able to push new commits to your repository and see the changes on your server!

A bit on the DAG file :

Here, we use Airflow to automate our pipeline for data extraction, transformation, and finally storing the data. For our use case, we need to execute this pipeline on a periodic basis (monthly or yearly). The simplest way to achieve this would be to define a cron job and schedule it as per the requirement; Airflow, being a workflow management tool, provides a more elegant approach to manage and schedule our ETL pipeline.

For setting up a data pipeline using airflow, we need to be familiar with:

Operators define the individual tasks that need to be performed.

Tasks define a unit of work; each task is basically an implementation of an operator.

The Scheduler is responsible for triggering the DAG at the predefined time.

DAGs (Directed Acyclic Graphs) are used to define, order, and schedule the tasks. Tasks can be either dependent or independent, so the DAG lets the scheduler know which tasks can be executed in parallel and, for sequential tasks, their order of execution.

We will move forward with snippets to show how to define tasks and schedule them in DAG file.

  1. First, let's configure the default arguments that apply to our DAG. Here, ‘provide_context’ is set to True so that tasks can access data set by previous tasks.

The DAG object can then be instantiated with the start date and schedule interval, as below:
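A minimal sketch, assuming a monthly schedule; the DAG id, owner, and dates are placeholders:

# Default arguments and DAG instantiation (placeholder values).
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 8, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "provide_context": True,  # let tasks access data set by previous tasks
}

dag = DAG(
    dag_id="rbi_etl",
    default_args=default_args,
    schedule_interval="@monthly",  # run the pipeline once a month
    catchup=False,
)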

2. Next, we define the tasks for our workflow. For the data pipeline, we have identified three tasks: extract, transform, and load the data.

Data extraction using Selenium WebDriver has been discussed above. The results of the data extraction task are captured in a convenient format (we chose pickle for our use case, to preserve data types) and stored in a shared location.

In the next task, the data from the shared location is read and prepared for data analytics.

Finally, the processed data is stored in an Elasticsearch database.

Now, we have to create three tasks in our DAG. Task 1 is created using the Selenium operator, because we use Selenium WebDriver for data extraction. The PythonOperator is used to create Task 2 and Task 3, since they are defined as Python functions; the name of the function is passed as the ‘python_callable’ argument.

Dependencies between these three tasks are set using ‘>>’ and ‘<<’. According to our definition, Task 1 runs first, followed by Task 2 and then Task 3, as in the sketch below.
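Put together, the task definitions and their ordering might look like the following sketch. The SeleniumOperator import path and argument names depend on how the plugin registers itself, and fetch_rbi_data, transform_data and load_to_es are placeholders for our own script and callables:

# Sketch of the three tasks and their ordering (placeholder callables).
from airflow.operators.python_operator import PythonOperator
# The import path below assumes the plugin registers itself as
# 'selenium_plugin'; adjust it to match your plugin's name.
from airflow.operators.selenium_plugin import SeleniumOperator

extract_task = SeleniumOperator(
    task_id="extract_rbi_data",
    script=fetch_rbi_data,           # Selenium script run against the remote driver
    dag=dag,
)

transform_task = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,  # reads the pickled extract output
    dag=dag,
)

load_task = PythonOperator(
    task_id="load_to_elasticsearch",
    python_callable=load_to_es,      # writes the processed data to Elasticsearch
    dag=dag,
)

# Task 1 runs first, then Task 2, then Task 3.
extract_task >> transform_task >> load_task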

POSTSCRIPT :

I will keep refining the post as and when required.

You can expect a bit more on the Selenium plugin, which I believe has not yet been given its due.

Happy Learning!
