Pipeline symphony: You can use graphs to orchestrate ETL

How to orchestrate Azkaban pipelines using the power of graphs

Amir Zareian
The Telegraph Engineering

In the Telegraph Data Team we build ETL (Extract, Transform, Load) pipelines. ETL is a process used with databases, particularly in data warehousing: extraction pulls data out of the data sources; transformation converts the data into the format or structure needed for querying and analysis; and loading writes the data into the final target database.

In some cases, many of these pipelines need to work together. To make sure they do, you need a conductor that acts as the central brain and holds all of the rules: it checks the status of the flow of actions and, based on the rules, decides which action to take next. We call this brain the “orchestrator”.
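A minimal sketch of that decision step, assuming the rules are a plain dictionary mapping each pipeline to the pipelines that must succeed before it, and that statuses are tracked as strings. `RULES`, `next_pipelines` and the status values are hypothetical illustrations, not part of any library:

```python
from typing import Dict, List, Set

# Hypothetical rules: each pipeline maps to the pipelines that must succeed first.
RULES: Dict[str, Set[str]] = {
    "A": set(),
    "B": set(),
    "C": {"A", "B"},
}


def next_pipelines(rules: Dict[str, Set[str]], status: Dict[str, str]) -> List[str]:
    """Return pipelines that have not started yet and whose dependencies all succeeded."""
    ready = []
    for pipeline, deps in rules.items():
        if pipeline in status:
            continue  # already running, succeeded or failed
        if all(status.get(dep) == "succeeded" for dep in deps):
            ready.append(pipeline)
    return sorted(ready)


print(next_pipelines(RULES, {}))                                    # ['A', 'B']
print(next_pipelines(RULES, {"A": "succeeded", "B": "succeeded"}))  # ['C']
print(next_pipelines(RULES, {"A": "succeeded", "B": "failed"}))     # []
```

Calling this after every status change is enough to drive a simple orchestration loop: whatever it returns is what runs next.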

In our Data Team, we use Azkaban as a scheduler to run our pipelines. To turn it into a fully fledged orchestrator, we would need a Python script that runs the individual pipelines and hard-codes the logic and all the rules. Even simple logic takes many lines of code, which makes the process time-consuming and inefficient.

To address this problem, we define the rules with a graph instead of code. In code, running pipeline c only after both a and b have succeeded looks like this:

# run() triggers a pipeline and returns True if it succeeded
result_a = run(a)
result_b = run(b)
if result_a and result_b:
    run(c)

The idea is to build a library that takes the graph as the orchestration rules and takes care of running the pipelines.

We use dot notation to define the graph. The same rule, where C runs only after both A and B have succeeded, becomes:

A -> C
B -> C
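Behind the scenes, a library has to turn these edges into rules. The sketch below is a hypothetical parser, not the azkaban-orchestrator implementation: it assumes one `X -> Y` edge per line, builds a map from each pipeline to the pipelines it depends on, and uses Python's standard `graphlib` module (Python 3.9+) to find an order in which the pipelines can run. `DIAGRAM` and `parse_edges` are illustrative names.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, Set

DIAGRAM = """
A -> C
B -> C
"""


def parse_edges(text: str) -> Dict[str, Set[str]]:
    """Turn 'X -> Y' lines into {pipeline: set of pipelines it depends on}."""
    deps: Dict[str, Set[str]] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        source, target = (part.strip() for part in line.split("->"))
        deps.setdefault(source, set())
        deps.setdefault(target, set()).add(source)
    return deps


graph = parse_edges(DIAGRAM)
order = list(TopologicalSorter(graph).static_order())
print(order)  # C is always last; A and B come first in some order
```

A and B have no dependencies, so a topological order may list them either way round, but C always comes after both.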

Sometimes we need to group pipelines and run all the pipelines within a group in parallel. Here A and B form Group_A, and C runs after the group:

Group_A: A, B
Group_A -> C
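One way to get that behaviour is sketched below, assuming a `Name: X, Y` line defines a group and that a group succeeds only when every member succeeds. `parse_group`, `run_group` and `fake_run` are hypothetical helpers, and `fake_run` merely stands in for triggering an Azkaban flow:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def parse_group(line: str) -> Dict[str, List[str]]:
    """Parse 'Group_A: A, B' into {'Group_A': ['A', 'B']}."""
    name, members = line.split(":", 1)
    return {name.strip(): [m.strip() for m in members.split(",") if m.strip()]}


def run_group(members: List[str], run: Callable[[str], bool]) -> bool:
    """Run all members in parallel; the group succeeds only if every member does."""
    with ThreadPoolExecutor(max_workers=max(1, len(members))) as pool:
        results = list(pool.map(run, members))
    return all(results)


def fake_run(pipeline: str) -> bool:
    # Stand-in for triggering an Azkaban flow and waiting for its result.
    print(f"running {pipeline}")
    return True


groups = parse_group("Group_A: A, B")
if run_group(groups["Group_A"], fake_run):  # Group_A -> C
    fake_run("C")
```

Threads are enough here because each member mostly waits on a remote scheduler rather than doing CPU work locally.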


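Pipelines can run with parameters, and they can have hard and soft dependencies. Below, A runs with the date parameter and is a hard dependency of C (->), while B is a soft dependency of C (.>):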

A(date) -> C
B .> C
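The sketch below assumes a common reading of the two arrow types: a hard dependency (`->`) must have succeeded before the downstream pipeline runs, while a soft dependency (`.>`) only has to have finished, whether it succeeded or failed. Parameters in parentheses, such as `date` in `A(date)`, are looked up in the run-time parameters. All names here (`Edge`, `parse_edge`, `can_run`, `pipeline_args`) are hypothetical and not the library's API.

```python
import re
from typing import Dict, List, NamedTuple

# Hypothetical edge syntax: "X(p1, p2) -> Y" (hard) or "X .> Y" (soft).
EDGE_PATTERN = re.compile(
    r"^\s*(\w+)(?:\((\w+(?:\s*,\s*\w+)*)\))?\s*(->|\.>)\s*(\w+)\s*$"
)


class Edge(NamedTuple):
    source: str
    params: List[str]  # parameter names the source pipeline needs
    hard: bool         # True for "->", False for ".>"
    target: str


def parse_edge(line: str) -> Edge:
    match = EDGE_PATTERN.match(line)
    if match is None:
        raise ValueError(f"cannot parse edge: {line!r}")
    source, params, arrow, target = match.groups()
    names = [p.strip() for p in params.split(",")] if params else []
    return Edge(source, names, arrow == "->", target)


def can_run(target: str, edges: List[Edge], status: Dict[str, str]) -> bool:
    """Assumed semantics: hard deps must have succeeded, soft deps only finished."""
    for edge in edges:
        if edge.target != target:
            continue
        state = status.get(edge.source)
        if edge.hard and state != "succeeded":
            return False
        if not edge.hard and state not in ("succeeded", "failed"):
            return False
    return True


def pipeline_args(edge: Edge, params: Dict[str, str]) -> Dict[str, str]:
    """Pick the run-time parameters a pipeline asked for, e.g. 'date' for A(date)."""
    return {name: params[name] for name in edge.params}


edges = [parse_edge("A(date) -> C"), parse_edge("B .> C")]
print(pipeline_args(edges[0], {"date": "20171202"}))  # {'date': '20171202'}
print(can_run("C", edges, {"A": "succeeded", "B": "failed"}))  # True
print(can_run("C", edges, {"A": "failed", "B": "succeeded"}))  # False
```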

To use the orchestrator, first install the Python library using pip:

pip install azkaban-orchestrator

Next, create a diagram file and define the dependencies between pipelines using the notation described above. Then configure the Azkaban client and run it:

import logging
from azkaban_orchestrator import orchestrator

client = orchestrator.Client(
    diagram_file_name='/path/to/diagram_file',
    host='azkaban_host',
    username='azkaban_username',
    password='azkaban_password',
    logger=logging.getLogger(__name__)
)

# parameters to pass to the pipelines through the orchestrator
params = {'date': '20171202'}

# initial pipeline: set it if you need to start orchestration
# from a specific pipeline, otherwise leave it as None
initial = None

client.run(initial, params)

The library is available at https://github.com/telegraph/azkaban-orchestrator

Happy Orchestration!
