Découvrir le cloud avec AWS en développant un data pipeline — Partie 3

Eren GUNDAG
Publicis Sapient France
7 min readMar 8, 2023

Dans la précédente partie de ce tutoriel, nous avons construit la première étape de notre data pipeline : son déclenchement à l’arrivée des fichiers de données, ainsi qu’un prétraitement des données. Cette étape a été l’occasion pour nous de découvrir AWS Lambda qui nous permet d’exécuter notre code sans se soucier de l’infrastructure à déployer ou de sa maintenance. C’est l’un des services d’AWS qui illustre très bien les avantages que peut apporter le cloud computing à nos projets. Nous avons également fait un passage par le service essentiel qu’est IAM pour la gestion des autorisations.

Dans cette partie, nous travaillerons sur la brique centrale de notre data pipeline, le job de transformation de nos données. Pour cela, nous découvrirons AWS Glue et ses nombreuses fonctionnalités pour le traitement mais aussi pour l’exposition de données et referons encore un passage par IAM.

AWS Glue

AWS Glue est un service d’extraction, de transformation et de chargement (ETL) entièrement géré.
Il vous permet, un peu à la manière de Lambda, d’exécuter des jobs ETL sans vous soucier de l’infrastructure. Glue s’appuie sur Spark et vous permet donc de bénéficier de nombreuses fonctions de transformations et de la parallélisation, très utiles pour vos jobs de traitement de données. Vous pouvez déployer vos jobs Spark (Scala ou Python), ou laisser Glue les générer pour vous.
Glue propose également ce qu’on appelle des crawlers. Ce sont des analyseurs qui vont déduire le schéma, le format et les types de vos données. Ces métadonnées sont stockées en tant que tables dans le Data Catalog Glue que vous pourrez réutiliser dans vos jobs Glue, via des services de requête, etc.

1. Rendez-vous sur la page du service Glue.

2. Cliquez sur Jobs, puis Add job. Nommez le job transform_to_parquet.

3. Il nous faut choisir un rôle pour notre job. Nous allons le créer et lui attribuer les droits sur les deux buckets techniques créés par AWS pour y stocker notre script et des fichiers temporaires ainsi que notre bucket datalake.
Rendez-vous sur la page du service IAM puis cliquez sur Roles et Create Role.

4. Cette fois, choisissez Glue comme Use Case, puis Next: Permissions.

5. Puis Create Policy.

Et insérez-y la policy suivante, en pensant à remplacer par le nom de votre bucket de datalake, votre AccountID ainsi que le RegionID où cela est mentionné.

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ListObjectsInBucket",
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::",
"arn:aws:s3:::aws-glue-scripts--",
"arn:aws:s3:::aws-glue-scripts--"
]
},
{
"Sid": "AllObjectActions",
"Effect": "Allow",
"Action": "s3:*Object",
"Resource": [
"arn:aws:s3:::/*",
"arn:aws:s3:::aws-glue-scripts--/*",
"arn:aws:s3:::aws-glue-temporary--/*"
]
}
]
}

6. Nommez, décrivez et créez votre policy.

7. Puis retournez à l’étape de création de votre rôle, et sélectionnez la. Ajoutez y également AWSGlueServiceRole.

8. Vous pouvez tagger votre rôle. A la dernière étape, nommez et décrivez votre rôle puis créez le.

9. Retournez à l’étape de création de votre job Glue et choisissez le rôle fraîchement créé.

10. Dans Glue Version, sélectionnez Spark 2.4, Scala 2 with improved job startup times (Glue Version 2.0).

11. Pour This job runs, cochez A new script to be authored by you.

12. TransformToParquet sera le nom de notre classe.

13. Les éléments suivants sont pré-complétés par le service Glue. Il s’agit de buckets techniques dans lesquels seront stockés nos scripts et les fichiers temporaires nécessaires au fonctionnement interne de Glue. Ce sont sur ces buckets que nous avons donné les accès à notre Role précédemment.

14. Déroulez le sous-menu Monitoring options, cochez Job metrics.

15. Vous pouvez là aussi tagger le job dans le sous-menu Tags.

16. Puis passez à Security configuration, script libraries, and job parameters. Sélectionnez le Worker type Standard et Number of workers à 2, ce sera amplement suffisant pour notre job.
Ces paramètres doivent vous être familiers si vous travaillez ou avez travaillé sur une architecture distribuée et permettent d’assigner les ressources à disposition de notre job. Pour plus d’informations, n’hésitez pas à lire la très bonne documentation d’AWS.

17. Cliquez enfin sur Suivant puis Save job and edit script.

18. Copiez le code suivant dans l’éditeur, en pensant à remplacer par le nom de votre bucket de datalake où cela est mentionné.

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object TransformToParquet {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val sparkSession: SparkSession = glueContext.getSparkSession

val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "table").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)

val datalake = ""
val table = args("table")

val inputData = sparkSession.read
.format("csv")
.option("header", "true")
.load(s"s3://$datalake/raw/$table/*.csv")

val output = inputData.write
.mode(SaveMode.Overwrite)
.parquet(s"s3://$datalake/parquet/$table/")
Job.commit()
}
}

19. Vous pouvez cliquer sur Save et quitter l’interface de création du job.

20. Testons notre job fraîchement créé. Nous avons un fichier de données candidats stocké sur notre datalake, sous le préfixe raw/, au format csv que nous désirons convertir au format parquet, dans ce même datalake, sous le préfixe parquet/.
Sélectionnez le job transform_to_parquet puis Action et enfin Run job.

21. Déroulez le sous-menu Security configuration, script libraries, and job parameters. En bas de page, dans Job parameters ajoutez — table (notez que le paramètre attendu par notre script est bien table et non — table) et la valeur candidats. Ce sont les arguments passés à notre job Glue.

22. Lancez le job en cliquant sur Run job. Vous pouvez suivre l’exécution du job en cliquant dessus.

23. Une fois terminé, vous devriez retrouver vos données candidats au format parquet dans le datalake, à l’endroit attendu dans notre script.

Passons maintenant au crawler. Celui-ci analysera pour nous le contenu de nos données, en déduira le type des données, les structures, partitions et créera ainsi les tables dans notre Catalog Glue.

1. Rendez-vous dans la menu Crawlers du service Glue, cliquez sur Add crawler.

2. Nommez-le. Taggez-le et décrivez-le si vous le souhaitez. Puis Suivant.

3. Choisissez Data stores. Puis Suivant.

4. Dans Include path, sélectionnez via l’icône Dossier ou saisissez le chemin vers nos données parquet : s3://<YourDatalakeBucket>/parquet. Cela constitue le répertoire, le préfixe à analyser par notre crawler. Puis Suivant.

5. Non. Puis Suivant.

6. Vous pouvez laisser AWS créer le rôle associé au crawler pour vous. Le service aura l’intelligence de lui assigner la policy strictement nécessaire à son fonctionnement, c’est à dire AWSGlueServiceRole mais surtout, l’accès aux seules données stockées sous s3://<YourDatalakeBucket>/parquet. Suffixez le simplement, par parquet-datalake par exemple, pour mieux le retrouver. Puis Suivant.

7. Laissez le déclenchement de notre crawler à Run on demand. Puis Suivant.

8. Nous n’avons pas de base de donnée pour le moment. Créez la et nommez-la datalake. C’est dans cette base de données que nous retrouverons les tables déduites par notre crawler. Puis Suivant.

9. Vous avez, à la dernière étape, un résumé du crawler créé. Terminez.

Les composantes de notre pipeline sont prêtes, ils nous reste cependant à les orchestrer. C’est ce que nous ferons dans la prochaine partie avec le service Step Function, qui s’appuie sur une logique de machine à états très pratique pour notre besoin.

--

--