¿Dónde están mis datos? — un singular encuentro con el conector Kinesis de Flink streaming

Luis Felipe Alvarez Sanchez
Lyft Engineering en Español
14 min readApr 29, 2024

Este artículo fue publicado originalmente el 14 de agosto de 2023 en eng.lyft.com y fue traducido por Luis Felipe Álvarez Sánchez

Por años, Lyft no ha sido solamente un proponente, sino también contribuyente a Apache Flink. Las tuberías de datos de Lyft han evolucionado drásticamente durante los años; sin embargo, una y otra vez, encontramos casos únicos que estiran Flink a sus puntos de ruptura — este es uno de esos ejemplos.

Contexto

Mientras que Lyft ejecuta múltiples aplicaciones de streaming, la que específicamente está en cuestión es la del trabajo de persistencia. Lo que éste hace es transmitir datos de Kinesis, ejecutar un cierto nivel de transformaciones y serializaciones, y escribir en S3 cada ciertos minutos.

La tubería de datos de Flink para persistir los datos de Kinesis a S3.

En este caso, persiste una gran mayoría de eventos generados en Lyft, los cuales ocurren a un ritmo de 80 gigabytes por minuto en promedio y se ejecutan con un paralelismo de 1800, lo que resulta ser uno de los trabajos de streaming más largos de Lyft.

Capítulo 1: El apagón

Empecemos por el final, ¿sale?

Ingenierx de datos: “¡Alerta! ¡Mis reportes no se están generando! ¡Los datos del upstream no están disponibles para ser generados! “

Ingenierx de plataforma: “¡Estoy en eso! ¡Parece ser que nuestra aplicación de streaming para persistir los datos está disponible y corriendo, pero tampoco veo que se están escribiendo datos!”

Como cualquier buen ingeniero haría, sacamos los libros de ejecución y con mucho cuidado ejecutamos los siguientes pasos detalladamente:

Ingenierx de plataforma: “Permítanme revertir el cambio aparentemente inofensivo que acabamos de subir en producción.”

Ingenierx de plataforma: “No hubo suerte.”

Ingenierx de plataforma: “Bueno, permítanme tratar de apagarlo y prenderlo otra vez.”

Ingenierx de plataforma: “No hubo suerte.”

Ingenierx de plataforma: “Bueno, permítanme tratar de realizar un reinicio completo, y luego después llenamos los datos faltantes.”

Ingenierx de plataforma: “¡Funcionó!”

Este patrón desafortunadamente se continuó repitiendo en una cadencia quincenal durante un mes. Claramente, se tenía que realizar una investigación más a fondo.

Capítulo 2: La investigación

En el resto del artículo, vamos a analizar la causa raíz del comportamiento mencionado anteriormente, y su relación con el funcionamiento interno del consumidor de Flink Kinesis.

Para comenzar, nos hicimos la siguientes preguntas:

  • ¿Por qué el trabajo tuvo un bajo rendimiento de la nada?
  • ¿Por qué el problema continuó después de haber intentado deshacer los cambios y de comenzar desde un punto de control anterior?

Nuestra primeras investigaciones arrojaron lo siguiente:

Edad del iterador por fragmento de Kinesis

La edad del iterador, o el tiempo entre el tiempo actual de procesamiento y el tiempo en el que el evento llegó al fragmento de Kinesis durante el apagón, se muestra a continuación.

La gráfica de antigüedad del iterador por fragmento indica un incremento de ~3 minutos para los fragmentos 1 a 9, seguidos por un espacio y una antigüedad del iterador de fragmentos de ~5 días para el fragmento 10, pero ninguna edad del iterador de fragmentos para los otros fragmentos. Las métricas anteriores fueron regeneradas con muchos menos fragmentos, ya que las métricas originales dejaron de mantenerse.

Aunque es difícil de ver, la gráfica anterior indica que para todos los fragmentos, excepto uno, la edad del iterador es regular, de ~3 minutos. Luego hay un espacio en las métricas seguida por la edad del iterador de un único fragmento indicando un retraso de ~5 días de edad de iterador.

Profundizando más, podemos ver que el único fragmento en cuestión había experimentado el siguiente comportamiento días antes:

La edad del iterador del fragmento 10 es estable a los ~3 minutos hasta las ~12:46 pm el 7/26 seguido de una brecha en las métricas durante varios días, seguido de un incremento en la métrica a un retraso de edad de iterador de ~5 días.

He aquí que aparentemente nuestro problema no empezó en el tiempo de detección cuando a los usuarios intermedios parecía que les faltaban datos, sino desde días antes.

Capítulo 3: Cinco días antes

En este punto, hemos cambiado nuestra investigación a cinco días antes, correspondiendo al tiempo en el que el retraso de la edad del iterador para el fragmento anterior estuvo ausente. Pareciera haber un puñado de métricas irregulares al mismo tiempo, incluyendo las siguientes:

Gráfico de CPU del administrador de tareas correspondiente a una serie de administradores de tareas que alcanzan un pico cercano o máximo del 100 % de uso de CPU durante ~10 minutos antes de volver a niveles normales de ~5 %. Las métricas anteriores fueron regeneradas con menos administradores de tareas, ya que las métricas originales se han dejado de retener.

Como se muestra en la gráfica anterior, durante este periodo de tiempo muchos de los 1800 administradores de tareas llegaron a un uso de CPU del 100% con una limitación significativa de CPU. Además, para los administradores de tareas que experimentaron prolongados periodos de limitación de CPU, visualizamos brechas en los registros de más de 10 minutos por consumidor de Kinesis:

Una subtarea que muestra una brecha significativa en el registro de un subproceso basado en temporizador.

Antes de comenzar con la siguiente parte de la historia, necesitaremos desarrollar un entendimiento profundo del consumidor Kinesis Flink.

Capítulo 4: El consumidor de Kinesis Flink

Para simplificar, vamos a asumir que existe un fragmento de Kinesis por cada sub-tarea de Flink. Una sub-tarea en este contexto se define como una porción paralela de una tarea o una unidad de ejecución que se puede programar. Ahora bien, desde un nivel alto, tenemos que asumir que el consumo de un singular fragmento ocurre de esta manera:

Consumidor de Flink Kinesis procesando un fragmento de kinesis.

De hecho, este puede haber sido el caso al principio, pero se podría y se deberían de realizar más optimizaciones al respecto.

¿Qué pasa si queremos leer o escribir de manera asíncrona?

El consumidor Flink Kinesis procesando un singular fragmento de Kinesis de manera asíncrona.

El enfoque multiproceso anterior soluciona de manera elegante este problema, sin embargo, surgen más problemas.

¿Qué pasa si queremos mantener la alineación del tiempo del evento en el origen?

Dos de los modelos anteriores son inherentemente defectuosos con respecto a un concepto conocido como alienación del tiempo de eventos, previniendo que cualquier sub-tarea procese datos con un tiempo de evento muy por delante de los tiempos de eventos de registro correspondientes de otras subtareas.

Como se discutió en esta propuesta de mejora en Flink, los eventos de tiempo no alineados del operador de origen puede conducir a un crecimiento incontrolable del estado.

Hay que definir una terminología que ayudará con los ejemplos a continuación:

  • Marca de agua local o local watermark: Corresponde a una subtarea que se alinea estrechamente con el tiempo del evento, indicando a los operadores intermedios que no deben llegar al operador elementos con una marca de tiempo anterior o igual a la marca de agua.
  • Marca de agua global o global watermark: Corresponde al mínimo de las marcas de agua locales de las subtareas.
  • “Anticipación” de marca de aguas o watermark “lookahead”: Corresponde a la cantidad de tiempo que una subtarea puede procesar los datos delante de la marca de agua global.

Ejemplo

A continuación vamos a ejemplificar la relación entre las diferentes marcas de agua relacionadas con las marcas de tiempo definidas anteriormente.

Al principio, el tiempo del evento de la subtarea 1 y, por lo tanto, la marca de agua, van por detrás la subtarea 2. La marca roja indica qué tan lejos puede estar la marca de agua de cualquier subtarea de la marca de agua global, o el mínimo de todas las marcas de agua locales.
Después de un poco de procesamiento, la marca de agua de la subtarea 2 se convierte en la nueva marca de agua global.
Por último, la subtarea 1 se adelanta lo suficiente a la marca de agua global como para impedirle procesar más datos para mantener las marcas de agua de las subtareas relativamente alineadas.

Ahora, vamos a ver cómo este concepto es implementado en el contexto de nuestro consumidor

El consumidor de Flink Kinesis cuenta, para cada alineación de tiempo de eventos.

RecordEmitter: La marca de agua de cada registro es comparada con la marca de agua global para decidir si emitir ya el registro descendiente o aún no.

PeriodicWatermarkEmitter: Este hilo periódicamente emite la marca de agua correspondiente al último registro emitido.

WatermarkSyncCallback: Este hilo periódicamente actualiza el administrador de trabajos con las subtareas de marca de agua locales y es regresado a la marca de agua global de todas las subtareas.

Aunque es elegante, este modelo aún tiene algunas fallas que necesitan ser resueltas.

¿Qué pasa si el fragmento no tiene más datos?

Bajo las condiciones actuales del modelo, si un fragmento se ha dejado de escribir por alguna razón, la subtareas locales de la marca de agua no van a progresar y por ende se bloquea la marca de agua global. Si esto ocurre, otras subtareas eventualmente no podrán procesar datos dada la alineación del tiempo del evento.

Para manejar el caso del fragmento inactivo, ¿qué es lo que hacemos? Esencialmente, ignoramos la sub-tarea.

Ejemplo

La marca de agua de la subtarea 1 empieza como la marca de agua global.
La subtarea 2 no tiene datos para consumir del fragmento y su marca de agua se convierte en la marca de agua global.
La subtarea 2 aún no ha consumido datos del fragmento por un periodo de tiempo, así que es ignorada.

Para manejar fragmentos inactivos podemos ejecutar estas tres acciones:

  1. Marcar la subtarea como inactiva para que los operadores posteriores ignoren sus marcas de agua, o la falta de ellas.
  2. Prevenir que la marca de agua local de las subtareas impacte la marca de agua global.
  3. Cuando la marca de agua global es calculada para otras subtareas, solamente considera las marcas de agua locales actualizadas en los últimos 60 segundos.
El consumidor Flink Kinesis en condiciones de fragmento inactivo.

A este punto, tenemos el conocimiento requerido para continuar con el siguiente capítulo. Vale la pena notar que mucha de la complejidad multi-hilo y de la alineación del tiempo del evento es la motivación detrás de refactorización del código fuente de Flink, que implementa estos conceptos de manera nativa en Flink.

Capítulo 5: Viaje en el tiempo

Existe una última pieza del rompecabezas a descubrir antes de explicar qué sucedió: ¡los registros! Los registros periódicamente muestran la marca de agua local, la marca de agua global y el estatus inactivo de cada sub tarea.

En nuestro caso, vemos los siguientes eventos para la subtarea y el fragmento en cuestión:

  1. La marca de agua local y global son emitidas normalmente.
  2. Durante la limitación de la CPU, no existen registros reportados por el consumidor por varios minutos.
  3. La marca de agua global se ha movido hacia atrás en el tiempo varios minutos.
  4. La subtarea es marcada como inactiva.
  5. La subtarea no emite ningún registro dado al mecanismo de alineación de tiempo del evento.
  6. La subtarea global no se actualiza ya que la fuente está inactiva.
  7. Los registros con la misma marca de agua local, marca de agua global, y el estatus inactivo son reportados indefinidamente.

A menudo se piensa que la marca de agua global solo puede avanzar en el tiempo, no retroceder, pero así sucedió. En el próximo capítulo juntaremos las piezas para explicar la serie de fenómenos extraños que hemos visto hasta ahora.

Capítulo 6: Interbloqueo o "the Deadlock"

A este punto poseemos una cantidad útil de observaciones curiosas.

Días previos al reinicio:

  • Durante un período de tiempo prolongado ocurre una limitación de CPU alta.
  • Después de una limitación de CPU alta, la marca de agua global retrocede en el tiempo.
  • Después de una limitación de CPU alta, una subtarea es marcada como inactiva, y no emite nuevos registros incluso cuando el fragmento contiene más datos.
  • La edad del iterador del fragmento no aparece en las métricas por ~5 días.

En el momento del reinicio, cinco días después:

  • Solo hay un fragmento cuya edad de iterador aparece en las métricas.
  • El rendimiento de la aplicación es de <1% del esperado, independientemente de si comienza con un punto de control o un punto de guardado anterior.

Esto nos lleva a responder la primera de las muchas preguntas que explican el comportamiento.

¿Por qué la subtarea no emite datos cuando el fragmento contiene datos?

La subtarea no emite datos ya que entra al estado de interbloqueo:

  1. La marca de agua local de la subtarea A está demasiado por delante de la marca de agua global, correspondiente a la marca de agua de la subtarea B, para emitir un nuevo registro según el mecanismo de alineación de tiempo del evento.
  2. La subtarea A es marcada como inactiva, por ende no recibe la marca de agua global actualizada del administrador de trabajos.

La siguiente pregunta qué es obvia y que necesita una respuesta es cómo se cumplen cada una de estas condiciones.

#1: ¿Cómo puede una subtarea adelantarse más allá de una marca de agua global?

Mientras que la marca de agua de una subtarea puede, bajo circunstancias normales, adelantarse demasiado (o en nuestro caso, 10 minutos por delante) de la marca de agua más lenta debido a la desviación, aquí no es así. Como resultado del procesamiento normal, la marca de agua local de la subtarea está unos minutos por delante de la marca de agua global, algo que se espera con 1800 subtareas.

En este caso, la subtarea se adelanta demasiado a la marca de de agua global como resultado de que la marca de agua global retroceda en el tiempo varios minutos.

¿Qué provoca que el marcador de agua global retroceda en el tiempo?

La marca de agua global retrocede en el tiempo como resultado de una disminución prolongada de la limitación de la CPU entre una serie de subtareas. Como sugieren tanto los registros como las métricas de reducción de la limitación de la CPU, muchas subtareas alcanzan este escenario de detención total donde las operaciones ya no están ocurriendo. En este próximo ejemplo, mostraremos cómo esto lleva a una marca de agua global que parece revertirse con el tiempo.

Ejemplo

Para empezar, la marca de agua de la subtarea 1 es la marca de agua global.
La subtarea 2 se acelera por la CPU y detiene todas las operaciones. Su marca de agua se convierte en la marca de agua global.
La subtarea 2, que aún está limitada por la CPU, no actualiza su marca de agua con el administrador de trabajos durante más de 60 segundos. Por lo tanto, su marca de agua se ignora en el cálculo de la marca de agua global y la marca de agua local de la subtarea 1 se convierte en global.
La subtarea 2, que reanuda el consumo de fragmentos después de la aceleración de la CPU, actualiza su marca de agua local con el administrador de trabajos. Por lo tanto, la marca de agua global ha retrocedido como resultado de la limitación de la CPU.

Por lo tanto, al retroceder la marca de agua global, a diferencia de que la marca de agua local avance de forma natural, es muy posible que una subtarea alcance instantáneamente un punto en el que ya no pueda emitir registros, cumpliendo la condición #1 del estado de interbloqueo.

#2 ¿Por qué la subtarea está marcada como inactiva?

Una subtarea del consumidor de Kinesis se marca como inactiva cuando se cumplen dos condiciones:

  • La cola de emisión está vacía.
  • No se han escrito registros en la cola de emisión en los últimos 60 segundos.

La reducción prolongada de la limitación de la CPU es un caso único donde estas condiciones tienen la posibilidad de mantenerse ciertas mientras los registros siguen siendo escritos en el fragmento por un productor. Las condiciones se mantienen de la siguiente manera:

  1. Según los registros, la cola está vacía al despertar de la reducción de la velocidad de la CPU y, por lo tanto, también al ingresar en la reducción de la velocidad de la CPU; esto sucede por casualidad pero es más probable que ocurra con un tamaño de cola más pequeño y tiempos de encuesta de Kinesis más altos, como los que hemos configurado.
  2. Debido a la reducción de la velocidad de la CPU, no se han escrito registros en la cola de emisión durante varios minutos.

Es a través de esta lógica que comprendemos por qué una subtarea se marca como inactiva; sin embargo, todavía queda una pregunta importante sin respuesta.

¿Por qué permanece inactiva la subtarea?

Poco después de que la reducción de la velocidad de la CPU se detenga, esperaríamos que la cola de emisión se llene y nos libere de nuestro estado inactivo. Esto permitiría que el marcador de agua global se pueda actualizar para la subtarea y nuestra subtarea reanudaría la emisión de datos como de costumbre, sin verse obstaculizada por el mecanismo de alineación del tiempo de evento.

Sin embargo, es importante tener en cuenta que anteriormente se describen las condiciones para que una subtarea se marque como inactiva. No se describen las condiciones para que una subtarea se marque como no inactiva. Para ser marcada como no inactiva, es necesario que la marca de agua emitida haya progresado, lo cual, como se mencionó anteriormente, no es el caso debido al estado de bloqueo.

Capítulo 7: El lanzamiento

Para este punto, tenemos una única subtarea en un estado de bloqueo por 5 días. No hay datos siendo consumidos por su respectivo fragmento. Todos los demás comportamientos aparentan estar normales, incluyendo el consumo de otros fragmentos de las subtareas correspondientes.

¿Qué ocurre durante un lanzamiento (deployment, despliegue)?

A la hora de un lanzamiento, el operador Flink K8s de Lyft realiza lo siguiente:

  1. Toma un punto de control
  2. Detiene el trabajo
  3. Crea un nuevo cluster
  4. Envía el job al cluster nuevo con el punto de control

De la perspectiva del consumidor de Flink Kinesis, lo siguiente ocurre al enviar el nuevo trabajo:

  1. Los fragmentos son asignados a subtareas
  2. Las compensaciones de fragmentos anteriores se distribuyen desde el punto de control
  3. Las subtareas empiezan a consumir de los fragmentos
  4. Las subtareas recalculan la marca de agua global
  5. El procesamiento de datos continúa de manera

Ejemplo

Antes del lanzamiento, la subtarea 2 estaba en un estado de punto muerto y la subtarea 1 avanzó durante varios días con la marca de agua local y global.
Después del lanzamiento, las marcas de agua locales no fueron cambiadas pero la marca de agua global fue re-calculada.
A este punto, la marca de agua local de la subtarea 2, correspondiente a la marca de agua global, comenzó a procesar los datos de los últimos 5 días, mientras que la subtarea 1 estaba restringida por el mecanismo de alineación del tiempo del evento.

Por eso, solamente vemos métricas para la edad del iterador de un fragmento, ya que esas métricas solamente son producidas cuando los registros son obtenidos del fragmento correspondiente de Kinesis.

Capítulo 8: El reinicio completo

Este problema fue temporalmente aliviado por el “reinicio completo”, o empezar la tarea sin un punto de control. Como no hay ningún estado asociado tras la inicialización, cada subtarea empieza a leer del último registro de cada fragmento. De este modo, el interbloqueo es aliviado y el mecanismo de alineación del tiempo del evento no previene a ninguna subtarea de procesar su fragmento correspondiente.

Para mitigar la pérdida de datos, empezamos una tarea por separado donde cada subtarea empieza leyendo su fragmento correspondiente desde un momento específico en el tiempo. En nuestro caso, esto fue inicializado hasta el momento anterior al lanzamiento original.

Capítulo 9: Las consecuencias

Originalmente asumimos que habíamos mitigado toda la pérdida de datos, sin embargo, este no era el caso. Solo hubo un fragmento que no pudimos salvar. Recordemos que una única subtarea estuvo en el estado de interbloqueo por 5 días. El rellenado (backfill) para mitigar el incidente previno la pérdida de datos desde el tiempo del deployment. Como resultado, se produjo una pérdida de datos para 1 de cada 1800 fragmentos, correspondiente a 1/1800 de datos, durante un periodo de 5 días.

Como el stream de Kinesis corresponde a 7 días de un periodo de retención, 2 días después del “reinicio completo”, desafortunadamente nuestros datos no procesados en el fragmento dejaron de ser retenidos mucho antes de concluir con la causa raíz antes mencionada.

Capítulo 10: Medidas preventivas

Para concluir nuestra investigación, nos preguntamos cómo podríamos detectar un caso así antes y evitar esta posibilidad en el futuro. Adoptamos un enfoque múltiple y realizamos lo siguiente:

  • Solución de la causa raíz [FLINK-29099]: : Aliviar el bloqueo recuperando la marca de agua global actualizada cuando la subtarea está inactiva.
  • Monitoreo adicional: A nivel de fragmento, monitorear que los fragmentos que se producen también se consuman desde la aplicación de Flink correspondiente.
  • Mitigar la limitación de CPU: Dado que esta aplicación ha tenido ocasionalmente niveles altos de uso de CPU y de limitación de CPU, hemos descompuesto la aplicación en aplicaciones más pequeñas y se realizó un análisis adicional sobre la causa raíz del alto uso de CPU.

Notas de cierre

Aquí en Lyft sabemos que este problema que experimentamos no será el último. En efecto, nuevos y emocionantes retos surgen diariamente. Como tal, planeamos seguir continuando compartiendo estas experiencias con la comunidad de ingeniería.

Adicionalmente, estamos activamente buscando ingenieros innovadores para ayudarnos a llevar nuestras plataformas de streaming al siguiente nivel. Me encantaría conectarme con ustedes para compartirles más información de lo que hacemos y discutir oportunidades que los entusiasmen.

Publicaciones relevantes (en inglés)

  • Obtén información sobre cómo superamos la asimetría de datos en las aplicaciones de transmisión en Lyft.
  • Obtén más información sobre varios casos de uso de streaming en Lyft en esta publicación de blog.
  • Consulta esta publicación de blog sobre la evolución de los canales de transmisión en el equipo de Marketplace de Lyft.

--

--