Qu’est ce que Apache Pulsar ?

Jonathan Chauvin
CodeShake
Published in
5 min readMar 26, 2021

Il y a quelques semaines, Techcrunch publiait un article (https://techcrunch.com/2021/01/27/datastax-acquires-kesque-as-it-gets-into-data-streaming/) à propos de Datastax (à l’origine des bases de données Cassandra) annonçant l’achat de Kesque. Et comme beaucoup d’articles, il y a toujours une techno que l’on ne connaît pas assez; voire pas du tout.

C’était mon cas pour Apache Pulsar, alors j’ai cherché à en savoir plus.

Apache Pulsar

Développé initialement par Yahoo!, puis cédé à la fondation Apache en 2016, Pulsar est un “Event Streaming Platform”.

Le streaming d’évènement est le fait de capturer des données venant de différentes sources (base de données, application, capteurs..) sous la forme de flux d’événements. Ces données peuvent alors être stockées et utilisées ultérieurement.

Voici quelques cas d’utilisation possibles ;

  • Récupération de la position d’un véhicule en temps réel
  • Transactions financières en temps réel
  • Récupération de données d’une application

Vous pouvez récupérer les sources sur https://pulsar.apache.org/en/download/

Ou avec la commande :

wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz

Après avoir extrait l’archive, placez vous dans le dossier apache-pulsar-2.7.0 et lancez la commande :

bin/pulsar standalone

Cette commande va lancer un cluster Pulsar.

Comment ça marche ?

Pulsar utilise le modèle publish-subscribe. Ce modèle est composé de trois éléments: les producers, les consumers et les topics.

Les producers envoient des messages aux topics. Les topics stockent les messages. Les consumers viennent chercher les messages disponibles dans les topics.

Un concept important concerne l’aspect multi-tenancy. En effet, Pulsar gère les topics selon un modèle particulier. Les topics sont contenus dans des namespaces, eux même contenus dans des tenants ce qui permet entre autre d’avoir différents niveaux de permissions entre les topics/namespaces/tenants.

Les url sont de la forme tenant/namespace/topic

Source : https://pulsar.apache.org

Il existe quatre modes de subscriptions :

  • Exclusive : un seul et unique consumer
  • Failover : un consumer d’écoute et un autre de secours en cas de fail
  • Shared : plusieurs consumers en parallèle
  • Key_Shared : plusieurs consumers en parallèle, les messages sont distribués selon des clés.
Source : https://pulsar.apache.org/docs/en/concepts-messaging/

Les producers peuvent d’ailleurs envoyer les messages de manière synchrone ou asynchrone. Concrètement, le mode synchrone bloque l’envoi d’un nouveau message le temps de s’assurer que le précédent ait bien été envoyé.

En asynchrone, il n’y a pas d’attente de confirmation.

Pour les consumers, le mode synchrone bloque l’exécution jusqu’à l’arrivée d’un message alors que le mode asynchrone utilisera une valeur future (future value).

Schéma

Un schema permet de “mettre d’accord” les consumers et producers sur la nature des données envoyées. C’est une classe connue des deux entités, permettant de parser les données et les traiter plus naturellement.

Pour plus d’informations, vous pouvez consulter la documentation officielle https://pulsar.apache.org/docs/en/client-libraries-python/#schema

Producer

import pulsar
from pulsar.schema import AvroSchema, JsonSchema
from schemas.Example import Example
from schemas.Person import Person
# client connect to running pulsar clusterclient = pulsar.Client(‘pulsar://localhost:6650’)producer = client.create_producer(topic=’my-topic’, schema=AvroSchema(Person))client.close()

Consumer

import pulsar
from pulsar.schema import AvroSchema
from schemas.Person import Person
client = pulsar.Client(‘pulsar://localhost:6650’)consumer = client.subscribe(‘my-topic’, ‘my-subscription’, schema=AvroSchema(Person))# Don’t forget to close the client and allow other consumer to connect to topic
client.close()

Il est également possible de souscrire à plusieurs topics :

import re
consumer = client.subscribe(re.compile(‘persistent://public/default/topic-*’), ‘my-subscription’)

Acknowledgement

Lorsqu’un consumer consomme un message, il envoie un ack (=acknowledgement) pour faire savoir au broker qu’il a bien consommé et traité ce message.

Un curseur permet de savoir quel message a été traité pour qu’il ne soit consommé qu’une fois. Le ack peut être fait de manière individuelle, c’est-à-dire à chaque message mais aussi de manière cumulée.

Ici apparaît deux autres concepts, la rétention et l’expiration des messages.

La rétention permet de garder des messages en mémoire même s’ils ont été ack. L’expiration permet d’ajouter un TTL (Time to Live) aux messages non ack.

try:# Some functions here
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except:
# Message failed to be processed
consumer.negative_acknowledge(msg)

Architecture

Un cluster Pulsar est constitué de trois briques principales :

  • Les brokers permettent de communiquer avec les producers et consumers., ils sont composés eux même :
  • D’un dispatcher qui gère les messages
  • D’une API Rest pour les administrer
  • D’une vue du topic auquel ils sont rattachés

Les brokers sont stateless, par conséquent il est aisé d’ajouter des nœuds au cluster. On peut également ajouter dynamiquement des bookies (instances BookKeeper) sans devoir toucher aux partitions. C’est pratique de ne pas avoir à stopper Pulsar pour ça.

  • Un cluster BookKeeper (bookies pour les intimes) qui sert à persister les messages et les topics
  • Un cluster ZooKeeper qui gère les métadonnées de brokers et bookies

Les partitions

Les topics peuvent être partitionnés à travers les brokers afin d’augmenter le trafic possible entre le cluster Pulsar et les producers et/ou consumers qui l’utilisent.

Ici, le topic 1 est composé de 5 partitions distribuées entre 3 brokers. La distribution est gérée automatiquement par le cluster Pulsar, mais peut également être définie lors du partitionning via des routes. Je ne donnerai pas d’exemple ici mais vous pouvez consulter la documentation Pulsar à ce propos ici.

Source: https://pulsar.apache.org/docs/en/concepts-messaging/

Code complet

Person class as schema:

from pulsar.schema import *class Person(Record):firstname= String()
lastname=String()
address=String()

Producer

import pulsar
from pulsar.schema import AvroSchema, JsonSchema
from schemas.Example import Example
from schemas.Person import Person
client = pulsar.Client(‘pulsar://localhost:6650’)producer = client.create_producer(topic=’my-topic’, schema=AvroSchema(Person))for i in range(20):
person1 = Person(firstname=”John” + i.__str__(), lastname=”Doe”, address=”Lille”)
producer.send(person1)
client.close()

Consumer

import pulsar
from pulsar.schema import AvroSchema
from schemas.Person import Person
client = pulsar.Client(‘pulsar://localhost:6650’)consumer = client.subscribe(‘my-topic’, ‘my-subscription’, schema=AvroSchema(Person))while True:
msg = consumer.receive()
try:
print(“Received message ‘{}’ id=’{}’”.format(msg.value().firstname, msg.message_id()))
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except:
# Message failed to be processed
consumer.negative_acknowledge(msg)
client.close()

Vous devriez voir les logs comme suit :

2021–03–10 15:20:16.438 INFO [0x7000026f4000] ConsumerStatsImpl:70 | Consumer [persistent://public/default/my-topic, my-subscription, 0] , ConsumerStatsImpl (numBytesRecieved_ = 390, totalNumBytesRecieved_ = 390, receivedMsgMap_ = {[Key: Ok, Value: 20], }, ackedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 20], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 20], }, totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 20], })

Conclusion

Pour :

  • Multi-tenancy
  • Persistance des topics (grâce aux bookies)

Contre :

  • plusieurs éléments qui complexifient Pulsar

Finalement, Pulsar a ses avantages et ses inconvénients et pourrait convenir à des besoins particuliers à l’avenir.

Toutefois, il faudra faire attention aux latences (que je n’ai pas testé) qui peuvent être plus longues que chez certains de ses concurrents.

Cet article est basé sur la version 2.7.0 publiée le 25 novembre 2020.

--

--