Streaming de données avec InfluxDB, Kafka et Grafana : une approche pratique

Thibaut Furnon
neoxia
Published in
7 min readFeb 17, 2022

Qu’il s’agisse d’estimer la valeur du Bitcoin dans les prochains mois ou de prédire le nombre de recherches Google ayant pour mots clé “Lionel Messi” + “PSG”, l’étude des séries temporelles est une des problématiques les plus actuelles dans le domaine de la Data Science.

Les modèles de prédiction nécessitent une quantité massive de données qu’il est difficile d’administrer sans l’utilisation d’outils spécifiques, permettant un traitement rapide, que ce soit pour l’ingestion, la transformation ou encore la visualisation de ces données.

Nous présenterons dans cet article InfluxDB, un outil dédié à l’ingestion et au stockage de données temporelles. Nous utiliserons Apache Kafka pour la gestion de nos flux de données et Grafana, logiciel de référence pour la construction de tableaux de bord.

Cet article présentera une illustration de ces trois technologies à l’aide d’une étude de cas s’appuyant sur les données des bornes cyclables JCDecaux. Tous en selle, c’est parti !

Pourquoi utiliser les bases de données de séries temporelles ?

Il existe deux raisons pour lesquelles ces bases de données gagnent en popularité ces dernières années (source).

Tout d’abord, les données temporelles ont souvent un flux d’accumulation important, les bases de données traditionnelles n’ont pas été construites pour gérer ce genre de données. Dans notre contexte de données chronologiques, les bases de données NoSQL fonctionnent mieux que les bases de données relationnelles de manière générale. Les bases de données de séries temporelles ont été pensées de sorte à traiter la donnée de manière rapide et efficace, pouvoir se mettre à l’échelle en fonction des évolutions en taille des jeux de données et permettre une ingestion performante de nouvelles données.

Les bases de données de séries temporelles ont également un avantage : leur prise en main. Elles contiennent des fonctions prédéfinies et des opérations spécifiques aux séries temporelles. Il peut s’agir de requêtes, d’agrégation sur les données, de politiques de rétention des données…. Cela permet aux utilisateurs d’accélérer le processus d’analyse et d’utilisation des données et offre une meilleure expérience utilisateur.

Le système de gestion de bases de données pour les séries temporelles le plus largement utilisé est InfluxDB.

Source : https://db-engines.com/en/ranking_trend/time+series+dbms

Pourquoi InfluxDB est-il votre meilleur outil pour l’analyse des séries temporelles ? Pour donner quelques chiffres :

  • InfluxDB requiert 20 fois moins d’espace de stockage que MongoDB
  • InfluxDB est 5 fois plus rapide pour insérer des données que Cassandra
  • InfluxDB est 8 fois plus rapide que Elasticsearch pour lire des données

Afin d’optimiser son utilisation pour les séries temporelles, InfluxDB a les propriétés suivantes :

  • Les données sont ordonnées temporellement de manière croissante (pour chaque instant il ne peut y avoir qu’une donnée par champ)
  • Des restrictions sont faites concernant la mise à jour ou la suppression de données
  • La priorité est donnée aux requêtes de lecture et d’écriture

InfluxDB utilise deux principaux langages de programmation : InfluxQL (semblable à SQL mais sans les opérations UNION, JOIN ou HAVING) et Flux (plus récent et complet). InfluxDB a récemment sorti une nouvelle version 2.0 intégrant de nombreuses nouvelles fonctionnalités, notamment une interface intégrée permettant aussi la création facile de graphes. Toutefois, cette version comporte encore quelques problèmes, par exemple pour l’ingestion de larges datasets CSV. Nous avons donc décidé d’utiliser la version 1.8, qui est plus stable et déjà très complète, pour notre cas pratique que nous allons maintenant détailler.

Besoin d’un vélo près de chez vous ? Surveillez les disponibilités en quelques clics !

Pour mieux comprendre l’intérêt d’InfluxDB dans l’étude des séries temporelles, nous proposons de suivre un exemple s’appuyant sur les données d’utilisation de vélo en libre service JCDecaux. La suite du présent article détaillera donc cet exemple qui permet d’avoir un cas concret pour illustrer et prendre en main les outils que nous allons utiliser. L’architecture que nous avons retenue est la suivante :

Tout d’abord pour la configuration de l’environnement, il convient d’utiliser Docker. Ce Docker-Compose comporte notamment InfluxDB, Kafka, Zookeeper et Grafana. Le ‘docker-compose.yaml’ est le suivant :

version: '3'
services:
influxdb:
image: influxdb:1.8.5
volumes:
- ./influxdb/conf:/etc/influxdb
ports:
- 8086:8086
grafana:
image: grafana/grafana:latest
ports:
- 3000:3000
volumes:
- ./grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards
- ./grafana/provisioning/datasources:/etc/grafana/provisioning/datasources
depends_on:
- influxdb
zookeeper:
image: bitnami/zookeeper
user: root
ports:
- 2181:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka
user: root
ports:
- 9092:9092
- 9093:9093
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9093,EXTERNAL://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9093,EXTERNAL://localhost:9092
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper

Pourquoi utiliser Kafka dans ce contexte

Apache Kafka est une plateforme logicielle open-source de traitement de flux de données en temps réel. Il permet donc de construire des pipelines de données sophistiqués. Kafka réunit les trois éléments suivant :

  • Message Queue : lecture et écriture des flux de données
  • Message Broker : traitement des message en temps réel
  • Data Store : découplage de la publication et de la consommation des messages (par respectivement le Producer et le Consumer)

Dans notre exemple, Kafka sert à alléger le flux de données entrant dans InfluxDB, en prenant le rôle de tampon, le système devient ainsi résistant aux pannes.

Le schéma ci-dessous résume le fonctionnement de Kafka et comment celui-ci se combine avec InfluxDB.

Les Producers sont les applications clientes qui publient des événements sur Kafka, les Consumers sont ceux qui s’abonnent à ces événements.

Consumer & Producer (InfluxDB)

Pour écrire ses données dans le temps, InfluxDB utilise le protocole de ligne : chaque point représente une donnée à laquelle est associée une mesure, un tag et un champ. Le champ représente le type de donnée qui est mesurée. Le tag est une valeur optionnelle permettant d’avoir plus de détails sur la donnée. Dans notre exemple, la mesure serait ‘jcdecaux’ ce qu’on pourrait interpréter comme le nom d’une table dans une base de données relationnelle classique.

Voici à quoi ressembleraient les données :

Dans le cas de notre exemple, grâce à l’API de JCDecaux nous pouvons récupérer différentes données comme le nombre de vélos disponibles par stations, le nombre de places disponibles par stations, etc… Tout d’abord, les données sont envoyé à Kafka à l’aide du Producer :

import time
from kafka import KafkaProducer
import json
import urllib.request

TOPIC_NAME = "velib-stations"
BOOSTRAP_ENDPOINT = "localhost:9092"
url = "https://api.jcdecaux.com/vls/v1/stations?apiKey={}".format(API_KEY)

producer = KafkaProducer(bootstrap_servers=BOOSTRAP_ENDPOINT)

while True:
resp = urllib.request.urlopen(url)
stations = json.loads(resp.read().decode())
print(len(stations))
for station in stations:
producer.send(TOPIC_NAME, json.dumps(station).encode())
print("Produced {} station records".format(len(stations)))
time.sleep(300)

A l’aide du Consumer Kafka et d’un formatage adéquat des données, celles-ci peuvent être envoyées vers InfluxDB afin d’être stockées.

from kafka import KafkaConsumer
import json
from nx_timeseries.databases.influxdb import InfluxDB, Formatter

# InfluxDB config
DATABASE = "bike"
HOST = "localhost"
PORT = "8086"
USERNAME = "root"
PASSWORD = "root"

client = InfluxDB(HOST, PORT, USERNAME, PASSWORD, DATABASE)
client.switch_user(USERNAME, PASSWORD)

# Kafka config
BOOTSTRAP_ENDPOINT = "localhost:9092"
GROUP_ID = "group-1"
RECEIVED_TOPIC_NAME = "velib-stations"

consumer = KafkaConsumer(
RECEIVED_TOPIC_NAME,
bootstrap_servers=BOOTSTRAP_ENDPOINT,
group_id=GROUP_ID,
)

# Data format for InfluxDB
measurement = "jcdecaux"

tagcolumns = [
"number",
"contract_name",
"name",
"address",
"banking",
"bonus",
"status",
]

fieldcolumns = [
"bike_stands",
"available_bike_stands",
"available_bikes",
]

# Create formatter
formatter = Formatter(
measurement, tagcolumns, fieldcolumns, timecolumn="last_update"
)

# Check if database bike already exists
create = True
for db in client.get_list_database():
if db.get("name") == DATABASE:
create = False
break
if create:
client.create_database(DATABASE)

nb = 0
for message in consumer:
msg = json.loads(message.value)
datapoints = []

# Format points
point = formatter.format_point(msg)
datapoints.append(point)

response = client.write_points(datapoints, time_precision="ms")
if not response:
print("Problem inserting points, exiting...")
exit(1)
nb += 1
print("Messages written :", nb)

C’est le moment d’utiliser Grafana pour la visualisation et l’analyse

Une fois les données disponibles dans InfluxDB, il nous est possible d’utiliser Grafana comme outil de visualisation et d’analyse. Grafana dispose par défaut d’un connecteur InfluxDB afin de pouvoir accéder à nos jeux de données. Concernant les données des stations de vélo JCDecaux, le dashboard que nous avons réalisé est le suivant :

Grafana dispose non seulement d’un connecteur InfluxDB par défaut, mais prend également en charge le langage de requêtes InfluxDB afin de permettre à l’utilisateur de construire les visualisations qui l’intéresse. Il est ainsi possible de réaliser facilement des agrégations, des filtres et des sélections sur nos données de série temporelle.

Conclusion

L’architecture que nous avons proposée dans cet exemple est réutilisable dans de nombreux contextes, et permet une visualisation rapide des données. Pour donner un autre exemple qui intéressera probablement les passionnés d’investissement et d’Elon Musk, voici le dashboard Grafana que nous avons réalisé pour l’étude des crypto-monnaies, à vous de jouer !

Merci à Guillaume Dupont et Yann Kervella Pro d’avoir collaboré avec moi pour la rédaction de cet article.

--

--