Streaming Estructurado para procesamiento en Synapse

Richard Luis Diaz
Analytics from the Trenchs
5 min readDec 3, 2022

Antes de comenzar! Esta es mi primera publicación, así que tratarla con cariño :) Si queréis saber un poco de mí revisar mi perfil, también podéis encontrarme en twitter como @richardldiaz o en linkedin como richardluisd.

Azure Synapse Analytics es un área de trabajo con multitud de posibilidades para el procesamiento de la información. Entre todos los servicios disponibles podemos trabajar con Grupos de Apache Spark.

El streaming estructurado es un motor de procesamiento de flujo escalable y tolerante a errores basado en el motor Spark SQL. Puede expresar su cálculo de transmisión de la misma manera que expresaría un cálculo por lotes en datos estáticos.

El motor Spark SQL se encargará de ejecutarlo de forma incremental y continua y de actualizar el resultado final a medida que sigan llegando datos de transmisión. Puedes encontrar más información aquí.

El objetivo de esta publicación es explicar como este motor de procesamiento nos puede facilitar el desarrollo de los procesos de transformación en nuestros proyectos de datos.

Spark.ReadStream y Spark.WriteStream

Desde Spark 2.0 los DataFrames pueden representar datos en streaming. Para crear un DataFarme en streaming los crearemos a través de spark.readstream:

Principalmente debemos tener en cuenta el formato del fichero que leemos y la localización de la carpeta dónde están llegando estos ficheros. Una vez los datos están en el df_stream ya podemos hacer operaciones relacionadas con el streaming: selección, proyección, agregación, joins y operaciones de ventana de tiempo. En esta publicación no hablaremos de estas operaciones, ya que no es objetivo de la publicación. Hay que tener en cuenta que nos podemos apoyar en ellas para posibles transformaciones necesarias por requisitos de negocio.

El siguiente paso del procesamiento de datos llevará a escribir estos datos en una siguiente capa del lago de datos a través de spark.writestream, teniendo la posibilidad de utilizar diversos formatos:

En este caso se propone el formato delta como formato de destino. Cualquier fichero que llegue a la carpeta de origen del df_stream será procesado por el streaming estructurado haciendo un append sobre la ruta delta de destino.

Con las siguientes propiedades:

  • df_stream.isStreaming nos permite conocer si el datraframe es de tipo streaming.
  • df_process.status nos da información sobre el proceso de streaming.
  • df_process.stop nos permite parar el procesamiento en streaming.

¿Qué nos aporta a nivel de procesamiento?

Una de las ventajas que encontramos es la gestión de ficheros, sin diseñar un proceso de gestión de ficheros por parte del desarrollador el streaming estructurado se encarga de esta gestión por nosotros. Sólo debemos definir el checkPointLocation dónde se alojará la información sobre los ficheros procesados.

En caso de fallo en el procesamiento o cierre intencional puedes recuperar el proceso anterior y el estado de la consulta anterior, y continuar donde lo dejó. Nuevamente se utilizan los puntos de control para ello. En próximas publicaciones profundizaremos con ejemplos.

Permite escribir en destino en múltiples formatos y con diferentes modos como append, overwrite, update o incluso con funciones personalizadas para cada batch. En próximas publicaciones profundizaremos con ejemplos.

La escalabilidad puede ser un punto importante, y synapse con los grupos de spark nos permite este escalado sin mayor dificultad.

Además, hay que tener en cuenta que el escenario de procesamiento de información, en muchos casos será por batch y, por tanto, no será necesario mantener los grupos de spark activos continuamente. Opciones como trigger=once permitirá ejecutar el proceso, cargar los ficheros pendientes y detenerse una vez no queden más ficheros para procesar.

Procesando…

Veamos un ejemplo de uso:

Para nuestro ejemplo utilizamos diferentes ficheros que llegan a nuestra capa crudas de datos. Estos datos serán incrementales, en cada fichero que llega tiene los cambios con respecto a la ingesta anterior. Con procesamiento estructurado actualizaremos la capa estaándar escribiendo en formato delta y haciendo un merge por las claves de las entidades. De esta manera nuestra capa curada está actualizada con los cambios que van llegando a la capa cruda. Cada fichero es procesado por su fecha de modificación, y en orden.

El siguiente código se encarga del procesamiento estructurado de los ficheros alojados en path_source_completed y escribirlos en el destino path_destination_completed:

Describimos algunas opciones para tener en cuenta como:

  • maxFilesPerTrigger: nos permite definir el número de ficheros que vamos a procesar en cada batch. Que es importante conocer en caso de usar el foreachBatch. En nuestro escenario planteado queremos procesar fichero a fichero porque de esta manera haremos el merge con el destino delta, asegurando que los datos de actualzian correctamente.
  • maxFileAge: nos permite definir que procese ficheros más antiguos a los ya procesados. Es de utilidad si queremos procesar datos a partir de una fecha concreta.
  • trigger(one=True): Permite lanzar el proceso y que de manera automática termine cuando se procesen los ficheros disponibles en origen. Esta opción tiene un bug relacionado con el uso de la variable maxFilePerTrigger. Puede encontrar más información aquí: https://issues.apache.org/jira/browse/SPARK-36533 y https://stackoverflow.com/questions/68345884/how-can-i-control-the-amount-of-files-being-processed-for-each-trigger-in-spark.
  • foreachBatch: permite definir una función y ejecutarla para cada batch. Un ejemplo podría ser el siguiente, aplicado a nuestro escenario donde escribimos en destino delta y hacemos un merge:

Aquí podéis encontrar un notebook con un ejemplo completo, donde también podéis encontrar una solución al bug del Trigger One para versiones de Spark anteriores a 3.3.

Aprovechando el streaming estructurado podemos tener de una forma rápida un proceso de transformación ágil y fácil de mantener.

En las siguientes publicaciones hablaremos de:

  • Procesamiento basado en metadatos
  • Optimizando el uso del clúster de Spark - Jobs vs Notebooks
  • Optimizando el uso del clúster de Spark - Personalización de sesiones
  • Optimizando el tamaño de los ficheros
  • Recuperación ante fallos.

Más detalle:

--

--

Richard Luis Diaz
Analytics from the Trenchs

Apasionado del mundo de los datos y de la tecnología que los rodea / Data & AI Technical Lead en Verne Tech / Apasionado de mi familia / Aficionado al Trail