Hello Dataflow!

Sense
Talks Grupodot
Published in
9 min readJan 5, 2017

--

El concepto de MapReduce ha sido utilizado muchas veces al interior de Google como respuesta a las necesidades de procesamiento en paralelo de grandes volúmenes de información, Jeffrey Dean y Sanjay Ghemawat exponen este modelo en el paper publicado por Google en el año 2004, el cual fue relevante para desarrollos posteriores como Hadoop. Aunque el modelo MapReduce no se puede aplicar a cualquier problema de procesamiento de datos, el éxito del modelo se basa en que puede manejar un volumen de información de tamaño significativamente grande (alcanzando los terabytes o petabytes) y reducirla mediante una transformación, una colección de datos relevantes, esto lo hace a través de dos funciones llamadas Map() y Reduce(). Para lograr su propósito el modelo hace uso de los sistemas de distribuidos y la computación paralela.

Map():

Esta función recibe una colección de duplas (clave valor), las procesa y genera un set de pares intermedios.
map (in_key, in_value) -> list(out_key, intermediate_value)

Reduce():

En esta función se combinan los valores intermedios agrupandolos por cada una de las claves que componen la colección y produce un set de valores combinados (generalmente uno).
reduce (out_key, list(intermediate_value)) -> list(out_value)

La siguiente imagen ejemplifica como es el proceso de transformación del modelo MapReduce:

Ejemplo de MapReduce tomada de http://research.google.com/archive/mapreduce-osdi04-slides/index-auto-0007.html

Una década después vivimos tiempos donde gracias a la Internet se manejan grandes volúmenes de información (Big Data) y se requiere del procesamiento de información en el menor tiempo posible y con la mayor confiabilidad. Ahora los sistemas no solo deben estar diseñados para procesar grandes lotes de información (batch), también existen casos donde la información fluye en la red (Streaming) y debe ser procesada en Tiempo Real (lidiando con la latencia y los retardos por los tiempos requeridos para el procesamiento de la información) con resultados que deben ser informados en el menor tiempo posible, ejemplo de estos nuevos escenarios son el conteo de las puntuaciones para juegos en linea o la cantidad de transacciones que se ejecutan en un sistema según la hora.

Como respuesta a las necesidades en el procesamiento de Big Data, Google a través de su plataforma en cloud ofrece un nuevo modelo como evolución del modelo MapReduce, su nombre es DataFlow este nuevo modelo fue publicado por Google en el paper del 2015, su framework tiene un SDK para java; se integra con los diferentes componentes que ofrece Google Cloud: Compute, Storage, Big Data y Services, permitiendo desarrollos enfocados en el procesamiento de grandes volúmenes de información ya sea en batch (Google Storage) o vía Streaming (Google Pub/Sub).

integración de Dataflow. tomada de http://www.itnews.com.au/news/google-puts-cloud-dataflow-into-public-beta-402841

Hello Word Dataflow:

El ejemplo básico utilizando el framework de DataFlow se puede encontrar aquí, en este artículo vamos a analizar dicho ejemplo, pero antes se debe tener en cuenta:

  1. Configurar el proyecto de gcloud
  2. Crear una regla en el firewall del proyecto que permita la conexión tcp por el puerto 12345, de lo contrario el ejemplo generará un error de tipo:

java.lang.RuntimeException: java.io.IOException: DEADLINE_EXCEEDED: (zmq) RPC timed out

para crear la regla se debe ingresar a Networking > Firewall rules > Create a firewall rule. Luego de crear la regla, el listado de las reglas en el firewall se deben enlistar junto a la regla creada:

Reglas del firewall para el proyecto en gloud. Fuente: el autor
  1. Instalar el Google Cloud SDK.
  2. Descargar el código fuente ejemplo para ser ejecutado con maven o con eclipse, para este caso se va a ejecutar el ejemplo con el plugin de Eclipse.

MinimalWordCount

Este ejemplo es el más simple de los que contiene el proyecto, donde se puede ver aplicados los elementos básicos de DataFlow: Pipelines, PCollections, ParDo, y lectura y escritura de archivos desde almacenamiento en Storage.

En la ruta src/main/java/com/google/cloud/dataflow/examples/ del proyecto se encuentra el archivo MinimalWordCount.java al cual si se quitan los comentarios debería verse de la siguiente manera:

Clase minimalWordCount. fuente: el autor.
  1. En la linea 77 se debe reemplazar SET_YOUR_PROJECT_ID_HERE por el id del proyecto el cual se puede ver en el home de la consola de desarrollo de Google.
  2. En la linea 78 se remplaza gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY por el nombre del bucket creado para almacenar los elementos requeridos por DataFlow para operar, en este caso se creo el bucket gdot-dataflow y una carpeta llamada staging por lo anterior el valor por el cual se realizo el remplazo fue gs://gdot-dataflow/staging.
Bucket para almacenar datos. fuente: el autor.

En la carpeta staging se almacenaran los archivos requeridos por DataFlow para ejecutar el programa, por el momento esta vacía, pero una vez se ejecute la clase MinimalWordCount esta carpeta contendrá los archivos requeridos por DataFlow como se puede ver en la siguiente imagen:

Carpeta staging del bucket gdot-dataflow. Fuente: el autor.
  1. En la linea 98 se debe reemplazar el valor de gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX por la dirección del bucket donde se espera que se escriban los resultados, para este caso se utilizo gs://gdot-dataflow/results/. como la carpeta results no fue creada dentro del bucket el programa automáticamente procede a crearla cuando se ejecute.

Etapas del programa MinimalWordCount

En la línea 79 se instancia el Pipeline con las opciones especificadas en las primeras lineas del programa:

Pipeline p = Pipeline.create(options);

Entre las diferentes opciones que puede contener un Pipeline en el momento ser instanciado tenemos el numworkers (cantidad de instancias de Compute Engine utilizadas para ejecutar el programa, por defecto son 3), network (red a la que serán enlazadas las máquinas de Compute Engine); la lista completa se encuentra aquí.

En la Línea 80 el programa utiliza el API de Text I/O para leer las lineas de todos los textos que se encuentran en la ruta gs://dataflow-samples/shakespeare/* y almacenarlos en una PCollection donde cada elemento es una línea de texto.

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))

Si queremos conocer los diferentes textos alojados en la ruta gs://dataflow-samples/shakespeare/* se debe acceder al cloud shell y ejecutar el comando:

gsutil ls gs://dataflow-samples/shakespeare/

Este comando lista los diferentes textos que se encuentran en dicha carpeta. Para ver uno de ellos debemos usar el comando cat:

gsutil cat gs://dataflow-samples/shakespeare/kinglear.txt

Este comando lista los diferentes textos que se encuentran en dicha carpeta. Para ver uno de ellos debemos usar el comando cat:

gsutil cat gs://dataflow-samples/shakespeare/kinglear.txt
Ejemplo del comando cat sobre el archivo kinglear.txt de la carpeta shakespeare. Fuente: el autor.

De la línea 81 a la 90 se aplica un Transformación ParDo a la colección de lineas que se obtuvieron en la primera etapa:

.apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))

La transformación ParDo recibe como parámetro un objeto de tipo DoFn en el cual se debe especificar el tipo de elemento de entrada y el tipo de elemento de salida, en este caso DoFn<String, String> especifica que el elemento de entrada es un String (TextIO.Read produce una PCollection de tipo String) y DoFn<String, String>nos indica que la transformación produce un elemento de tipo String en su salida. En el método processElement se procede a separar en palabras el contenido de cada una de las lineas de la colección de entrada, a continuación se recorren cada una de las palabras y si el String no esta vació es enviado a la PCollection de salida mediante la sentencia c.output(word).

En la línea 91 se procede a agrupar todos los elementos repetidos haciendo un conteo de cada una de las coincidencias mediante la función Count:

.apply(Count.<String>perElement())

La función Count.<String>perElement() toma la PCollection de la etapa anterior y retorna otra PCollection donde cada key es una única palabra y el value es el cantidad de coincidencias en la PCollection original.

En la línea 118 a la 123 se realiza una nueva etapa en el flujo del Pipeline donde se le da formato a la PCollection:

.apply("FormatResults",MapElements.via(new SimpleFunction<KV<String,Long>, String>() {
private static final long serialVersionUID = 0;
@Override
public String apply(KV<string, long=""> element) {
return element.getKey() + ": " + element().getValue();
}
}))

La función apply de la clase SimpleFunction recibe cada uno de los elementos KV (par inmutable de Key/value) de la PCollection producto de la etapa anterior y devuelve un String con formato “Key : Value” el cual se agrega a la nueva PCollection.

En la línea 98 se procede a almacenar los resultados de la etapa anterior (La PCollection de Strings formateados):

.apply(TextIO.Write.to("gs://gdot-dataflow/results/"));

Dataflow produce una cantidad de archivos de salida, es decir que la colección final no estará alojada en un único archivo de salida, esto se debe a la naturaleza del procesamiento en paralelo de la transformación ParDo.

Para producir un único archivo se debe hacer una nueva etapa donde se combinen los archivos producidos por el Pipeline.

La línea 99 es donde se le indica al programa que se debe ejecutar el Pipeline:

p.run();

Cuando se ejecuta la clase MinimalWordCount la consola de eclipse mostrara una traza similar a la que se encuentra a continuación:

Traza en eclipse clase MinimalWordCount. Fuente: el autor.

Desde la consola de Google, en el apartado de Compute Engine se puede ver como DataFlow se encarga de crear y lanzar las instancias necesarias para hacer la ejecución del programa según la cantidad configurada en las opciones de creación del Pipeline, es decir que el procesamiento en paralelo se esta realizando en las tres maquinas. Una vez terminada la ejecución DataFlow se encarga de detener y eliminar las instancias utilizadas para hacer el procesamiento de la información.

Instancias Compute Engine ejemplo Dataflow. Fuente: El autor.

En el apartado de Cloud Dataflow de la consola de desarrollo de Google se pueden ver la lista de los Jobs ejecutados para el proyecto (cada Job representa una ejecución de DataFlow), al acceder al job ejecutado se puede ver la información que aparece a continuación:

Flujo del Pipeline para el programa minimalwordcount. Fuente: el autor.

A la izquierda en verde estarán cada una de las etapas de y a la derecha se encuentra la información relevante sobre el proceso; Summary indica los datos relacionados al Job ejecutado como el Id del job, el tiempo que duro el flujo en Dataflow (dos minutos y quince segundos), si se presentaron errores o alertas y el tipo de Job (Batch o Streaming); Job Log permite ver el log de las actividades realizadas y Step informa sobre los elementos agregados en cada una de las fases de Dataflow.

En la siguiente tabla se puede ver como a través de cada Step del dataflow se modifica la cantidad de elementos pasando de tener 172,948 lineas de texto extraídas de los textos de shakespeare a 945,845 cuando se separa cada línea en palabras y posteriormente se agrupan en una PCollection de 32,786 elementos.

[supsystic-tables id=”1″]

En este ejemplo básico se han extraído y transformado las 172,948 lineas de texto en una colección de 32,786 almacenada en varios archivos dentro de la ubicación gs://gdot-dataflow/results/

A continuación se puede observar la lista de los archivos generados como resultado del proceso de DataFlow:

Resultados del programa almacenados en la ruta gs://gdot-dataflow/results/. Fuente: el autor.

Cada archivo contiene entradas de la forma:

decreased: 1
'shall': 2
War: 4
empress': 14
Provost: 99
stoops: 6

Y eso es todo lo referente al ejemplo básico con DataFlow en el cual en dos minutos se logro extraer y transformar más de 945 mil palabras para proceder a ser almacenadas, por lo anterior el autor considera que Dataflow es Famework con mucho futuro en la arena de Cloud Computing.

¡Hasta la próxima!

Referencias:

[1] MapReduce: Simplified Data Processing on Large Clusters http://research.google.com/archive/mapreduce.html
[2] The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf.
[3] What is Google Cloud Dataflow? https://cloud.google.com/dataflow/what-is-google-cloud-dataflow

--

--