Big Water : le data engineering au service de la gestion des eaux

L’équipe Data de Neoxia est intervenue afin d’apporter son expertise en data engineering au Conseil départemental de la Seine-Saint-Denis pour le traitement des mesures hydrauliques.

Ousmane Diop
neoxia
6 min readJul 22, 2020

--

En 2016, Neoxia avait déjà mis sur pied un POC (proof of concept) afin d’illustrer auprès du Conseil départemental de la Seine-Saint-Denis la capacité d’une architecture technique innovante à traiter des volumes de mesures hydrauliques extrêmement importants dans des délais satisfaisants.

Le projet, baptisé Big Water, est une solution SaaS de gestion de l’eau pour le département du 93, offrant de puissants outils de calcul et de visualisation des données. Ces données sont issues de capteurs et correspondent à des mesures de caractéristiques physiques de l’eau (hauteur, vitesse, débit, turbidité, lame d’eau pluviométrique). La plateforme Big Water permet l’import et la validation de mesures à travers des règles paramétrables, la correction des mesures invalides grâce à des formules, une visualisation graphique et détaillée des mesures, et un accès instantané aux données historiques.

L’objectif de l’application étant de pouvoir traiter des milliards de mesures, l’équipe Data de Neoxia est ré-intervenue en 2020 afin d’apporter son expertise en data engineering. Dans cet article, nous expliquerons comment Apache Spark nous a permis de répondre à des défis techniques pour ce projet ambitieux.

Migration de données

La migration de données a consisté à transférer des mesures issues de capteurs hydrauliques et des métadonnées associées depuis une base de données Oracle vers la base Cassandra de Big Water. Une base de données Cassandra permet de gérer une grande quantité de données en offrant à la fois une scalabilité et une disponibilité sans compromis au niveau de la performance.

L’implémentation de la migration a consisté à mettre en place un job ETL (Extract-Transform-Load) développé sous Spark et distribué dans un cluster Google Cloud Dataproc.

D’abord, les données de la base Oracle ont été extraites sous forme de fichiers parquet. La lecture des fichiers parquet est faite via une connexion JDBC ; Spark assure cette connexion grâce aux méthodes d’interface pour effectuer des opérations spécifiques fournies par les classes Spark pyspark.sql.DataFrameReader et pyspark.sql.DataFrameWriter. Pour optimiser cette tâche, Spark donne la possibilité de paralléliser la lecture des données en créant des connexions JDBC concurrentes.

Ensuite arrive l’étape de préparation, de croisement des données et de création d’une table finale contenant les mesures fournies par les capteurs, auxquelles on associe un système de labellisation de plusieurs niveaux. Ces labels permettent d’identifier des périodes cohérentes représentées par une suite de mesures ayant des éléments communs, et sont définis à partir de règles métier. Ces règles sont conditionnelles, et font intervenir plusieurs tables de la base Oracle. Ainsi, des jointures de différents types sont effectuées. Pour assurer ces jointures, un partitionnement des tables est indispensable pour que les données avec la même clé soient traitées par le même exécuteur du cluster Spark. Les partitionneurs, dans notre cas, sont l’année, le mois et le jour de la date de prise de mesure.

La dernière étape consiste à filtrer la table finale en ne gardant que les colonnes souhaitées. Le chargement de la table finale dans Cassandra peut se faire directement via la classe Spark pyspark.sql.DataFrameWriter. Mais nous avons opté pour un export vers des fichiers CSV stockés dans Google Cloud Storage qui sont ensuite télédéposés dans Cassandra. Cela évite de relancer inutilement le job ETL pour les environnements de recette, pré-prod et prod. Si le client est satisfait de la recette, nous pouvons télédéposer directement les données en production.

Ce traitement distribué des données doit être exécuté sur un cluster de machines ; d’où l’utilisation du service Dataproc de Google Cloud Platform. Ce service permet l’activation en 90 secondes d’un cluster personnalisé et entièrement géré. De plus, ses fonctionnalités d’autoscaling et de suppression des clusters inactifs permettent d’assurer une maîtrise du coût total du job ETL Spark.

Validation automatique

La validation est une étape importante, mais répétitive, dans le traitement des données hydrauliques. Elle consiste à vérifier la qualité et la cohérence des mesures fournies par les capteurs. Pour Big Water, nous avons utilisé Spark pour mettre en place un service de validation automatique de ces mesures. Ce service consiste d’abord à une vérification de la qualité des données grâce à des règles métier paramétrables ; puis, à une correction des données avec des formules fournies par l’utilisateur. Il s’agit d’une application Spark exposée via une API grâce à Spark Jobserver.

La validation automatique des mesures augmente la productivité des hydrologues, leur permettant de se focaliser sur les tâches à forte valeur ajoutée comme l’analyse des mesures, plutôt que des tâches répétitives de validation manuelle. Cette architecture assure également une capacité de traitement d’une quantité de données importante en un temps satisfaisant.
Ce processus de validation automatique correspond également à un ETL Spark dont les transformations sont cette fois-ci définies par l’utilisateur.

Les données sont extraites de la base Cassandra et stockées dans des RDDs (Resilient Distributed Datasets). Un RDD est une structure de données tolérante aux pannes, adaptée aux calculs distribués et permettant de réutiliser les données de façon optimale lors d’un processus itératif. Il s’agit de l’abstraction de données proposée par Spark afin de pallier les limitations d’Hadoop liées aux algorithmes itératifs. L’étape de transformation de l’ETL est la plus cruciale ; ainsi, elle s’effectue en deux temps.

On débute la première phase en récupérant d’abord les règles de validation définies par l’utilisateur grâce à un appel API. Les règles peuvent par exemple être que la mesure soit comprise entre un seuil minimum et maximum, ou à ce que la variabilité d’une mesure par rapport aux précédentes ne dépasse pas une certaine valeur.

Ensuite, les RDDs contenant les mesures sont partitionnés et distribués à travers les exécuteurs (workers). Ainsi, la vérification peut s’effectuer en parallèle. À la fin de cette phase, les mesures vérifiant les conditions sont labellisées comme étant valides, celles qui sont invalides sont corrigées dans la deuxième phase de l’étape de transformation.

L’environnement d’exécution de la seconde phase est identique à la première, c’est-à-dire que les RDDs sont partitionnés et distribués entre les exécuteurs. Comme pour les règles, c’est l’utilisateur qui définit également les formules à appliquer pour corriger les mesures. Les formules sont pré-compilées avec la librairie Janino afin d’effectuer une vérification syntaxique. Les variables sont entre autres la mesure brute à corriger, le seuil maximum ou minimum pour une période donnée, l’épaisseur du radier de la canalisation… La complexité des formules varie d’une combinaison simple de fonctions mathématiques basiques, aux instructions conditionnelles avec un appel à des fonctions abaques.

Pour finir, les données sont réécrites dans la base Cassandra. Un système de labellisation permet de différencier les mesures corrigées par rapport à celles qui ne le sont pas.

En somme, nous avons d’une part une application Spark qui exécute une seule fois un job ETL afin de migrer des données depuis Oracle vers Cassandra dont le cluster de machines est managé par Google Cloud Dataproc. D’autre part, un service de validation automatique dont chaque requête de validation correspond à un job ETL lancé via des appels API à Jobserver. Dans les deux cas, l’utilisation du framework Spark offre à la fois un environnement propice aux calculs distribués et une scalabilité horizontale avec son architecture driver-workers nous permettant ainsi de traiter une grande quantité de données en un temps record.

Ces deux projets ont permis au Conseil départemental du 93, accompagné par Neoxia, d’assurer un renforcement de la stabilité et de la qualité des données pour la gestion des eaux du département.

Article écrit par Ousmane Diop et Chayma Mesbahi.

--

--