Corriendo Apache Airflow en Lyft

Temo Ojeda
Lyft Engineering en Español
10 min readMay 27, 2020

Este artículo fue publicado originalmente el 20 de diciembre 2018 en eng.lyft.com.

Por Tao Feng, Andrew Stahlman y Junda Yang

ETL es un proceso para extraer datos de varios eventos no clasificados, transformarlos para análisis, y agregarlos a una base de datos para consulta. Los ingenieros y científicos de datos en Lyft se encargan de construir varios canales de ETL que corren en diferentes horarios para entender temas como el mercado actual de rideshare, la experiencia para conductores / pasajeros, etc. Es crucial tener un sistema de manejo de flujos de trabajo que sea fiable, eficiente y de confianza para asegurar que estos canales corran exitosamente y presentar los datos a tiempo.

Apache Airflow es un sistema de manejo de flujos de trabajo que permite a los usuarios crear, programar y monitorear los canales de datos. Lyft es la primera compañía en adoptar Airflow en producción desde que el proyecto fue liberado hace unos cinco años. Hoy, Airflow se ha convertido en una de las piezas de infraestructura más importantes en Lyft para varios casos: alimentar los tableros ejecutivos, agregar métricas, derivar generación de datos, procesamiento de funciones de machine learning, etc.

En este post vamos a compartir nuestras experiencias de cómo corremos Airflow en Lyft.

Detalles técnicos de Airflow

Para contextualizar los términos en este blog, estos son unos de los conceptos clave de Airflow:

  • DAG (grafo acíclico direccionado): un flujo de trabajo que junta todas las tareas con interdependencias.
  • Operador: el operador es una plantilla para un tipo específico de trabajo a ejecutar. Por ejemplo, el operador BashOperator representa cómo ejecutar un bash script, mientras que el operador PythonOperator representa cómo ejecutar una función de Python, entre otros.
  • Sensor: un operador especial que solo se ejecuta si una condición se cumple.
  • Tarea: una instancia parametrizada de un operador/sensor que representa una unidad de trabajo a ejecutar.
  • Plugin: una extensión que permite a los usuarios extender fácilmente Airflow con hooks, operadores, sensores, macros y vistas personalizadas.
  • Pools: configuración del límite de concurrencia para un grupo de tareas en Airflow.

Para mas términos de Airflow, puedes visitar la documentación de Airflow para más detalles.

Arquitectura de Airflow en Lyft

Esta gráfica demuestra la arquitectura de Airflow en Lyft:

Como demuestra la gráfica, hay cuatro componentes principales:

  • WebUI : el portal de usuarios para ver el status de las DAGs.
  • DB de metadatos: metastore de airflow para guardar metadatos como el status de los trabajos, las tareas, etc.
  • Planificador (Scheduler): un multi-proceso que analiza el conjunto de DAGs, crea un objeto de la DAG, y hace que el ejecutador realice las tareas.
  • Ejecutor: Un proceso de encolamiento de mensajes que organiza a los procesos para ejecutar las tareas. Hay varios ejecutores soportados por Airflow. Por ejemplo, el operador y ejecutor de Kubernetes (K8s) están desde Airflow 1.10 y nos dan soporte de ejecución nativa de Kubernetes. En Lyft, usamos el CeleryExecutor para escalar la ejecución de tareas con diferentes trabajadores (celery workers) en producción.

Aquí demostramos cómo desplegamos Airflow en producción en Lyft:

Configuración: Apache Airflow 1.8.2 con ciertos cambios (cherry-picks) y varios parches personalizados de Lyft.

Escala: Tres conjuntos de Amazon ASG (auto scaling group) para los trabajadores de Celery, cada uno asociado con una fila de Celery:

  • ASG #1: 15 nodos de trabajo, cada uno con del tipo r5.4xlarge. Esta flota de trabajadores es para procesar tareas de baja prioridad, e intensas con memoria.
  • ASG #2: 3 nodos trabajadores cada uno del tipo m4.4xlarge. Esta flota de trabajadores está dedicada a las DAGs con un SLA estricto.
  • ASG #3: 1 nodo trabajador del tipo m4.10xlarge. Este nodo es usado para procesar los trabajos de cómputo intensivo para las DAGs de un equipo crítico.

Número de DAGs / Tareas: 500+ DAGs 800+ DagRuns , 25000+ instancias de tareas corriendo en Airflow en Lyft a diario.

Monitoreo y Alertas para Airflow en Lyft

Hay más de quinientas DAGs corriendo a diario en Airflow. Es crucial mantener el SLA (service level agreement) y la disponibilidad de Airflow. En Lyft, usamos varias tecnologías como Datadog, Statsd, Grafana, y PagerDuty para monitorear Airflow.

Tablero de la salud del sistema para Airflow

Anteriormente, tuvimos un problema en producción que hizo que Airflow no programara ninguna tarea durante una hora en Lyft. No teníamos un buen sistema de monitoreo para entender si Airflow estaba programando o no tareas en ese lapso. Por ello, construimos un sistema de monitoreo de Airflow “canary” que trata a Airflow como una caja negra y verifica que programe y ejecute las tareas en un periodo de tiempo razonable. Si Airflow no programa las tareas por un tiempo (10 minutos), nuestro ingeniero en guardia inmediatamente recibirá una alerta por el problema.

Monitoreamos la salud de Airflow en tres aspectos:

Verificación de salud de la disponibilidad del agendador y de trabajadores de Airflow

Usamos una DAG de monitoreo “canary” en producción que hace:

  • Una prueba de conexión con una query de SQL sencilla (e.g. “SELECT 1” ) para todas las fuentes de datos críticas, incluyendo Redshift, Postgres, etc
  • Una revisión a nuestras filas, al programar tareas vacías a cada fila

Esta DAG “canary” ayuda al ingeniero en guardia responder las siguientes preguntas:

  • Cuánto tiempo le toma al agendador programar la tarea ( hora_de_programación — hora_actual).
  • Cuánto tiempo le toma a un trabajador de Celery empezar la tarea.
  • En cuánto tiempo corre la tarea.

Disponibilidad de UI de Airflow / servidor web

Monitoreamos el punto de llamada de salud del servidor web de Airflow y alertamos si el número de servidores sanos es menor a cierto límite.

Disponibilidad de Airflow en 7 días, 30 días, y 90 días

La disponibilidad se mide por 100% — % tiempo de inactividad. Airflow está inactivo cuando el agendador, los trabajadores o el servidor web no están disponibles.

Varias métricas para el sistema.

Otras métricas importantes a monitorear:

  • Retraso en la ejecución de tareas: esta métrica, mencionada anteriormente, nos da una mayor intuición no solo en la latencia del agendador de Airflow, sino en la disponibilidad general del sistema.
  • El número de tareas por fila que están esperando en la fila vs. corriendo.
  • El número de lugares ocupados por cada trabajador. Esta estadística nos muestra si el trabajador está a su máxima capacidad y necesita escalar.
  • El número de DAGs y el tiempo que le tarda a Airflow analizar cada DAG.

Personalización de Airflow en Lyft

Auditoría en el UI

El aislamiento de tenencia multiple del UI en Airflow 1.8.2 tiene limitaciones documentadas. Es difícil para nosotros contestar preguntas de usuarios como: “¿Quién pausó mi DAG?”, “¿Quién marcó esta tarea de mi DAG como éxito o falla?

Usamos el modelo actual de logs (registros) en Airflow y señales de Flask para implementar un log para auditar las acciones en el UI de Airflow. Este envía una señal de UI y hace que una función cree un registro con la información (quién lo hizo, qué acción) cuando algún punto de llamada del UI de Flask es usado.

Esta imagen muestra a alguien (que no se muestra) apagando una DAG llamada “fact_user_messages”. Esto nos ayuda a responder a estas preguntas fácilmente.

Links adicionales en el UI de cada tarea

Nos gustaría personalizar la interfaz de tareas con base en su operador de Airflow en Lyft. Por ejemplo, nos gustaría mostrar un link hacia Qubole (una plataforma de terceros para Hive) para el operador QuboleOperator, mostrar los logs de Kibana para todos los operadores, vincular hacia nuestro portal interno de datos para el operador HiveOperator, etc.

La imagen de arriba muestra cómo agregamos un link hacia Kibana en el panel de las tareas. Esta característica es genérica y permite a desarrolladores personalizar y asociar varios vínculos a los operadores. Hemos contribuido esta característica a upstream.

Grafo de dependencias de las DAGs

La pestaña de “Graph View” en el UI de Airflow es muy buena para visualizar dependencias dentro de una DAG. Sin embargo, algunas DAGs en Lyft tienen dependencias en otras DAGs, y el UI de Airflow no da una buena visualización de estas dependencias inter-DAG. Nuestro pasante, Tirath, creó una herramienta que visualiza el linaje/dependencia de una DAG y permite a un usuario filtrar la dependencia upstream/downstream para una DAG.

La herramienta busca las dependencias inter-DAG expresadas vía los ExternalTaskSensors para calcular el grafo de dependencias y aprovecha el plugin de webview de Airflow para visualización. En el futuro, mejoraremos la herramienta para enseñar el estado de la DAG en el grafo de dependencias en tiempo real, para ayudar a ingenieros de datos entender algún problema relacionado a su DAG.

Mejorando el rendimiento y fiabilidad de Airflow en Lyft

Reducir el tiempo de carga de la interfaz de usuario de Airflow

Un punto débil que tenemos con Airflow en Lyft es que toma mucho tiempo para cargar el UI para algunas DAGs. Tenemos cientos de DAGs corriendo en producción, algunas con cientos de tareas. El UI default carga la vista en árbol, tree view, de la DAG con las 25 DagRuns pasadas con toda la información de las tareas. Frecuentemente, esto supera el tiempo determinado del UI, y previene a los usuarios ver los detalles de la ejecución de su DAG.

Aunque podríamos reescribir el UI para tener mejor desempeño, descubrimos que simplemente reducir el número de DAG runs para visualizar de 25 a 5 ayuda a reducir significativamente el tiempo de carga de la página. Contribuimos este cambio a upstream que permite a los usuarios modificar default_dag_run_display_number del número de DagRuns para visualizar en el archivo de configuración de Airflow (Airflow.cfg).

  1. Paralelismo: Esta variable controla el número de instancias de tareas que el trabajador de Airflow puede correr simultáneamente. Los usuarios pueden incrementar la variable de paralelismo — parallelism — en Airflow.cfg . Normalmente sugerimos a usuarios incrementar este valor durante backfills.
  2. Concurrencia: El agendador de Airflow no correrá más tareas que la cantidad de tareas concurrentes asignadas a tu DAG en ningún momento. La concurrencia está definida en tu DAG de Airflow como un parámetro de entrada. Si no estableces la concurrencia de tu DAG, el agendador usará el valor predeterminado de dag_concurrency en tu Airflow.cfg.
  3. max_active_runs: Airflow no correrá mas DagRuns que la cantidad de de max_active_runs asignadas a tu DAG en cualquier momento. Si no defines el valor de max_active_runs_per_dag, Airflow usará el valor predeterminado de max_active_runs_per_dag de tu Airflow.cfg . Sugerimos a los usuarios no definir depends_on_past a true para incrementar esta configuración durante backfills.
  4. Pool: La Airflow pool es usada para limitar el paralelismo de ejecución. Los usuarios pueden incrementar el priority_weight de la tarea si es crítica.

Reducir la latencia de programación de Airflow

La latencia de programación de Airflow se puede medir por el retraso entre el tiempo en que las dependencias de una tarea se cumplen y cuando la tarea empieza. En Lyft, ajustamos las siguientes configuraciones para reducir la latencia:

  • max_threads: El agendador creará varios hilos en paralelo para programar las DAGs. Esto es controlado por max_threads con el valor predeterminado de 2. Los usuarios deben incrementar este valor a uno más grande (e.g. el número de CPUs donde corre el agendador — 1) en producción.
  • scheduler_heartbeat_sec: Los usuarios deben considerar incrementar la configuración de scheduler_heartbeat_sec a un valor mayor (e.g. 60 segundos) que controla qué tan frecuente el programador de Airflow obtiene el pulso y actualiza la entrada del trabajo en la metastore de Airflow.

Varios desarrolladores de la comunidad han contribuido algunos arreglos a Airflow upstream para reducir aún más la latencia.

Mejorar la fiabilidad de Airflow

Invertimos considerables esfuerzos para mejorar la fiabilidad de Airflow. Aquí hay algunas cosas que cabe la pena mencionar:

  • Source Control para Pools: Mantenemos la configuración de la Airflow Pool en source control y revisamos la solicitud de cada equipo con su estimado del máximo de espacios para tareas. La configuración actualizada se aplica para Airflow en runtime.
  • Pruebas de Integración para DAGs: Tenemos pruebas de integración en la fase de CI (Continuous Integration, Integración Continua), que hace verificaciones para asegurar las mejores prácticas para Airflow, incluyendo una prueba de validez en todas las definiciones de DAG; una validación del parámetro de start_date para garantizar que todas las DAGs tienen un start_date fijo, una prueba para pools para asegurar que no hay pool sin usar y una validación para asegurar que el pool de una DAG sí existe, etc.
  • Acceso al UI de manera segura: Deshabilitamos el acceso de escritura en algunos modelos de UI importantes (e.g, PoolModelView, VariableView, DagRunModelView) en Airflow. Esto es para evitar que los usuarios accidentalmente modifiquen las tablas de Pool, Variable, and DagRun en la metastore de Airflow desde el UI.

Conclusión

En este post compartimos cómo operamos Airflow en Lyft. Describimos:

  1. Cómo se ve la arquitectura general de Airflow.
  2. Cómo monitoreamos Airflow para mantener un SLA alto.
  3. Cómo personalizamos Airflow para los usos principales en Lyft.
  4. Cómo mejoramos el desempeño y fiabilidad de Airflow en producción.

En el futuro, nos enfocaremos en mejorar la seguridad de Airflow para el ambiente multi-tenencia. También planeamos tener un segundo post para compartir cómo construimos varias herramientas para Airflow y aumentar la productividad de desarrolladores de ETL en Lyft.

Reconocimientos

Muchas gracias a las y los miembros del equipo de Data Platform en Lyft, especialmente a Maxime Beauchemin, Tao Feng, Andrew Stahlman, Junda Yang, Max Payton, Chao-Han Tsai, Alagappan Sethuraman, y Jinhyuk Chang por sus contribuciones para mantener el SLA de Airflow en Lyft y a Shenghu Yang, Yuko Yamazaki, Prashant Kommireddi, y Guy Bayes por su orientación.

Gracias a Maxime Beauchemin y Mark Grover por la revisión.

--

--