Découvrir le cloud avec AWS en développant un data pipeline — Partie 4

Eren GUNDAG
Publicis Sapient France
8 min readMar 8, 2023

Dans la précédente partie de ce tutoriel, nous avons développé ce qui constitue le cœur de notre data pipeline, le job de transformation et d’ingestion de données. Nous avons également découvert une fonctionnalité intéressante d’AWS Glue qui nous permet d’analyser nos fichiers de données afin d’en générer des tables, exposées via un catalogue de données.

Il est désormais temps d’orchestrer tout cela. Nous utiliserons pour cela AWS Step Functions, basé autour de définitions de machines à états qui vont nous permettre d’organiser et monitorer nos workflows. Nous utiliserons également notre fonction Lambda, développée dans la deuxième partie de ce tutoriel, que nous modifierons légèrement, afin de déclencher notre Step Function.

AWS Step Functions

AWS Step Functions est un orchestrateur de fonctions sans serveur qui permet de séquencer facilement les différents services AWS (fonctions Lambda, jobs Glue…). Vous pouvez créer et exécuter une série de flux de travail avec des points de contrôle qui gèrent l’état de l’application. Le résultat d’une étape sert de base à la suivante. Chaque étape de votre application s’exécute dans l’ordre et comme prévu par la logique métier que vous avez définie. Vous pouvez définir des boucles, étapes d’attente, conditions. Une machine à état qui tire parti des avantages du serverless.

1. Commençons tout d’abord par créer une lambda qui déclenchera notre crawler après l’exécution de notre job Glue. Vous le savez, nous aurons besoin d’un Role pour cela, rendez-vous donc directement sur la page du service IAM. Cliquez sur Create Role. Sélectionnez Lambda. Puis Next.

2. Sélectionnez la policy AWSLambdaBasicExecutionRole.

3. Créez ensuite une policy start-glue-crawler et insérez le contenu ci-dessous.

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "StartGlueCrawler",
"Effect": "Allow",
"Action": "glue:StartCrawler",
"Resource": "*"
}
]
}

Cliquez sur Review policy.

4. Ajoutez la policy au Role puis cliquez sur Suivant.

5. Taggez si vous le souhaitez et cliquez sur Suivant.

6. Nommez, décrivez et créez le Role.

7. Rendez-vous sur la page du service Lambda. Cliquez sur Create function.
Nommez-la start-crawler, sélectionnez Python 3.8 en Runtime puis le Role fraîchement créé et cliquez sur Create function.

8. Insérez le code suivant dans l’éditeur et cliquez sur Deploy.

import boto3

def lambda_handler(event, context):

client = boto3.client('glue')

client.start_crawler(Name=event['crawler-name'])

9. Rendez-vous sur la page du service Step Functions et cliquez sur Create state machine.

10. Sélectionnez Author with code snippets puis Type Standard.

11. Passons à la définition de l’orchestrateur. Remplacez d’abord le contenu de l’éditeur par :

{
"Comment": "Convert to parquet and crawl data",
"StartAt": "Start Glue Job",
"States": {
}
}

12. Le service vous mâche un peu le travail avec un générateur d’extrait de code. Cliquez sur Generate code snippet et recherchez AWS Glue: Start a job.

13. Sélectionnez votre job Glue. Cela génère le pattern Json nécessaire au déclenchement du job Glue sélectionné et cliquez sur Copy to clipboard.

14. Collez le contenu copié dans l’éditeur de définition, à la place de l’étape Hello.
Ajoutez l’élément suivant à la liste des paramètres de notre job Glue.

  "Arguments": {
"--table.$": "$.table"
}

Notre prochaine étape sera le déclenchement du crawler, vous pouvez déjà remplacer NEXT_STATE par Start Crawler.

15. Cliquez sur Generate code snippet. Sélectionnez cette fois AWS Lambda: Invoke a function.

16. Sélectionnez la fonction start-crawler. Copy to clipboard.

17. Ajoutez cette étape à la suite de l’étape Start Glue Job, n’oubliez pas la virgule.
Nommez l’étape Start Crawler et remplacez

"Input.$": "$"

par

"crawler-name": "parquet-datalake"

puis

"Next": "NEXT_STATE"

par

"End": true

18. Le bouton Format Json permet d’indenter le code JSON comme il se doit. La définition de la Step Function devrait ressembler à ça :

{
"Comment": "Convert to parquet and crawl data",
"StartAt": "Start Glue Job",
"States": {
"Start Glue Job": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "transform_to_parquet",
"Arguments": {
"--table.$": "$.table"
}
},
"Next": "Start Crawler"
},
"Start Crawler": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:::function:start-crawler:$LATEST",
"Payload": {
"crawler-name": "parquet-datalake"
}
},
"End": true
}
}
}

Notez que l’éditeur propose également un schéma qui permet de visualiser les différentes étapes de notre Step Function.

19. Cliquez sur Suivant.

20. Nommez la convert-crawl-data. Vous pouvez créer un Role et le sélectionner ou laisser là aussi le service le faire pour vous, en déduisant ce qui est nécessaire. Vous pouvez d’ailleurs passer en revue le Role créé en fin de page si vous le souhaitez. Taggez la Step Function si vous le souhaitez.

21. Cliquez sur Create state machine.

22. Nous allons nous appuyer sur la Lambda decompress pour déclencher notre Step Function. Mais pour ça, il va falloir lui en donner le droit. Rendez-vous sur le service IAM afin d’éditer le Role lambda-s3-full-access-staging-datalake.

23. Cliquez sur Attach Policies puis Create Policy.

24. Insérez-y la policy suivante, en remplaçant l’ARN (l’identifiant unique) par l’ARN de votre Step Function.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"states:StartExecution"
],
"Resource": [
"arn:aws:states:[[region]]:[[accountId]]:stateMachine:[[stateMachineName]]"
]
}
]
}

25. Nommez, décrivez et créez la policy.

26. Retournez sur la page d’édition du Role et ajoutez-y la policy fraichement créée.

27. Cliquez sur Attach policy. Le Role devrait à présent ressembler à ça :

28. Rendez-vous sur la page du service Lambda pour y éditer la Lambda decompress.

29. Remplacez le code par celui-ci, en pensant à remplacer par le nom de votre bucket de datalake, votre AccountID ainsi que le RegionID où cela est mentionné. Puis Deploy.

import boto3
import os
from zipfile import ZipFile
import json

def lambda_handler(event, context):

s3 = boto3.resource('s3')
stepfunction = boto3.client('stepfunctions')

try:
src_bucket = event['Records'][0]['s3']['bucket']['name']
src_key = event['Records'][0]['s3']['object']['key']
filename = os.path.basename(src_key)
table_name = os.path.splitext(filename)[0]
target_bucket = ''
files_list = []

s3.meta.client.download_file(src_bucket, src_key, f'/tmp/{filename}')

with ZipFile(f'/tmp/{filename}', 'r') as zipObj:
zipObj.extractall('/tmp/unzipped')


for file in os.listdir('/tmp/unzipped'):
s3.meta.client.upload_file(f'/tmp/unzipped/{file}', target_bucket, f'raw/{table_name}/{file}')
files_list.append(file)

stepfunction.start_execution(
stateMachineArn='arn:aws:states:::stateMachine:convert-crawl-data',
input=json.dumps({"table" : table_name})
)

except Exception as e:
s3.meta.client.copy({'Bucket': src_bucket, 'Key': src_key}, src_bucket, f'out/{filename}')
finally:
s3.Object(src_bucket, src_key).delete()

return files_list

Notre pipeline est désormais prêt. Nous avons déjà ingéré un fichier de candidats, au fur et à mesure de nos développements.
Nous allons expérimenter notre pipeline de A à Z, du dépôt dans le bucket de staging à la consultation des données.

1. Téléchargez le fichier de données en cliquant ici, il s’agit cette fois d’une liste de jobs.

2. Retournez sur la page du service S3 et cliquez sur le bucket de staging.

3. Cliquez sur le préfixe in/ précédemment créé.

4. Cliquez sur Upload puis Add files. Ajoutez le fichier jobs téléchargé puis cliquez sur Upload.

5. Cela va déclencher notre fonction Lambda decompress qui va donc décompresser nos données et les copier dans le datalake, sous le préfixe raw/.

Et déclencher notre Step Function.

Qui va déclencher notre job Glue.

Afin de convertir nos données en parquet.

Puis lancer le crawler.

Qui va créer nos tables, déduites des données de notre datalake.

Note : Cette étape peut durer quelques secondes voire minutes, la Step Function ne fait que de lancer le crawler et n’attend pas son exécution complète. Il pourrait être intéressant d’exploiter les fonctionnalités du service Step Function afin d’attendre la fin de l’exécution du crawler et considérer le flux comme terminé avec succès et/ou de gérer les cas d’erreurs s’il y en a pour approfondir votre apprentissage.

Notre data pipeline est fin prêt. Dans la prochaine et dernière partie de ce tutoriel, nous récolterons les fruits de notre travail en découvrant Athena, service de requête distribué qui nous permettra d’effectuer des requêtes SQL-like sur nos données transformées.

--

--