ETL-пайплайны на Airflow: как наш опыт внедрения и использования помог бизнесу
Я Айдын Исатаев — data-инженер в отделе аналитики Kolesa Group. Отвечаю за получение и хранение данных по всем продуктам компании: Kolesa.kz, Krisha.kz и Avtoelon.uz. Разрабатываю ETL-процессы, развиваю хранилище данных и занимаюсь интеграцией информации с другими командами. Моя основная задача — предоставлять качественные и готовые данные для бизнеса.
В этой статье вы узнаете:
- Что было на старте
- Какие были проблемы
- Что такое Apache Airflow
- Внедрение Airflow в наш контур DWH
- Мониторинг
- Что это дало бизнесу
Мы в Kolesa Group всегда работали, ориентируясь на данные. И чтобы управлять ими было проще и эффективнее, перешли на Apache Airflow в качестве ETL-инструмента. Приведу наш опыт внедрения и использования Airflow, позволяющий проектировать и легко масштабировать Data Pipelines — конвейеры данных.
Что было на старте
В 2017 году, самом начале пути становления отдела аналитики в Kolesa Group, в качестве облачного хранилища данных DWH/Data Lake использовали Google BigQuery. Аналитики сами загружали данные в BigQuery, в основном это были разовые выгрузки. Если возникала необходимость постоянного обновления данных, то каждый аналитик сам занимался этим и сам выбирал способ реализации. Подключались Python-разработчики, инженеры, которые внедрили оркестратор luigi, cronjob и scheduled queries — запланированные запросы в BigQuery (ETL).
По мере роста команды и данных появилась потребность в команде Data-инженеров, чтобы:
1. Построить необходимую архитектуру DWH-аналитики.
2. Выстроить Data Pipelines.
3. Настроить мониторинг.
На момент запуска проекта по внедрению Apache Airflow, у нас было:
• 4 отдельных репозитория проектов, в каждом из которых работали свои ETL-процессы;
• в каждом проекте использовались разные подходы к построению pipelines (luigi, cronjobs);
• запуск обновления дашбордов и метрик вручную.
Какие были проблемы
1. Неоптимальное расходование ресурсов железа компании. К примеру, многие Data Pipelines дублировали запросы в источниках, был неоптимальный простой ресурсов.
2. Не было единой точки мониторинга. Не могли быстро реагировать на ошибки и проблемы в работе с загрузкой данных в BigQuery.
3. Увеличение количества извлекаемых данных в DWH и, как следствие, увеличение числа пайплайнов.
4. Управлять пайплайнами загрузки данных в DWH стало сложнее.
5. Ручных загрузок и работы с данными стало слишком много.
Решили перейти на современный и унифицированный подход для реализации Data Pipelines. Самым подходящим инструментом для этого виделся Apache Airflow. Причины:
• open source;
• имеет масштабируемый scheduler;
• большое и активное сообщество;
• UI-дашборд с подробными данными обо всех сущностях.
Что такое Apache Airflow
Airflow — это платформа для создания, мониторинга и оркестрации пайплайнов. Этот опенсорс-проект, написанный на Python. В мире обработки данных некоторые называют его ETL-инструментом, но в сущности это не совсем так. Airflow — это оркестратор, своего рода product-менеджер: он не сам выполняет тяжелую работу по перекладке и обработке данных, а говорит другим компонентам системы и фреймворкам, что надо делать. И следит за статусом выполнения тех или иных задач по работе с данными. Но ничего не мешает обрабатывать данные на самом Airflow, выкачивая их через worker, что в итоге мы и сделали.
Спектр задач, решаемых с помощью Airflow, не ограничивается запуском чего-то в каком-нибудь кластере. Он может запускать Python-код, выполнять Bash-команды, поднимать Docker-контейнеры и поды в Kubernetes, выполнять запросы в вашей любимой базе данных и многое другое.
Внедрение Airflow в наш контур DWH
Первым шагом была установка на сервере Apache Airflow версии 2.5.0. С развертыванием и конфигурированием нам помогли наши системные администраторы.
В результате получилась примерно следующая архитектура:
Описание компонентов:
• Metadata DB — Airflow использует базу данных SQL для хранения метаданных о запускаемых пайплайнах данных.
• Scheduler — та часть, которая управляет ресурсами в Kubernetes для создания, запуска или удаления подов с воркерами.
• Webserver — предоставляет web UI для мониторинга, аудита и ручной оркестрации пайплайнов / воркеров.
• Worker — в этом контексте временная сущность, которая непрерывно создаётся или удаляется scheduler-ом для исполнения пайплайнов обработки.
Далее расмотрим сущности Apache Airflow, такие как pipeline, или DAG. Самая важная сущность Airflow — это DAG, он же пайплайн, он же направленный ациклический граф. Пример:
Допустим, к нам пришел аналитик и попросил раз в день загружать данные в определенную таблицу. Он подготовил всю информацию: что откуда нужно брать, когда нужно запускаться, с каким SLA.
Так мы описывали наш пайплайн в Airflow:
Следующая сущность — PythonOperator. Он описывает, какие действия надо совершить в рамках ежедневной задачи, чтобы порадовать аналитика. PythonOperator создан для того, чтобы вызывать функцию, написанную в этом DAG. Для запуска оператора нужно указать название задачи, пайплайн, идентификатор функции и передать параметры:
Мониторинг
В telegram-чате реализовали Аlert-систему , куда приходят сообщения о поломке tasks:
Для этого использовали telegram-оператор из стандартной библиотеки Airflow в Python:
Итоги
Так выглядит web-интерфейс, показывающий, что сейчас происходит с pipeline. Эту страницу видит data-инженер:
Мы используем Apache Airflow так — собираем данные из различных источников (множество инстансов SQL Server и MongoDB, разные API с метриками приложений, даже 1С) в Data Lake (у нас это BigQuery).
Наши потребности покрывает один сервер, разделённый на 2 namespace (testing и productions) на 64 ядрах и 64 GB оперативки в каждом из этих namespace. В Airflow при этом работает:
- более 150 дагов (собственно workflows, в которые мы набили задачки);
- в каждом, в среднем, по 10 тасков;
- запускается это добро тоже, в среднем, раз в час.
Что это дало бизнесу
После внедрения Apache Airflow в наш аналитический контур DWH, мы смогли существенно снизить:
1. Ежедневные затраты на вычисления в облачном хранилище BigQuery примерно в два раза, с 50 Тб на 25 Тб.
2. Количество поломок пайплайнов загрузки данных с 1–2 раз ежедневно, на 1–2 в неделю.
3. Количество выделяемых ресурсов железа компании, с 3 отдельных сервера на 1.
4. Безболезненное увеличение количества пайплайнов до 170 на текущий момент и их сложность
С Airflow смогли реализовать полностью автоматизированный dataflow — теперь можем быстро обновлять дашборды на готовых агрегированных данных без участия аналитиков и data-инженеров.
Всё это дало нам и нашему бизнесу системный подход в задачах по обработке данных, их агрегацию и трансформацию. Высвободили время аналитиков от рутиной работы с данными, а также получили быстрое и качественное добавление дашбордов и метрик для нужд бизнеса.