Faire un export JSON valid depuis BigQuery grâce à Apache Beam et Dataflow

Vincent Villet
Publicis Sapient France
11 min readFeb 27, 2023

BigQuery est le data warehouse serverless de Google Cloud Platform qui permet d’analyser de très larges volumes de données en un minimum de temps. Ses hautes performances limitent néanmoins ses options d’export de données, ce qui peut être problématique si le système consommateur des données d’export a besoin d’un formatage bien précis. J’ai été confronté à cette contrainte en mission et je l’ai résolue par l’utilisation d’Apache Beam. En effet, Apache Beam permet de se connecter directement à BigQuery et de traiter les données de façon personnalisée et à la volée avant leur export. Il est de plus très facile de faire tourner des pipelines Apache Beam dans Google Cloud Platform en utilisant le service Dataflow. Cet article introduit les bases d’Apache Beam à travers l’exemple de la construction d’un pipeline Dataflow d’export JSON valid depuis BigQuery, qui correspond au cas que j’ai rencontré.

Les limites de l’export JSON natif de BigQuery

L’objet de cet article n’est pas de rentrer dans les détails des spécifications du format JSON, mais de comprendre les limitations de l’export natif de BigQuery et comment un pipeline Beam très simple permet de les résoudre. Les données utilisées dans cet article proviennent de la table publique bigquery-public-data:london_bicycles.cycle_hire qui contient les informations de location de l’équivalent londonien du Vélib’:

Data set de locations de vélos à Londres

Cette table fait 2,59 Go et la commande qui permet de faire son extract JSON est la suivante:

bq extract --destination_format NEWLINE_DELIMITED_JSON bigquery-public-data:london_bicycles.cycle_hire 'gs://beam-bq-json-export/export_*.json'

(le bucket Cloud Storage que j’ai utilisé s’appelle beam-bq-json-export, remplacez-le par le vôtre si vous voulez reproduire les résultats)

Cette commande comporte 2 spécificités qui peuvent devenir des inconvénients en fonction des contraintes du système dans lequel vous voulez injecter les données:

  1. Il n’est pas possible de définir la taille des fichiers de sortie, qui sont par défaut assez volumineux:
Taille des fichiers générés par l’export json Big Query

2. Les fichiers de sortie n’ont ni crochets en début et fin de fichier, ni virgule entre les lignes car il s’agit de JSON “newline delimited”:

Elements manquants dans l’export json BigQuery classique

Introduction à Apache Beam

Apache Beam est un modèle de programmation unifié pour créer des pipelines de traitement distribué de données en batch ou en streaming. Le même code peut-être exécuté par des runners différents comme par exemple Spark ou Dataflow. Un programme Beam consiste en la création d’un Pipeline qui prend en entrée un ou plusieurs datasets distribués appelés PCollections et leur applique des transformations appelées PTransforms.

Illustrons ces concepts avec une version revisitée de l’exemple WordCount de la documentation d’Apache Beam, que vous pourrez trouver dans le repository de cet article à examples/wordcount.py.

Le programme prend en entrée le texte du premier livre des Misérables de Victor Hugo et sort un fichier de comptage de chacun des mots:

Illustration du comptage des mots des Misérable de Victor Hugo

La syntaxe de création du pipeline et de définition des PTransforms est assez intuitive:

  • les différentes étapes du pipeline sont liées les unes aux autres par la surcharge de l’opérateur |.
  • les >> servent à associer des noms aux étapes.

Les fonctions Map et FlatMap permettent toutes les 2 d’appliquer des transformations sur les éléments de la PCollection d’entrée et diffèrent simplement sur le nombre d’élément de sortie:

  • Map créera toujours un unique élément de sortie par élément d’entrée.
  • FlatMap aplatit les résultats pour potentiellement créer plusieurs éléments de sortie par élément d’entrée.
 with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

def format_result(word_count):
(word, count) = word_count
return '%s: %s' % (word, count)

(p
# Chaque ligne du texte devient un élément dans la PCollection.
| 'ReadText' >> ReadFromText(known_args.input)
# La fonction FlatMap permet d’associer plusieurs éléments de sortie pour un élément d’entrée.
# Elle permet ici d’extraire chaque mot de chaque ligne en tant qu’élément.
# exemple: ["Il fait beau"] ==> [["Il"], ["fait"], ["beau"]].
| 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
# La fonction Map associe à chaque élément un unique élément.
# Elle sert ici à associer un compteur de 1 à chaque mot.
# exemple: ["beau"] ==> [("beau", 1)]
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
# Comptage des occurrences des mots.
| 'GroupAndSum' >> beam.CombinePerKey(sum)
| 'Format' >> beam.Map(format_result)
| 'WriteText' >> WriteToText(known_args.output)
)

(tous les détails des imports sont dans le fichier complet hébergé sur GitHub)

Ligne 13, le FlatMap de l’opération "Split" démultiplie le nombre d’éléments de la PCollection en scindant les phrases en mots. La PCollection a alors autant d’éléments que de mots.

Ligne 19, le CombinePerKey réduit le nombre d’éléments au nombre de mots uniques, chaque mot étant assigné dans un tuple à son nombre d’occurrences (équivalent d’un SELECT word, COUNT(*) as count GROUP BY word en SQL).

Map et FlatMap sont en fait des cas spéciaux de la fonction générique ParDo qui permet d’exécuter des transformations sur les éléments d’une PCollection et de créer, 0, 1 ou plusieurs éléments dans la PCollection de sortie par élément de la PCollection d’entrée. Par défaut les sorties de ParDo sont considérées comme des collections d’éléments donc les chaînes de caractères vont typiquement être splittées en leurs caractères individuels comme dans l’exemple suivant qui cherche à extraire la première colonne d’un fichier csv:

fichier d’entrée dates.csv:

date,valeur
2017-04-03,2
2017-04-03,2
2017-04-10,2
  with beam.Pipeline(options=pipeline_options) as p:

class DateExtractor(beam.DoFn):
def process(self, data_item):
return (str(data_item).split(','))[0]

(p
| 'ReadMyFile' >> ReadFromText('./data/dates.csv')
| 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
| 'Output' >> WriteToText(known_args.output + "_pardo"))

Début du fichier de sortie du pipeline:

d
a
t
e
2
0
1
7
-
0
4
-
0
3
2
0
1
7

Pour avoir le comportement attendu il faudrait encapsuler dans une liste l’output de la fonction DateExtractor. À noter que remplacer ParDo par Map à la ligne 9 permet d’avoir directement le résultat attendu:

date
2017–04–03
2017–04–03
2017–04–10

Le script run_pardomap_example.sh à la racine du repository permet de faire tourner le code de cet exemple.

Faire un export JSON valid de BigQuery grâce à Beam

Revenons à notre problème initial. Pour avoir un contrôle complet sur nos fichiers de sortie nous allons rajouter une colonne file_name contenant le chemin du fichier au sein du bucket Cloud Storage d’export directement dans le dataset BigQuery à exporter. Dans le cas de notre export des données de location de vélo à Londres, nous voulons que les exports JSON du jour soient rangés dans un sous-dossier portant le nom de la date du jour et qu’ils soient répartis par id de station de départ dans la limite de 15000 lignes par station. Le format du fichier de sortie sera donc <date>/<start_station_id>_<file_number>. Par exemple, si la station de départ 351 a 30000 locations le 11 janvier 2021 alors 15000 locations se retrouveront dans le fichier 2021-01-11/351_000 et les 15000 autres dans le fichier 2021-01-11/351_001 (L’utilisation du séparateur / dans le nom du fichier permettra à Cloud Storage de simuler une arborescence de dossiers avec notamment dans notre cas un sous-dossier portant la date du jour).

Dataset de location de vélo avec colonne file_name

Voici le code SQL BigQuery qui permet de créer la variable file_name:

CONCAT(
DATE(CURRENT_TIMESTAMP),"/", start_station_id,
FORMAT("_%03d",
CAST(
ROW_NUMBER() OVER(PARTITION BY start_station_id ORDER BY start_date)/15000
AS INT64
)
)
)

Ces préparatifs réglés, place au code Beam! La logique est très simple (code complet sur GitHub):

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

(p
| 'ReadTable' >> beam.io.gcp.bigquery.ReadFromBigQuery(table=known_args.input)
| "SetKeys" >> beam.Map(lambda s: set_keys(s))
| "Grouping keys" >> beam.GroupByKey()
| 'Windowed Writes' >> beam.ParDo(WindowedWritesFn(known_args.output)))

Le pipeline lit directement la table grâce au connecteur beam.io.gcp.bigquery.ReadFromBigQuery, regroupe les données par file_name avec la fonction set_keys (détaillée ci-dessous) et applique un ParDo personnalisé sur les données groupées pour mettre en forme les données comme on le souhaite et écrire les données dans le bucket Cloud Storage indiqué et au chemin correspondant au file_name.

La fonction set_keys prend en entrée le dictionnaire issu de la table BigQuery correspondant à une ligne et lui associe le tuple (file_name, json avec toutes ses valeurs sauf file_name):

def dict_without_keys(d, forbidden_keys):
return {x: d[x] for x in d if x not in forbidden_keys}

def set_keys(row):
"""
ex:
>>> row = {"start_station_id": 351,
"end_station_id": 340,
"start_date": "2021-01-11",
"file_name"="2021-01-11/351_000"}

>>> set_keys(row)
("2021-01-11/351_000", '{"start_station_id": 351, "end_station_id": 340, "start_date": "2021-01-11"}')
"""
return (row["file_name"], str(dict_without_keys(row, ["file_name"])))

La classe WindowedWritesFn hérite quant à elle de la classe Beam.DoFn pour pouvoir être distribuée via ParDo et overwrite la fonction process avec les opérations que l’on souhaite faire. Elle est très fortement inspirée de ce post Stack Overflow portant sur l’écriture d’un fichier par fenêtre Pub/Sub pour ce qui est de la création et de l’utilisation du writer.

class WindowedWritesFn(beam.DoFn):
def __init__(self, outdir):
self.outdir = outdir


def process(self, element):
# Import inside DoFn not to have to use --save-main-session True in pipeline args
# See https://cloud.google.com/dataflow/docs/guides/common-errors#name-error for more info
from apache_beam.io.filesystems import FileSystems

(file_name, rows) = element
rows = list(rows)

with FileSystems.create(self.outdir + "/{}.json".format(file_name), mime_type="text/plain") as writer:
file_content = "[" + ",\n".join(rows) + "]"
writer.write(file_content.encode("utf-8"))

La fonction récupère le tuple qui contient les données et le file_name, crée le writer à partir du file_name et de self.outdir qui est le lien du bucket Cloud Storage d’export, et implémente la logique des crochets et des virgules pour écrire le fichier de sortie.

La requête d’exécution est la suivante (cf fichier main.sh dans le repository):


python ./beam_bq_json_export.py --project="${PROJECT_ID}" --region=europe-west1 --runner=DirectRunner \
--input="${PROJECT_ID}:${EXPORT_DATASET}.${EXPORT_TABLE}" \
--staging_location=gs://$EXPORT_BUCKET/staging --temp_location gs://$EXPORT_BUCKET/tmp \
--output=gs://$EXPORT_BUCKET

Les arguments staging_location et temp_location sont des emplacements nécessaires au pipeline pour la préparation des données. Le runner utilisé est ici le DirectRunner, ce qui signifie que les données seront traitées en local, ce qui prendra autour d’une heure ou deux en fonction de la puissance de votre poste. Les autorisations requises sont simplement celles nécessaires pour manipuler les ressources BigQuery et Cloud Storage et elles seront automatiquement utilisées par le script après l’exécution de la commande gcloud auth application-default login avec un user les possédant.

Exécuter le pipeline avec Dataflow

L’exécution du pipeline avec Dataflow nécessite 3 étapes supplémentaires:

  1. passer Dataflow à l’argument runner de la ligne de commande du script python,
  2. activer l’API Dataflow si elle n’est pas déjà activée, et
  3. gérer les autorisations.

D’une manière générale les erreurs sont très explicites et redirigent directement vers les activations ou autorisations manquantes:

Erreur GCP renvoyant à l’activation de l’API Dataflow

Pour bien comprendre la gestion des autorisations lors du lancement d’un pipeline Dataflow, il faut faire la distinction entre 3 comptes:

  • le compte de service Dataflow,
  • le compte de service du contrôleur, et
  • le compte qui exécute le code de création du pipeline.

Le compte de service Dataflow est un compte de type serviceAgent géré par Google. Tous les services managés ont un compte serviceAgent associé et il est extrêmement rare d’avoir à les manipuler donc je n’en parlerai pas plus dans cet article. Je le mentionne uniquement parce qu’il apparaît dans la page de documentation de Google sur la sécurité de Dataflow.

Le compte de service du contrôleur est utilisé par Dataflow pour exécuter les tâches du pipeline. Il est par défaut égal au compte de service Compute Engine mais la documentation nous met en garde: «Le compte de service Compute Engine par défaut dispose d’un accès étendu aux ressources de votre projet, ce qui facilite la mise en route de Dataflow. Toutefois, pour les charges de travail de production, nous vous recommandons de créer un nouveau compte de service ne disposant que des rôles et autorisations nécessaires». Le compte de service Compute Engine est en effet par défaut Éditeur sur le projet. Nous allons donc créer un compte de service dédié et le passer au script beam_bq_json_export.py via le paramètre --service_account_email. Ce compte de service devra être Nœud de calcul Dataflow (roles/dataflow.worker) et posséder les autorisations nécessaires à l’exécution des tâches (utilisation de BigQuery et accès à GCS dans notre cas).

Enfin, le compte exécutant le code de création du pipeline peut être soit le compte utilisateur configuré par la commande gcloud auth application-defaut login, soit un compte de service dont le fichier json contenant la clef sera référencé par la variable d’environnement GOOGLE_APPLICATION_CREDENTIALS (à laisser vide si vous avez utilisé gcloud auth application-defaut login). Ce compte doit au minimum être Administrateur Dataflow (roles/dataflow.admin) et Utilisateur du compte de service (iam.serviceAccounts.actAs). À noter que si l’on utilise le même compte de service pour l’exécution du code de création du pipeline et pour l’exécution des tâches du pipeline alors ce compte sera à la fois dataflow.admin et dataflow.worker.

Dans cet exemple j’ai décidé d’utiliser gcloud auth application-defaut login pour utiliser mon compte propriétaire du projet lors du lancement de la pipeline.

La création d’un compte de service pour l’exécution des tâches se fait très simplement dans API & Services / Identifiants / Créer des identifiants / Compte de service:

Interface de création de compte de service

Pour ce qui est de la mise à jour des autorisations on se tourne bien sûr vers l’interface IAM et on donne les autorisations suivantes au compte de service:

  • roles/bigquery.dataViewer,
  • roles/bigquery.jobUser pour avoir l’autorisation bigquery.jobs.create,
  • roles/storage.objectAdmin pour lire et écrire dans Cloud Storage, et
  • roles/dataflow.worker.
Rôles du compte de service Dataflow

On peut alors lancer le pipeline Dataflow avec la commande

python ./beam_bq_json_export.py --project="${PROJECT_ID}" --region=europe-west1 --runner=DataFlow \
--input="${PROJECT_ID}:${EXPORT_DATASET}.${EXPORT_TABLE}" \
--staging_location=gs://$EXPORT_BUCKET/staging --temp_location gs://$EXPORT_BUCKET/tmp \
--output=gs://$EXPORT_BUCKET
--service_account_email=beam-bq-json-export@beam-bq-json-export.iam.gserviceaccount.com

Félicitations, vous venez de faire tourner votre premier pipeline Dataflow ! L’interface Dataflow permet de visualiser les tâches sous forme de graphique. L’exécution du pipeline a ici pris un gros quart d’heure ce qui est nettement moins qu’en local.

Graphe du run Dataflow

Conclusion

Comme cet exemple vous l’aura montré, la programmation d’un pipeline Beam basique est très simple. Le système des runners permet de plus d’exécuter le pipeline sur Dataflow sans aucune complexité additionnelle. Pour notre cas d’usage d’export de données BigQuery au format JSON, l’utilisation de Beam s’avère nettement plus simple que de retraiter les fichiers issus de l’export BigQuery par défaut. Le coût est en revanche plus élevé car on a exécuté une requête SQL supplémentaire ainsi qu’un batch Dataflow, alors que l’export BigQuery est gratuit. À noter qu’il est également possible de faire des exports Parquet BigQuery. A vous d’adapter la sortie de vos traitements en fonction de vos besoins via ce qui est déjà disponible ou créant votre propre sortie. Vous pourrez pour cela vous inspirer du code source de l’article disponible sur GitHub, en espérant qu’il vous sera utile !

--

--

Vincent Villet
Publicis Sapient France

Machine Learning Engineer passionné par la minimisation de l’impact environnemental du numérique et son usage pour lutter contre le réchauffement climatique.