ETL-пайплайны на Airflow: как наш опыт внедрения и использования помог бизнесу

Aidyn Issatayev
Kolesa Group
Published in
5 min readMay 11, 2023

Я Айдын Исатаев — data-инженер в отделе аналитики Kolesa Group. Отвечаю за получение и хранение данных по всем продуктам компании: Kolesa.kz, Krisha.kz и Avtoelon.uz. Разрабатываю ETL-процессы, развиваю хранилище данных и занимаюсь интеграцией информации с другими командами. Моя основная задача — предоставлять качественные и готовые данные для бизнеса.

В этой статье вы узнаете:

- Что было на старте
- Какие были проблемы
- Что такое Apache Airflow
- Внедрение Airflow в наш контур DWH
- Мониторинг
- Что это дало бизнесу

Мы в Kolesa Group всегда работали, ориентируясь на данные. И чтобы управлять ими было проще и эффективнее, перешли на Apache Airflow в качестве ETL-инструмента. Приведу наш опыт внедрения и использования Airflow, позволяющий проектировать и легко масштабировать Data Pipelines — конвейеры данных.

Айдын Исатаев

Что было на старте

В 2017 году, самом начале пути становления отдела аналитики в Kolesa Group, в качестве облачного хранилища данных DWH/Data Lake использовали Google BigQuery. Аналитики сами загружали данные в BigQuery, в основном это были разовые выгрузки. Если возникала необходимость постоянного обновления данных, то каждый аналитик сам занимался этим и сам выбирал способ реализации. Подключались Python-разработчики, инженеры, которые внедрили оркестратор luigi, cronjob и scheduled queries — запланированные запросы в BigQuery (ETL).

По мере роста команды и данных появилась потребность в команде Data-инженеров, чтобы:
1. Построить необходимую архитектуру DWH-аналитики.
2. Выстроить Data Pipelines.
3. Настроить мониторинг.

На момент запуска проекта по внедрению Apache Airflow, у нас было:

• 4 отдельных репозитория проектов, в каждом из которых работали свои ETL-процессы;
• в каждом проекте использовались разные подходы к построению pipelines (luigi, cronjobs);
• запуск обновления дашбордов и метрик вручную.

Какие были проблемы

1. Неоптимальное расходование ресурсов железа компании. К примеру, многие Data Pipelines дублировали запросы в источниках, был неоптимальный простой ресурсов.

2. Не было единой точки мониторинга. Не могли быстро реагировать на ошибки и проблемы в работе с загрузкой данных в BigQuery.

3. Увеличение количества извлекаемых данных в DWH и, как следствие, увеличение числа пайплайнов.

4. Управлять пайплайнами загрузки данных в DWH стало сложнее.

5. Ручных загрузок и работы с данными стало слишком много.

Решили перейти на современный и унифицированный подход для реализации Data Pipelines. Самым подходящим инструментом для этого виделся Apache Airflow. Причины:

• open source;
• имеет масштабируемый scheduler;
• большое и активное сообщество;
• UI-дашборд с подробными данными обо всех сущностях.

Что такое Apache Airflow

Airflow — это платформа для создания, мониторинга и оркестрации пайплайнов. Этот опенсорс-проект, написанный на Python. В мире обработки данных некоторые называют его ETL-инструментом, но в сущности это не совсем так. Airflow — это оркестратор, своего рода product-менеджер: он не сам выполняет тяжелую работу по перекладке и обработке данных, а говорит другим компонентам системы и фреймворкам, что надо делать. И следит за статусом выполнения тех или иных задач по работе с данными. Но ничего не мешает обрабатывать данные на самом Airflow, выкачивая их через worker, что в итоге мы и сделали.

Спектр задач, решаемых с помощью Airflow, не ограничивается запуском чего-то в каком-нибудь кластере. Он может запускать Python-код, выполнять Bash-команды, поднимать Docker-контейнеры и поды в Kubernetes, выполнять запросы в вашей любимой базе данных и многое другое.

Внедрение Airflow в наш контур DWH

Первым шагом была установка на сервере Apache Airflow версии 2.5.0. С развертыванием и конфигурированием нам помогли наши системные администраторы.

В результате получилась примерно следующая архитектура:

Описание компонентов:

• Metadata DB — Airflow использует базу данных SQL для хранения метаданных о запускаемых пайплайнах данных.

• Scheduler — та часть, которая управляет ресурсами в Kubernetes для создания, запуска или удаления подов с воркерами.

• Webserver — предоставляет web UI для мониторинга, аудита и ручной оркестрации пайплайнов / воркеров.

• Worker — в этом контексте временная сущность, которая непрерывно создаётся или удаляется scheduler-ом для исполнения пайплайнов обработки.

Далее расмотрим сущности Apache Airflow, такие как pipeline, или DAG. Самая важная сущность Airflow — это DAG, он же пайплайн, он же направленный ациклический граф. Пример:

Допустим, к нам пришел аналитик и попросил раз в день загружать данные в определенную таблицу. Он подготовил всю информацию: что откуда нужно брать, когда нужно запускаться, с каким SLA.

Так мы описывали наш пайплайн в Airflow:

Следующая сущность — PythonOperator. Он описывает, какие действия надо совершить в рамках ежедневной задачи, чтобы порадовать аналитика. PythonOperator создан для того, чтобы вызывать функцию, написанную в этом DAG. Для запуска оператора нужно указать название задачи, пайплайн, идентификатор функции и передать параметры:

Мониторинг

В telegram-чате реализовали Аlert-систему , куда приходят сообщения о поломке tasks:

Для этого использовали telegram-оператор из стандартной библиотеки Airflow в Python:

Итоги

Так выглядит web-интерфейс, показывающий, что сейчас происходит с pipeline. Эту страницу видит data-инженер:

Мы используем Apache Airflow так — собираем данные из различных источников (множество инстансов SQL Server и MongoDB, разные API с метриками приложений, даже 1С) в Data Lake (у нас это BigQuery).

Наши потребности покрывает один сервер, разделённый на 2 namespace (testing и productions) на 64 ядрах и 64 GB оперативки в каждом из этих namespace. В Airflow при этом работает:
- более 150 дагов (собственно workflows, в которые мы набили задачки);
- в каждом, в среднем, по 10 тасков;
- запускается это добро тоже, в среднем, раз в час.

Что это дало бизнесу

После внедрения Apache Airflow в наш аналитический контур DWH, мы смогли существенно снизить:

1. Ежедневные затраты на вычисления в облачном хранилище BigQuery примерно в два раза, с 50 Тб на 25 Тб.

2. Количество поломок пайплайнов загрузки данных с 1–2 раз ежедневно, на 1–2 в неделю.

3. Количество выделяемых ресурсов железа компании, с 3 отдельных сервера на 1.

4. Безболезненное увеличение количества пайплайнов до 170 на текущий момент и их сложность

С Airflow смогли реализовать полностью автоматизированный dataflow — теперь можем быстро обновлять дашборды на готовых агрегированных данных без участия аналитиков и data-инженеров.

Всё это дало нам и нашему бизнесу системный подход в задачах по обработке данных, их агрегацию и трансформацию. Высвободили время аналитиков от рутиной работы с данными, а также получили быстрое и качественное добавление дашбордов и метрик для нужд бизнеса.

--

--