Stress-free DAGs: Our Experience of Using Metadata with Apache Airflow

Andrei Larionov
ADEO Tech Blog
Published in
8 min readOct 28, 2021

My name is Andrey. I’m a Data Engineer working at Leroy Merlin Russia. Our company is leveraging IT to build an innovative customer experience and it is impossible without data and everything related to it (ML, AI, DataMining, etc). To make decisions based on data we need tools for it — the Data Platform with the unified Data Warehouse. The main goal of my team is to provide those tools for product teams to integrate with our unified DWH. Such tasks require a lot of agility: we have more than 100 product teams that have different needs and use cases.

Here are just a few examples of how using the unified DWH helps the product team. We develop data-products, which increase revenue or savings and offer the best service to our customers. One particular example — Complementary products on the website. We can determine which products are complements to each other using the data from the DWH and ML algorithms. The products are then displayed side by side on the web page for the benefit of the customer, so next time you decide to purchase a new door, our algorithm will ensure you don’t forget to also buy the door handle. And this is just one example, so far, we have launched more than 15 data-products.

Another popular use case is the report and dashboard development. We regularly monitor our metrics and adjust our goals based on the data. All this requires a complex infrastructure, which you can find below. In this article I’m going to tell you about part of it: how we take advantage of Airflow to build and manage the Data Pipeline. And, maybe next time we’ll share with you how we built a whole data platform for more than 2,000 users.

This is how our Data architecture looks like

About Apache Airflow

Apache Airflow is a simple and convenient batch-oriented tool for developing, planning and monitoring data pipelines. Its key feature is that using Python code and built-in function blocks, you can connect many different technologies used in the modern world. The main working entity of Airflow — DAG — directed acyclic graph, where tasks are nodes, and dependencies between tasks are represented by directed edges.

Those who use Apache Airflow to orchestrate data loading tasks into DWH will probably appreciate the flexibility it provides for solving typical tasks. When the whole development process comes down to filling in a configuration file with a description of the DAG (directed acyclic graph) parameters and a list of tasks to be performed. At Leroy Merlin Russia, this approach is successfully used to create tasks for transferring data from the raw data layer to the ODS layer. Therefore, it was decided to extend it to the tasks of filling data marts.

Data Mart layer — is the most important layer of DWH for business users because it’s very convenient to get data for their regular reports or ad-hoc queries. But for data engineers, it’s the last stage of data pipeline development. As a result, business users sometimes have to wait too long. So we decided to help them and began to develop a new framework, which must reduce their waiting period.

The main difficulty was that we do not yet have a unified methodology for developing data marts and procedures for filling them out. And each developer solved the problem based on their personal preferences and experience. This fits into one of the main corporate IT principles — “You build it — you run it”, which means that the developer is responsible for his decision and supports it himself. This situation allows us to quickly release experiments in production. For maintenance purposes, we are now looking for a common practice.

Problems we faced

It is worth telling here how the development for loading data marts was carried out before. The engineer develops GreenPlum’s functions, develops DAGs (directed acyclic graph) for their launch, then creates a new repository on GitHub from the template, uploads the code of his DAGs (directed acyclic graph) and adds his repository to the main Airflow project as a submodule. With this approach, the face some difficulties:

  1. Need to dive into Python and Apache Airflow;
  2. At the start of development, the main project was released once a week, so to see your DAGs on Airflow production, you had to wait;
  3. The main project grew gradually and began to slow down during deployment;
  4. DAGs code scattered across different repositories performing similar tasks is difficult to manage;
  5. The lack of a unified approach also affected the quality of the SQL code of the procedures. It was often possible to come across complex logic for managing boot parameters, which could easily be “outweighed” on Airflow.

All of the above led us to the idea that it is time to take control of the situation and start developing a standard solution. We found out that most of the existing DAG’s are very simple, do not contain complex dependencies and consist mainly of DummyOperators and PostgresOperators. This served as the starting point for the development of a new tool, which, in turn, had to:

  1. To be able to create DAGs based on a configuration file in YAML format, which would indicate the main parameters, such as: start date, schedule, parameters for connecting to the database, names of procedures to run, their parameters, etc. YAML files should be stored inside the corporate service for managing metadata, you can get their content through the API;
  2. Be as simple as possible, have clear documentation so that the immersion does not take much time;
  3. At the same time, be as flexible as possible, be able to work with the maximum possible number of DAG settings in Airflow.

How we implement

The result is approximately the following template for the configuration file:

module_name: marts-loadpool: marts_poolqueue: marts-loadowner: Andy Larpostgres_conn_id: marts_db_roleemail:  - Andy.Lar@marts-load.rutags:  - marts-loadaccess_control:  - marts-load-roleschedule_interval: "@daily"start_date: "2021–01–01"catchup: trueschema_name: martstask_list:  - task_name: Starttask_type: dummy  - task_name: my_ordinary_mart_loadtask_schema_name: martstask_type: ordinary_loadprocedure_name: load_my_ordinary_martparams:period_start: "{{ ds }}"period_end: "{{ next_ds }}"task_depends_on:  - Start  - task_name: my_multiple_schemas_loadtask_type: multiply_loadprocedure_name: load_my_multiple_schemastask_multiply: schematask_multiply_list:  - schema_1  - schema_2  - schema_3task_depends_on:  - my_ordinary_mart_load  - task_name: Intermediatetask_type: dummytask_depends_on:  - my_multiple_schemas_load  - task_name: my_multiple_params_loadtask_type: multiply_loadprocedure_name: load_my_multiple_paramstask_multiply: paramsparams:period: task_multiply_listtask_multiply_list:  - "day"  - "month"  - "year"task_depends_on:  - Intermediate  - task_name: Finishtask_type: dummytask_depends_on:  - my_multiple_params_load

From which this DAG is created:

Description of parameters

Common parameters:

  • module_name — needed to form DAG_ID;
  • pool — the pool in which the tasks will be launched;
  • queue — a queue for tasks;
  • owner — the owner of the DAG;
  • postgres_conn_id — DB connection string;
  • email — list of emails for sending alerts;
  • tags — a list of tags for finding a DAG in the UI;
  • access_control: role to control the DAG;
  • schedule_interval — schedule for launching the DAG;
  • start_date and catchup are parameters that control the depth of the download history. Airflow uses an interval approach. This means that the time period from start_date to the optional end_date (we are not using it) is split into the intervals specified in schedule_interval. If catchup is True, then DAGA will start from start_date, if False, then from the current interval;
  • schema_name — database schema where the storefront is located;
  • task_list — list of tasks in the DAG.

The main parameters of the tasks:

  • task_name — matches the task_id of Airflow
  • task_type — task type
  • task_schema_name — the database schema in which the storefront is located, if the schema differs from the general one
  • task_conn_id — connection string, if different from the general one
  • procedure_name — showcase loading procedure
  • params — list of procedure parameters and their values
  • task_depends_on — the list of tasks on which the launch of this task depends
  • priority_weight — the priority of this task in relation to other tasks
  • task_concurrency — the number of simultaneously running task instances in all running DAG instances

There are currently three types of tasks (task_type):

1) Dummy — corresponds to DummyOperator. A task that does nothing and usually serves as a start and end task, and for dividing tasks into blocks.

def dummy_task_processing(tasks_ids, **task_params):    dummy_task = DummyOperator(    task_id=task_params[“task_name”],    pool=task_params[“pool”],    dag=task_params[“dag”],    priority_weight=task_params[“priority_weight”],    task_concurrency=task_params[“task_concurrency”],)    return tasks_ids.update({task_params[“task_name”]:   [dummy_task.task_id]})

2) Ordinary loading — corresponds to PostgresOperator in Airflow

def ordinary_load_task_processing(tasks_ids, **task_params):    load_task = PostgresOperator(    task_id=task_params["task_name"],    sql=f"{task_params['search_path']}; SELECT  {task_params['schema_name']}.{task_params['procedure_name']}({task_params['params']});",    postgres_conn_id=task_params["conn_id"],    pool=task_params["pool"],    dag=task_params["dag"],    priority_weight=task_params["priority_weight"],    task_concurrency=task_params["task_concurrency"],    )    return tasks_ids.update({task_params["task_name"]:   [load_task.task_id]})

This is how the SQL code that this task generates looks like:

3) Multiple loading — many PostgresOperator (if you need to create a bunch of similar tasks that differ in one parameter)

def multiply_load_task_processing(tasks_ids, **task_params):    multiplied_tasks_ids = []    nonalphanum = r"\W "    for multiplier in task_params["task_multiply_list"]:        load_task = PostgresOperator(        task_id=f”{task_params['task_name']}_{re.sub(nonalphanum,  ‘’, multiplier)}”,        postgres_conn_id=task_params["conn_id"],        pool=task_params["pool"],        sql="",        dag=task_params["dag"],        priority_weight=task_params["priority_weight"],        task_concurrency=task_params["task_concurrency"],       )       if task_params["task_multiplier"] == "schema":       load_task.sql = f"{task_params['search_path']}, {multiplier};   SELECT {task_params['schema_name']}.{task_params['procedure_name']}({task_params['params']});"       elif task_params["task_multiplier"] == "params":       load_task.sql = f"{task_params['search_path']}; SELECT   {task_params['schema_name']}.{task_params['procedure_name']}  ({task_params['params'].replace('task_multiply_list',    multiplier)});"       multiplied_tasks_ids.append(load_task.task_id)       return tasks_ids.update({task_params["task_name"]:  multiplied_tasks_ids})

This type has its own specific parameters:

  • task_multiply — can have 2 values: “schema” or “params”. If schema is specified, the values ​​from task_multiply_list are added to the SEARCH_PATH expression. If “params”, then the values ​​from task_multiply_list are added to the list of procedure parameters for the parameter from the params list, whose value is specified as ‘task_multiply_list’
  • task_multiply_list — a list of values ​​for a parameter by which tasks of the same type will be created

The result is this SQL code.

For “schema”:

For “params”:

And this is how the dependencies between the tasks are established:

def set_dependencies(task_id, dependencies, tasks_ids, dag):    downstream_task_id = task_id    for upstream_task_name in dependencies:        if tasks_ids[upstream_task_name]:            for upstream_task_id in tasks_ids[upstream_task_name]:                dag.set_dependency(upstream_task_id, downstream_task_id)else:    raise AirflowException(        f"Incorrect or undefined dependency {upstream_task_name} for       task {task_id}")

Next Steps

The implementation of the tool has significantly reduced the time for the development of DAGs. You no longer need to dive deeply into Apache Airflow, although you still have to read about macros and scheduling.

The configuration file template is completed in 10–15 minutes. The time spent on reviewing and deploying for production has also been greatly reduced. However, this is where the main area for development lies: now the review and deployment are taking place in manual mode. I would like to impose all this with tests and give the developer the opportunity to send their own DAGs to production.

Editor’s Note : Thanks for reading!

Glad to have LMRU’s Data team publishing here.

Make sure to subscribe to get notified when the next articles are released!

--

--