BigData Pipepline : Serverless et 100% scalable grâce à AWS Firehose, Lambda et S3

100% scalable data pipeline

Pour effectuer vos jobs Hadoop ou Spark, vous avez idéalement besoin de votre donnée organisée, enrichie et disponible dans un système de fichier distribué HDFS.

Je vais appeler cet état le « COD checkpoint » pour « clean & organized data checkpoint ».

Nous allons voir comment arriver à ce fameux COD checkpoint sans effort, (presque) sans coder, avec un résultat 100% scalable.

À partir de là, vous serez prêts à lancer vos jobs Spark ou Hadoop ou tout autre batch de data processing.

C’est ce que nous avons fait chez Stormize !

“Scalable” le mot sexy qui ne coutent plus qu’un clic.

Monitoring de votre stream Firehose

Firehose est un système de queue de message, à l’image d’Apache Kafka.

Le principe de AWS Firehose est simplissime : vous définissez une source de données, un buffer, une fonction de transformation de la data (ou non), et une destination.

1 — Source :

AWS propose deux types d’entrées possibles pour Firehose : « Direct PUT » (celui qui va nous intéresser ici) et Kinesis stream (cool, si vous avez besoin de real time processing avant l’archivage, par exemple dans le cas de modèles de deep-learning).

2 — Buffer :

Combien de temps et quel poids d’enregistrements vous voulez garder au maximum (ou au minimum) dans la queue avant de déverser son contenu vers sa destination.

  • Le poids total est configurable entre 1 et 128MB.
  • Le temps, entre 60 et 900 secondes.

3 — Transformation :

Lorsqu’une donnée entre dans votre tuyau, vous avez la possibilité de la manipuler grâce à une fonction AWS Lambda. La « transformation de la donnée » est une fonction pratique, mais on va surtout utiliser cette étape « gratuite en dette technique » pour effectuer la validation de nos enregistrements !

4 — Destination :

À la sortie de Firehose, Amazon propose d’envoyer vos enregistrements dans :

  • S3
  • Redshift (BDD orientée colonne et basée sur PostgreSQL 8)
  • Elasticsearch.

Firehose est la clef de voûte de notre infrastructure Serverless. Nous allons déverser toutes nos données dans AWS Firehose, sans nous soucier de rien. Car c’est la magie des service Amazon :

AWS s’occupe du scaling pour vous.

Créer et alimenter votre stream Firehose

Firehost data pipline

La création du stream Firehose est très simple, je ne vais pas en détailler les étapes ici.

Pour notre exemple, voici les paramètres importants :

  • Source : Direct PUT
  • Destination : Bucket S3
  • S3 Backup : Enabled (important)
  • Buffer size : 128MB (MAX)
  • Buffer interval : 900 secondes (MAX)
  • S3 Compression : GZIP

Pour l’alimentation du stream, on va imaginer un scénario assez simple :

Je suis une startup B2C avec un produit disponible en Webapp, App iOS Mobile/Tablet et Android Tablet.
Je voudrais logger les actions de mes utilisateurs au sein de mes applications, en faire des stats et détecter des anomalies.

Sur chaque plateforme, on va ajouter un Kinesis producer. Pour des raisons de performance, je vous recommande d’utiliser la méthode « putRecordBatch » qui envoie plusieurs logs dans la même requête. Attention la limite est de 500 records par appel.

Voici un exemple de code :

function sendRecords( data ) {

recordsBuffer.push( { Data: JSON.stringify(data) } );

if( recordsBuffer.length == 500 ) {

var recordParams = {
DeliveryStreamName: 'my_beauty_stream',
Records: recordsBuffer
};

firehose.putRecordBatch(recordParams, function (err, data) {
if (err) console.log(err, err.stack);
});

recordsBuffer = [];
}
}

Transformer et surtout valider votre donnée

Firehose nous offre la possibilité de passer tous nos enregistrements dans une fonction (AWS) Lambda afin de les manipuler. L’idée ici est de dégrossir la donnée entrante.

Je vous déconseille d’effectuer l’enrichissement de la donnée ici, ou de faire appel à quelque service externe qu’il soit.

Votre stream Firehose est votre mine de diamant, vous voulez d’abord extraire les pierres de la mine, avant de les nettoyer, de les découper et de les vendre 10000 fois le prix. La logique est la même ici : cette zone du pipeline doit être la plus légère/rapide possible et la plus résiliente à la panne. Ne la surchargez pas.

Cela étant, grâce à la fonction backup to S3 de Firehose on peut se permettre de faire un peu de validation (quitte à rejouer les enregistrements plus tard depuis le backup).

En effet, les enregistrements entrant dans Firehose seront tous stockés tels quels dans le bucket de backup S3 que vous aurez sélectionné.

Pour faire notre validation, Firehose invoque une fonction AWS Lambda à laquelle il envoie un petit set d’enregistrements.

Le scaling est encore une fois géré par AWS.

Dans notre exemple, on va imaginer qu’on ne garde que les logs de la dernière version de notre app et que l’on uniformise un peu. Cela donnerait ça :

Transformer et valider les enregistrement dans Firehose avec AWS Lambda

Une fois la donnée valide renvoyée dans Firehose, celui-ci ça se charger de la pousser sous forme de petits fichiers dans S3.

Le chemin sera s3://mon-beau-bucket/YYYY/MM/DD/HH/file-random.gz ce qui n’est pas personnalisable…

Les limites du système et comment y remédier

Firehose a beaucoup de qualités et nous a fait gagner énormément de temps et d’argent pour la mise en place de notre pipeline big data.

En revanche, il a deux inconvénients majeurs pour notre usage.

1 — Il crée plein de petits fichiers

Firehose ne sait pas aggréger les fichiers

Plus le nombre d’enregistrements entrant dans votre stream Firehose sera grand, plus vous aurez de fichiers dans S3.

Donc, plus de fichiers à lire par votre cluster Spark. Et ça, ce n’est pas bon pour la performance de vos jobs.

2 — Le chemin des fichiers n’est pas personnalisable

Dans notre exemple, nous avons 4 sources de données (Webapp, App iOS Mobile/Tablet et Android Tablet), dans une grande majorité des cas nous voudrons analyser une ou deux sources, par exemple les actions des utilisateurs iOS uniquement.

Dans ce genre de cas de figure, des dossiers par source nous permettraient de facilement cibler les données qui nous intéressent.

Par exemple, un chemin comme cela : s3://mon-beau-bucket/ios-mobile/YYYY/MM/DD/HH/file-random.gz

Ou à la limite : s3://mon-beau-bucket/YYYY/MM/DD/HH/ios-mobile-random.gz

La magie s’arrête là, ou presque

Pour pallier à ces lacunes de Firehose, il y a beaucoup d’options. Je crois qu’à partir d’ici chacun trouvera l’option la plus adaptée à ses besoins.

À défaut d’une solution parfaite, qui est à l’étude, nous avons adopté la solution suivante : Une nouvelle Lambda !

Hé oui, rien de plus simple et scalable qu’une bonne Lambda.

Le fonctionnement est relativement simple, chaque fois que Firehose dépose un nouveau fichier, S3 déclenche notre Lambda. Celle-ci va lire, répartir et agréger les données contenues dans le fichier Firehose vers des fichiers plus conséquents, découpés par source dans un autre bucket S3.

On peut également imaginer faire auiss un peu de ménage. On a pas besoin de garder la donnée en triple…

Cela vous donnera peut-être d’autres idées, par exemple, s’en servir pour insérer dans HBase…

Le fameux Checkpoint COD

Comme diraient nos amis anglophone, « Et voilà ! »

Nous avons nos fichiers de logs dans un système HDFS (entièrement géré par S3), 100% scalable et bien intégré à l’univers Spark / Hadoop avec les librairies qui vont bien.

Notre infrastructure se compose de ~50 lignes de code pour nos deux Lambda et c’est à peu près tout ce que nous devons maintenir.

À partir d’ici, vous n’avez plus qu’à faire chauffer les algorithmes de map reduce pour tirer toute la quintessence des données collectées.

Like what you read? Give Machavia Pichet a round of applause.

From a quick cheer to a standing ovation, clap to show how much you enjoyed this story.