Construyendo un flujo de datos en tiempo real con Kinesis Data Streams, Lambda, DynamoDB y SNS
Hoy en día, la generación de los datos crece de una manera abismal, las empresas para ser competitivas no solo deben considerar la información que almacenan en sus bases de datos tradicionales (SQL Server, Oracle, MySQL, etc), en muchos casos deben obtener más datos para brindar una experiencia única a sus clientes, ya sea creando un producto personalizado, mejorando sus servicios, recomendando alguna oferta, etc.
Cuando estos datos almacenados en sistemas tradicionales no pueden responder a la demanda de usuarios, tenemos un problema de volumen; si a su vez tenemos diversas fuentes de datos que no sabemos como almacenarlas, tenemos un problema de variedad; y si todavía los datos se crean a velocidades nunca estimadas, tenemos un problema de velocidad.
El Big Data se hace popular por el concepto de las 5 primeras V’s: velocidad, variedad, volumen, veracidad y valor.
¿Cuántos beneficios se podrían obtener si es que se tuvieran flujos de datos que puedan responder automáticamente a eventos?
Hace algunas semanas construí un pequeño flujo de datos en tiempo real usando los servicios de AWS : Kinesis Data Streams, EC2, Lambda, DynamoDB y SNS.
Resumo de manera general esta arquitectura que se puede usar en un escenario productivo.
Primero lo que realicé es a partir de una EC2 enviar datos desde un archivo python hacia Kinesis Data Streams, estos datos son unos productos que se generan de manera aleatoria.
Luego Kinesis Data Streams lo agrego como desencadenador a una función Lambda para que pueda leer los datos del stream y pueda persistirlos en una tabla en DynamoDB, finalmente si hay algún producto con una descripción específica envío un correo electrónico informativo a un usuario.
Este flujo de datos, lo podría usar si deseo obtener los valores de miles de sensores IoT que por ejemplo estén monitoreando el funcionamiento de una máquina, o también monitoreando el nivel de humedad del suelo en un campo de cultivo (en un próximo post capturaré los datos de un sensor a través de una Raspberry y mediante AWS IoT lo publicaré en Kinesis Data Streams); este input reemplazaría a la EC2 que tiene un python que genera datos y los agrega a Kinesis Data Streams.
Se me ocurre otro caso de uso para este pequeño flujo de datos, podríamos también detectar en tiempo real si una transacción es fraudulenta o no, recibiría como parámetros al cliente y el detalle sobre la transacción que está realizando, se tomarían estos parámetros y se podrían predecir en tiempo real si fue o no una transacción fraudulenta.
Bueno ahora sí, empecemos a construir este flujo de datos.
Si son nuevos en AWS, les recomiendo que se creen una cuenta, AWS les brindarán el uso de algunos servicios de manera gratuita.
Por ejemplo podrán usar una EC2 (t2.micro) con Linux y con EBS hasta 30GB, encedida durante un año, sin pagar ni un dólar.
Pueden entrar a este link y verán todos los servicios que cubre la capa gratuita.
Para realizar este flujo de datos, se gastará menos de un dólar.
Se deben descargar el repositorio de git, para realizar el flujo de datos.
https://github.com/luisgradossalinas/aws-bigdata-real-time
Usaremos servicios como Kinesis Data Streams y DynamoDB que no están dentro de la capa gratuita.
- Creamos un rol en IAM
Una vez que hemos accedido a nuestra cuenta de AWS mediante la consola, nos vamos al servicio de IAM, nos dirigimos a la opción Roles y clic en Crear un Rol.
Seleccionamos EC2, ya que el rol será asociado a la instancia.
Buscamos la política KinesisFullAccess y la marcamos, esta política hará que desde la EC2 podamos ejecutar un python que envíe datos a Kinesis Data Streams.
Clic en siguiente
Como nombre de rol le ponemos RoleEC2Kinesis, clic en Crear un rol
2. Crear una EC2
Entramos al servicio EC2 , clic en Launch Instance.
Nos aparece una lista de AMI, en este caso seleccionamos Amazon Linux 2 AMI de 64 bit (x86).
Usaremos la EC2 de la familia t2.micro, ya que está considerada en el nivel free, clic en Next : Configure Instance Details.
Solo creamos una EC2, dejamos todas las opciones por defecto, solo en la opción IAM role seleccionamos el rol que hemos creado previamente: RoleEC2Kinesis, clic en Next : Add Storage.
No necesitaremos mucho espacio en el disco, así que lo dejamos por defecto, clic en Next : Add Tags.
Agregaremos una etiqueta para identificar nuestra EC2.
Add another tag: en Key agregamos Name y en Value el valor descriptivo que se desea (Usado para identificar nuestra EC2).
Se creará un grupo de seguridad, donde indicaremos el nombre del grupo y la regla de entrada para que solo nuestra IP pueda acceder a la EC2 (por temas de seguridad).
Clic en Review and Launch.
Aparecerá un resumen de la EC2, clic en Launch.
Aparecerá una ventana, seleccionamos Create a new key pair y le asignamos un nombre EC2-BigData.
Clic en Download Key Pair, clic en Launch Instance.
La EC2 ya se está lanzando, esperemos unos segundos hasta que esté lista para usar.
Clic en View Instances.
3. Generación de clave privada (realiza este paso solo si usas Windows)
Abrimos el PuttyGen para generar la clave privada.
En caso no se tenga instalado, aquí agrego la url de descarga.
https://www.puttygen.com/download.php?val=4
Clic en load y cargamos el archivo .pem luego clic en Save private key.
4. Creación de sesión en MobaXterm (realiza este paso solo si usas Windows)
Usaremos el MobaXterm para conectarnos a la EC2, si todavía no lo has descargado, envío la url de descarga.
https://download.mobatek.net/1242019111120613/MobaXterm_Installer_v12.4.zip
Antes de abrir el MobaXterm, vayamos a la consola de AWS y busquemos la EC2, seleccionamos la EC2, y clic en Connect aparecerá la siguiente ventana, copiamos lo resaltado en amarillo.
Abrimos el MobaXterm, clic en Sesion -> SSH
En el campo Remote Host: Pegamos lo que copiamos en el paso anterior
Luego, clic en la pestaña Advanced SSH settings, y clic en Use private key -> Seleccionamos el archivo ppk generado previamente y clic en Ok.
Aparecerá la siguiente ventana, que muestra el acceso a la línea de comandos de la EC2.
Subimos el archivo python WriteKinesisStreamsCompra.py a EC2.
En el python que enviará la data a Kinesis Data Streams, usaremos la librería boto3 para acceder a Kinesis y pytz para cambiar la zona horaria a Lima.
Ejecutar lo siguiente en el terminal de linux:
sudo yum install python3 -y
pip3 install boto3 — user (ten cuidado al copiar es doble guión)
pip3 install pytz — user
5. Creación de streams en Kinesis Data Streams
Entramos a la consola de AWS y entramos al servicio de Kinesis.
Amazon Kinesis Data Streams (KDS) es un servicio de streaming de datos en tiempo real con un alto nivel de escalabilidad y durabilidad.
Kinesis Data Streams permite procesar big data de streaming en tiempo real.
Kinesis Data Streams ingiere una gran cantidad de datos en tiempo real, los almacena de forma duradera y los pone a disposición para el consumo.
Es totalmente administrado, es una alternativa a Apache Kafka
Excelente para logs de aplicaciones, métricas del servidor, IoT, secuencia de clics, soluciones Big Data.
Se integra con Spark y Nifi como consumidores.
La data se replican en 3 zonas de disponibilidad.
Clic en Data Streams y Create Data Streams.
El nombre del stream será ComprasStream.
En el campo Number of shards ingresamos 1, clic en Create Kinesis stream.
Como solo vamos a enviar datos que pesan menos de 1MB no es necesario agregar varios shards, en escenarios productivos cuando tengamos necesidad de enviar GB de datos a Kinesis Data Streams, mediante una solicitud a AWS podemos pedir que nos permitan generar más shards.
AWS nos cobra por cada shard aprovisionado por hora.
Se visualiza el stream ComprasStream creado.
6. Creación de función Lambda
AWS Lambda le permite ejecutar código sin aprovisionar ni administrar servidores. Paga solo por el tiempo de cómputo que consume.
Clic en Crear una función
El nombre de la función será (opcional): ReadKinesisIngestDynamo
El lenguaje de programación será Python 3.7, clic en Crear una función.
Aparece la función creada.
7. Creación de tabla en DynamoDB
Amazon DynamoDB es una base de datos de NoSQL y de documentos que ofrece rendimiento en milisegundos de un solo dígito a cualquier escala.
DynamoDB escala automáticamente de acuerdo a la demana de read y write que requiera.
Los datos se almacenan en discos SSD y se replican en 3 zonas de disponibilidad.
Ofrece cifrado de datos en reposo.
DynamoDB puede gestionar más de 10 billones de solicitudes por día y puede admitir picos de más de 20 millones de solicitudes por segundo.
Ir al servicio de DynamoDB, clic en Crear tabla.
En el nombre de tabla agregar : compra
Clave principal : id_compra
Desmarcar la opción de Configuración de la tabla
Modo de capacidad de lectura/escritura : Bajo demanda (Autoescalable)
Al seleccionar esta opción, nos aseguramos que DynamoDB se encargue de aprovisionar nuestra tabla, así vengan miles de solicitudes de lectura o escritura, estará preparada para responder adecuadamente.
Se crea la tabla compra.
8. Creación de tópico en SNS y suscriptores
Ir al servicio de SNS, agregar el nombre : crear el tópico snsCompras.
Se crea el nuevo tópico en SNS.
Copiamos el arn del SNS y lo pegamos en un notepad, ya que más adelante lo tendremos que usar.
Crear una suscripción al tópico de SNS.
Clic en Crear una suscripción.
Seleccionamos como protocolo: Correo electrónico y en un punto de enlace agregamos nuestro correo para que nos llegue notificaciones del servicio de SNS.
Aparecerá el siguiente mensaje:
Nos llegará un correo para confirmar la suscripción al tópico.
Y procedemos a confirmar:
Volvemos a SNS para verificar que la suscripción esté confirmada.
El mensaje debe ser confirmado para recibir mensajes por AWS SNS.
9. Actualización de la función lambda
Actualizar el rol, entrar a la función lambda ReadKinesisIngestDynamo
Clic en la parte resaltada.
Clic en la opción de expandir.
Clic en Editar la política.
Clic en la opción de JSON.
La función Lambda solo debe tener los permisos necesarios.
Para generar logs en CloudWatch.
Para obtener los datos de Kinesis Data Streams.
Para almacenar los datos en DynamoDB.
Para enviar mensajes por SNS.
Abrir el archivo policy.json, copiar y pegar el contenido en la pestaña JSON.
Clic en Revisar la política y Guardar los cambios.
Agregar el desencadenador de Kinesis y seleccionamos el flujo ComprasStream.
Clic en Agregar.
Abrir el archivo CodeLambda.py, copiar y pegar el contenido del código en la función Lambda.
En la línea 33 cambiar el arn, por el arn de nuestro tópico en SNS.
El código de la función Lambda quedaría así.
10. Prueba final
Una vez que hemos creado y configurado todos los servicios para el flujo de datos en tiempo real, procedemos a probarlo.
Ejecutar el archivo Python en la EC2.
python3 WriteKinesisStreamsCompra.py
Verificar que se hayan ingestado los datos a DynamoDB y si se tiene dentro de la compra una Raspberry pi 4B debe llegar un mensaje de correo.
IMPORTANTE: Al finalizar la prueba del flujo de datos, eliminar el stream de Kinesis Data Streams y la tabla purchase en DynamoDB, esto es para que AWS no nos cobre.
La EC2 la puede usar de manera gratuita durante un año si es que está dentro de la capa gratuita.
SNS y Lambda mientras no se usen AWS no realizará cobros adicionales.
Todo este flujo de datos lo puedes tener en cuestión de pocas horas usando los servicios de AWS.
En un próximo post publicaré algo más real, y capturaré datos desde un sensor con una Raspberry Pi 3B+, usando el servicio de AWS IoT.
Si te pareció interesante este post, puedes compartirlo.
#aws #bigdata #realtime