¿Dónde están mis datos? — un singular encuentro con el conector Kinesis de Flink streaming
Este artículo fue publicado originalmente el 14 de agosto de 2023 en eng.lyft.com y fue traducido por Luis Felipe Álvarez Sánchez
Por años, Lyft no ha sido solamente un proponente, sino también contribuyente a Apache Flink. Las tuberías de datos de Lyft han evolucionado drásticamente durante los años; sin embargo, una y otra vez, encontramos casos únicos que estiran Flink a sus puntos de ruptura — este es uno de esos ejemplos.
Contexto
Mientras que Lyft ejecuta múltiples aplicaciones de streaming, la que específicamente está en cuestión es la del trabajo de persistencia. Lo que éste hace es transmitir datos de Kinesis, ejecutar un cierto nivel de transformaciones y serializaciones, y escribir en S3 cada ciertos minutos.
En este caso, persiste una gran mayoría de eventos generados en Lyft, los cuales ocurren a un ritmo de 80 gigabytes por minuto en promedio y se ejecutan con un paralelismo de 1800, lo que resulta ser uno de los trabajos de streaming más largos de Lyft.
Capítulo 1: El apagón
Empecemos por el final, ¿sale?
Ingenierx de datos: “¡Alerta! ¡Mis reportes no se están generando! ¡Los datos del upstream no están disponibles para ser generados! “
Ingenierx de plataforma: “¡Estoy en eso! ¡Parece ser que nuestra aplicación de streaming para persistir los datos está disponible y corriendo, pero tampoco veo que se están escribiendo datos!”
Como cualquier buen ingeniero haría, sacamos los libros de ejecución y con mucho cuidado ejecutamos los siguientes pasos detalladamente:
Ingenierx de plataforma: “Permítanme revertir el cambio aparentemente inofensivo que acabamos de subir en producción.”
Ingenierx de plataforma: “No hubo suerte.”
Ingenierx de plataforma: “Bueno, permítanme tratar de apagarlo y prenderlo otra vez.”
Ingenierx de plataforma: “No hubo suerte.”
Ingenierx de plataforma: “Bueno, permítanme tratar de realizar un reinicio completo, y luego después llenamos los datos faltantes.”
Ingenierx de plataforma: “¡Funcionó!”
Este patrón desafortunadamente se continuó repitiendo en una cadencia quincenal durante un mes. Claramente, se tenía que realizar una investigación más a fondo.
Capítulo 2: La investigación
En el resto del artículo, vamos a analizar la causa raíz del comportamiento mencionado anteriormente, y su relación con el funcionamiento interno del consumidor de Flink Kinesis.
Para comenzar, nos hicimos la siguientes preguntas:
- ¿Por qué el trabajo tuvo un bajo rendimiento de la nada?
- ¿Por qué el problema continuó después de haber intentado deshacer los cambios y de comenzar desde un punto de control anterior?
Nuestra primeras investigaciones arrojaron lo siguiente:
Edad del iterador por fragmento de Kinesis
La edad del iterador, o el tiempo entre el tiempo actual de procesamiento y el tiempo en el que el evento llegó al fragmento de Kinesis durante el apagón, se muestra a continuación.
Aunque es difícil de ver, la gráfica anterior indica que para todos los fragmentos, excepto uno, la edad del iterador es regular, de ~3 minutos. Luego hay un espacio en las métricas seguida por la edad del iterador de un único fragmento indicando un retraso de ~5 días de edad de iterador.
Profundizando más, podemos ver que el único fragmento en cuestión había experimentado el siguiente comportamiento días antes:
He aquí que aparentemente nuestro problema no empezó en el tiempo de detección cuando a los usuarios intermedios parecía que les faltaban datos, sino desde días antes.
Capítulo 3: Cinco días antes
En este punto, hemos cambiado nuestra investigación a cinco días antes, correspondiendo al tiempo en el que el retraso de la edad del iterador para el fragmento anterior estuvo ausente. Pareciera haber un puñado de métricas irregulares al mismo tiempo, incluyendo las siguientes:
Como se muestra en la gráfica anterior, durante este periodo de tiempo muchos de los 1800 administradores de tareas llegaron a un uso de CPU del 100% con una limitación significativa de CPU. Además, para los administradores de tareas que experimentaron prolongados periodos de limitación de CPU, visualizamos brechas en los registros de más de 10 minutos por consumidor de Kinesis:
Antes de comenzar con la siguiente parte de la historia, necesitaremos desarrollar un entendimiento profundo del consumidor Kinesis Flink.
Capítulo 4: El consumidor de Kinesis Flink
Para simplificar, vamos a asumir que existe un fragmento de Kinesis por cada sub-tarea de Flink. Una sub-tarea en este contexto se define como una porción paralela de una tarea o una unidad de ejecución que se puede programar. Ahora bien, desde un nivel alto, tenemos que asumir que el consumo de un singular fragmento ocurre de esta manera:
De hecho, este puede haber sido el caso al principio, pero se podría y se deberían de realizar más optimizaciones al respecto.
¿Qué pasa si queremos leer o escribir de manera asíncrona?
El enfoque multiproceso anterior soluciona de manera elegante este problema, sin embargo, surgen más problemas.
¿Qué pasa si queremos mantener la alineación del tiempo del evento en el origen?
Dos de los modelos anteriores son inherentemente defectuosos con respecto a un concepto conocido como alienación del tiempo de eventos, previniendo que cualquier sub-tarea procese datos con un tiempo de evento muy por delante de los tiempos de eventos de registro correspondientes de otras subtareas.
Como se discutió en esta propuesta de mejora en Flink, los eventos de tiempo no alineados del operador de origen puede conducir a un crecimiento incontrolable del estado.
Hay que definir una terminología que ayudará con los ejemplos a continuación:
- Marca de agua local o local watermark: Corresponde a una subtarea que se alinea estrechamente con el tiempo del evento, indicando a los operadores intermedios que no deben llegar al operador elementos con una marca de tiempo anterior o igual a la marca de agua.
- Marca de agua global o global watermark: Corresponde al mínimo de las marcas de agua locales de las subtareas.
- “Anticipación” de marca de aguas o watermark “lookahead”: Corresponde a la cantidad de tiempo que una subtarea puede procesar los datos delante de la marca de agua global.
Ejemplo
A continuación vamos a ejemplificar la relación entre las diferentes marcas de agua relacionadas con las marcas de tiempo definidas anteriormente.
Ahora, vamos a ver cómo este concepto es implementado en el contexto de nuestro consumidor
RecordEmitter: La marca de agua de cada registro es comparada con la marca de agua global para decidir si emitir ya el registro descendiente o aún no.
PeriodicWatermarkEmitter: Este hilo periódicamente emite la marca de agua correspondiente al último registro emitido.
WatermarkSyncCallback: Este hilo periódicamente actualiza el administrador de trabajos con las subtareas de marca de agua locales y es regresado a la marca de agua global de todas las subtareas.
Aunque es elegante, este modelo aún tiene algunas fallas que necesitan ser resueltas.
¿Qué pasa si el fragmento no tiene más datos?
Bajo las condiciones actuales del modelo, si un fragmento se ha dejado de escribir por alguna razón, la subtareas locales de la marca de agua no van a progresar y por ende se bloquea la marca de agua global. Si esto ocurre, otras subtareas eventualmente no podrán procesar datos dada la alineación del tiempo del evento.
Para manejar el caso del fragmento inactivo, ¿qué es lo que hacemos? Esencialmente, ignoramos la sub-tarea.
Ejemplo
Para manejar fragmentos inactivos podemos ejecutar estas tres acciones:
- Marcar la subtarea como inactiva para que los operadores posteriores ignoren sus marcas de agua, o la falta de ellas.
- Prevenir que la marca de agua local de las subtareas impacte la marca de agua global.
- Cuando la marca de agua global es calculada para otras subtareas, solamente considera las marcas de agua locales actualizadas en los últimos 60 segundos.
A este punto, tenemos el conocimiento requerido para continuar con el siguiente capítulo. Vale la pena notar que mucha de la complejidad multi-hilo y de la alineación del tiempo del evento es la motivación detrás de refactorización del código fuente de Flink, que implementa estos conceptos de manera nativa en Flink.
Capítulo 5: Viaje en el tiempo
Existe una última pieza del rompecabezas a descubrir antes de explicar qué sucedió: ¡los registros! Los registros periódicamente muestran la marca de agua local, la marca de agua global y el estatus inactivo de cada sub tarea.
En nuestro caso, vemos los siguientes eventos para la subtarea y el fragmento en cuestión:
- La marca de agua local y global son emitidas normalmente.
- Durante la limitación de la CPU, no existen registros reportados por el consumidor por varios minutos.
- La marca de agua global se ha movido hacia atrás en el tiempo varios minutos.
- La subtarea es marcada como inactiva.
- La subtarea no emite ningún registro dado al mecanismo de alineación de tiempo del evento.
- La subtarea global no se actualiza ya que la fuente está inactiva.
- Los registros con la misma marca de agua local, marca de agua global, y el estatus inactivo son reportados indefinidamente.
A menudo se piensa que la marca de agua global solo puede avanzar en el tiempo, no retroceder, pero así sucedió. En el próximo capítulo juntaremos las piezas para explicar la serie de fenómenos extraños que hemos visto hasta ahora.
Capítulo 6: Interbloqueo o "the Deadlock"
A este punto poseemos una cantidad útil de observaciones curiosas.
Días previos al reinicio:
- Durante un período de tiempo prolongado ocurre una limitación de CPU alta.
- Después de una limitación de CPU alta, la marca de agua global retrocede en el tiempo.
- Después de una limitación de CPU alta, una subtarea es marcada como inactiva, y no emite nuevos registros incluso cuando el fragmento contiene más datos.
- La edad del iterador del fragmento no aparece en las métricas por ~5 días.
En el momento del reinicio, cinco días después:
- Solo hay un fragmento cuya edad de iterador aparece en las métricas.
- El rendimiento de la aplicación es de <1% del esperado, independientemente de si comienza con un punto de control o un punto de guardado anterior.
Esto nos lleva a responder la primera de las muchas preguntas que explican el comportamiento.
¿Por qué la subtarea no emite datos cuando el fragmento contiene datos?
La subtarea no emite datos ya que entra al estado de interbloqueo:
- La marca de agua local de la subtarea A está demasiado por delante de la marca de agua global, correspondiente a la marca de agua de la subtarea B, para emitir un nuevo registro según el mecanismo de alineación de tiempo del evento.
- La subtarea A es marcada como inactiva, por ende no recibe la marca de agua global actualizada del administrador de trabajos.
La siguiente pregunta qué es obvia y que necesita una respuesta es cómo se cumplen cada una de estas condiciones.
#1: ¿Cómo puede una subtarea adelantarse más allá de una marca de agua global?
Mientras que la marca de agua de una subtarea puede, bajo circunstancias normales, adelantarse demasiado (o en nuestro caso, 10 minutos por delante) de la marca de agua más lenta debido a la desviación, aquí no es así. Como resultado del procesamiento normal, la marca de agua local de la subtarea está unos minutos por delante de la marca de agua global, algo que se espera con 1800 subtareas.
En este caso, la subtarea se adelanta demasiado a la marca de de agua global como resultado de que la marca de agua global retroceda en el tiempo varios minutos.
¿Qué provoca que el marcador de agua global retroceda en el tiempo?
La marca de agua global retrocede en el tiempo como resultado de una disminución prolongada de la limitación de la CPU entre una serie de subtareas. Como sugieren tanto los registros como las métricas de reducción de la limitación de la CPU, muchas subtareas alcanzan este escenario de detención total donde las operaciones ya no están ocurriendo. En este próximo ejemplo, mostraremos cómo esto lleva a una marca de agua global que parece revertirse con el tiempo.
Ejemplo
Por lo tanto, al retroceder la marca de agua global, a diferencia de que la marca de agua local avance de forma natural, es muy posible que una subtarea alcance instantáneamente un punto en el que ya no pueda emitir registros, cumpliendo la condición #1 del estado de interbloqueo.
#2 ¿Por qué la subtarea está marcada como inactiva?
Una subtarea del consumidor de Kinesis se marca como inactiva cuando se cumplen dos condiciones:
- La cola de emisión está vacía.
- No se han escrito registros en la cola de emisión en los últimos 60 segundos.
La reducción prolongada de la limitación de la CPU es un caso único donde estas condiciones tienen la posibilidad de mantenerse ciertas mientras los registros siguen siendo escritos en el fragmento por un productor. Las condiciones se mantienen de la siguiente manera:
- Según los registros, la cola está vacía al despertar de la reducción de la velocidad de la CPU y, por lo tanto, también al ingresar en la reducción de la velocidad de la CPU; esto sucede por casualidad pero es más probable que ocurra con un tamaño de cola más pequeño y tiempos de encuesta de Kinesis más altos, como los que hemos configurado.
- Debido a la reducción de la velocidad de la CPU, no se han escrito registros en la cola de emisión durante varios minutos.
Es a través de esta lógica que comprendemos por qué una subtarea se marca como inactiva; sin embargo, todavía queda una pregunta importante sin respuesta.
¿Por qué permanece inactiva la subtarea?
Poco después de que la reducción de la velocidad de la CPU se detenga, esperaríamos que la cola de emisión se llene y nos libere de nuestro estado inactivo. Esto permitiría que el marcador de agua global se pueda actualizar para la subtarea y nuestra subtarea reanudaría la emisión de datos como de costumbre, sin verse obstaculizada por el mecanismo de alineación del tiempo de evento.
Sin embargo, es importante tener en cuenta que anteriormente se describen las condiciones para que una subtarea se marque como inactiva. No se describen las condiciones para que una subtarea se marque como no inactiva. Para ser marcada como no inactiva, es necesario que la marca de agua emitida haya progresado, lo cual, como se mencionó anteriormente, no es el caso debido al estado de bloqueo.
Capítulo 7: El lanzamiento
Para este punto, tenemos una única subtarea en un estado de bloqueo por 5 días. No hay datos siendo consumidos por su respectivo fragmento. Todos los demás comportamientos aparentan estar normales, incluyendo el consumo de otros fragmentos de las subtareas correspondientes.
¿Qué ocurre durante un lanzamiento (deployment, despliegue)?
A la hora de un lanzamiento, el operador Flink K8s de Lyft realiza lo siguiente:
- Toma un punto de control
- Detiene el trabajo
- Crea un nuevo cluster
- Envía el job al cluster nuevo con el punto de control
De la perspectiva del consumidor de Flink Kinesis, lo siguiente ocurre al enviar el nuevo trabajo:
- Los fragmentos son asignados a subtareas
- Las compensaciones de fragmentos anteriores se distribuyen desde el punto de control
- Las subtareas empiezan a consumir de los fragmentos
- Las subtareas recalculan la marca de agua global
- El procesamiento de datos continúa de manera
Ejemplo
Por eso, solamente vemos métricas para la edad del iterador de un fragmento, ya que esas métricas solamente son producidas cuando los registros son obtenidos del fragmento correspondiente de Kinesis.
Capítulo 8: El reinicio completo
Este problema fue temporalmente aliviado por el “reinicio completo”, o empezar la tarea sin un punto de control. Como no hay ningún estado asociado tras la inicialización, cada subtarea empieza a leer del último registro de cada fragmento. De este modo, el interbloqueo es aliviado y el mecanismo de alineación del tiempo del evento no previene a ninguna subtarea de procesar su fragmento correspondiente.
Para mitigar la pérdida de datos, empezamos una tarea por separado donde cada subtarea empieza leyendo su fragmento correspondiente desde un momento específico en el tiempo. En nuestro caso, esto fue inicializado hasta el momento anterior al lanzamiento original.
Capítulo 9: Las consecuencias
Originalmente asumimos que habíamos mitigado toda la pérdida de datos, sin embargo, este no era el caso. Solo hubo un fragmento que no pudimos salvar. Recordemos que una única subtarea estuvo en el estado de interbloqueo por 5 días. El rellenado (backfill) para mitigar el incidente previno la pérdida de datos desde el tiempo del deployment. Como resultado, se produjo una pérdida de datos para 1 de cada 1800 fragmentos, correspondiente a 1/1800 de datos, durante un periodo de 5 días.
Como el stream de Kinesis corresponde a 7 días de un periodo de retención, 2 días después del “reinicio completo”, desafortunadamente nuestros datos no procesados en el fragmento dejaron de ser retenidos mucho antes de concluir con la causa raíz antes mencionada.
Capítulo 10: Medidas preventivas
Para concluir nuestra investigación, nos preguntamos cómo podríamos detectar un caso así antes y evitar esta posibilidad en el futuro. Adoptamos un enfoque múltiple y realizamos lo siguiente:
- Solución de la causa raíz [FLINK-29099]: : Aliviar el bloqueo recuperando la marca de agua global actualizada cuando la subtarea está inactiva.
- Monitoreo adicional: A nivel de fragmento, monitorear que los fragmentos que se producen también se consuman desde la aplicación de Flink correspondiente.
- Mitigar la limitación de CPU: Dado que esta aplicación ha tenido ocasionalmente niveles altos de uso de CPU y de limitación de CPU, hemos descompuesto la aplicación en aplicaciones más pequeñas y se realizó un análisis adicional sobre la causa raíz del alto uso de CPU.
Notas de cierre
Aquí en Lyft sabemos que este problema que experimentamos no será el último. En efecto, nuevos y emocionantes retos surgen diariamente. Como tal, planeamos seguir continuando compartiendo estas experiencias con la comunidad de ingeniería.
Adicionalmente, estamos activamente buscando ingenieros innovadores para ayudarnos a llevar nuestras plataformas de streaming al siguiente nivel. Me encantaría conectarme con ustedes para compartirles más información de lo que hacemos y discutir oportunidades que los entusiasmen.
Publicaciones relevantes (en inglés)
- Obtén información sobre cómo superamos la asimetría de datos en las aplicaciones de transmisión en Lyft.
- Obtén más información sobre varios casos de uso de streaming en Lyft en esta publicación de blog.
- Consulta esta publicación de blog sobre la evolución de los canales de transmisión en el equipo de Marketplace de Lyft.