Captura y Almacena Tweets en Real Time con Spark Streaming y Apache Kafka . Usando la nube de Databricks y GCP (Parte 1)
Hola a todos, en esta oportunidad me gustaría compartirles un ejemplo de como capturar y almacenar información de Twitter en tiempo real con herramientas open source Spark Streaming, Apache Kafka usando plataformas Cloud como Databricks y GCP.
Esta vez usaremos Spark para conectarnos a Twitter y mediante un Producer API llevaremos los datos a un tópico de Kafka.
Antes de iniciar con el desarrollo, definiré algunos conceptos importantes:
Spark Streaming: Es una extensión de la API core de Apache Spark, que da respuesta al procesamiento de datos en tiempo real (micro batch) de forma escalable. Spark Streaming puede conectarse con diferentes herramientas tales como: Apache Kafka, Flume, Amazon Kinesis, Twitter, sensores IOT, etc.
Apache Kafka: Es un sistema de mensajería publicación-suscripción rápido, escalable, duradero y tolerante a fallos . Kafka es generalmente utilizado en arquitecturas real-time que utilizan Stream de datos, para proporcionar análisis en tiempo real.
¡¡Empecemos !!
- Twitter APP: Para obtener tweets, debemos registrarnos en TwitterDevs y crear una aplicación. Con esto tendremos 4 valores importantes que nos permitirán consumir la información:
- Consumer Key (API Key)
- Consumer Secret(API Secret)
- Access Token
- Access Token Secret
2. Spark Databricks: Usaremos la plataforma cloud de Databricks que nos permite crear un cluster de Spark-Scala gratis. Debemos registrarnos en SparkDatabricks, luego del registro creamos un Notebooks Scala donde trabajaremos nuestro código.
Antes a escribir nuestro código debemos crear nuestro cluster Spark Scala e importar dos librerías. TwitterLibrary nos permitirá usar el API de Twitter con Spark y KafkaLibrary la cual nos ayuda a conectarnos con Apache Kafka.
Aquí la documentación oficial de como importar librerías usando Maven: https://docs.databricks.com/user-guide/libraries.html
Creamos el cluster:
Para importar las librerías primero debemos seleccionar la opción: Import Library
Luego seleccionar que importaremos las librerías desde Maven Central y en Coordinate escribimos el nombre de la librería: spark-streaming-kafka-0–10_2.11 y luego importar.
debemos hacer lo mismo con: spark-streaming-twitter_2.11
¡¡Listo!!, ahora solo nos falta habilitar nuestro clúster de Kafka para empezar con la ejecución de nuestro desarrollo.
3. Apache Kafka en Google Cloud Platform: Para iniciar con la habilitación del cluster debemos realizar los siguientes pasos:
a) Crear una instancia en GCP e instalar Kafka, seguir el siguiente tutorial Kafka-Cloud
b) Habilitar y crear reglas para exponer los puertos: 2181 (zookeeper) y 9092 (kafka). Desde la consola SSH de nuestra virtual machine ingresar los siguientes comandos: (recuerda cambiar la variable NOMBRE_VM)
gcloud compute firewall-rules create rule-allow-tcp-9092 --source-ranges 0.0.0.0/0 --target-tags allow-tcp-9092 --allow tcp:9092gcloud compute firewall-rules create rule-allow-tcp-2181 --source-ranges 0.0.0.0/0 --target-tags allow-tcp-2181 --allow tcp:2181gcloud compute instances add-tags NOMBRE_VM --tags allow-tcp-2181
gcloud compute instances add-tags NOMBRE_VM --tags allow-tcp-9092
c) Configurar el archivo properties de Kafka para exponer los servicios, agregar los siguientes parámetros en el siguiente archivo de configuración server.properties, desde el ssh:
vi /usr/local/kafka/config/server.properties
listeners=PLAINTEXT://tu_ip_interna_vm:9092
advertised.listeners=PLAINTEXT://tu_ip_pública_vm:9092
d) Por último, reiniciar el servicio de Kafka para que tome los cambios realizados.
sudo /usr/local/kafka/bin/kafka-server-stop.shsudo /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
Ahora ya tenemos todo listo para consumir datos de Twitter y almacenarlos en un tópico de Apache Kafka. ¡¡Vamos !!
En nuestro notebook ingresamos los siguientes códigos:
Si deseas tener el notebook completo: jmcode
Para observar los mensajes que se escriben dentro de nuestro tópico, debemos ejecutar lo siguiente en nuestro ambiente GCP por SSH:
sudo /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server ip_interna_vm:9092 --topic llamada
Dentro de Databricks podemos hacer seguimientos a los tweets que vamos procesando:
Eso es todo por ahora, en la 2da parte veremos como hacer alguna transformación de esta información en RT y almacenar los resultados en una base de datos como apache HBase, Cassandra o Apache Kudu.
Nos vemos en la próxima publicación :) ….