Dans mon précédent article sur les plateformes de streaming de données, j’avais promis d’aborder un autre outil : Apache Pulsar. J’ai choisi de lui faire un article dédié car c’est un outil que j’apprécie beaucoup et qui, pour moi, a un fort potentiel dans le domaine du streaming de données.

Table des matières

Apache Pulsar est une plateforme de messagerie distribuée et de streaming de données. Initialement développé par Yahoo en 2015, Pulsar est depuis 2017 sous la responsabilité de la fondation Apache.

La création de Pulsar a été motivée par la nécessité de disposer d’une solution qui pourrait surmonter les limitations des systèmes existants, notamment la scalabilité horizontale, le multi-tenant et la flexibilité.

Au moment de la rédaction de cet article, Pulsar est disponible en version 3.1.0 et le projet GitHub totalise 13.2k étoiles et plus de 600 contributeurs !

Rentrons maintenant dans le vif du sujet, en commençant par comprendre comment fonctionne Pulsar !

Architecture

Un cluster Pulsar est composé de 3 briques :

  • La couche de service, le coeur de Pulsar, est composé d’un ou plusieurs brokers qui assurent la communication entre les producteurs et les consommateurs.
  • La couche de persistance est représentée par BookKeeper. Elle a pour rôle de stocker les messages de manière fiable et durable via un ou plusieurs bookies.
  • Enfin ZooKeeper gère les métadonnées des brokers et bookies et se charge de la coordination entre les différents clusters Pulsar.

Si l’on se penche sur le broker, il possède plusieurs rôles :

  • La configuration, la surveillance et la gestion des ressources (topics, namespaces…) via une API REST hébergé dans un serveur HTTP (Jetty)
  • Le routage des messages, via un dispatcher (aka. serveur de protocole), en utilisant le protocole Pulsar (protocole binaire optimisé pour une faible latence et un haut débit)
  • La gestion de topics, en s’appuyant sur BookKeeper
  • La gestion des abonnements
  • Le cache de métadonnées, pour limiter l’accès à ZooKeeper

Les brokers sont stateless, il est donc assez simple d’en ajouter ou d’en supprimer du cluster. Cela permet de répartir la charge et de gérer un volume massif de données.

C’est quoi BookKeeper ?

Apache BookKeeper est un système de stockage de journaux distribués, conçu pour être scalable, fiable et à faible latence. Il s’agit aussi d’un développement de Yahoo, qui a été cédé à la fondation Apache.

Chaque unité d’un journal est une entrée (aka. record). C’est ce qui contient les données écrites par le client, ainsi qu’un certain nombres de métadonnées (identifiant unique, date de confirmation, identifiant du registre…).

Les flux d’entrées de journaux sont appelés registres (aka. ledgers). C’est l’unité de stockage de base dans BookKeeper Les entrées sont écrites dans les registres séquentiellement et au maximum une fois. Les registres sont de type append-only, c’est-à-dire que les entrées ne peuvent être ajoutées qu’à la fin et qu’elles ne peuvent pas être modifiées une fois écrites.

Les noeuds qui stockent les registres sont appelés bookies. Lorsqu’une nouvelle entrée est ajoutée à un registre, elle est d’abord écrite dans un journal (WAL, Write-aHead-Log). Cela garanti la durabilité en permettant la récupération en cas de défaillance. Ensuite, l’entrée est écrite dans le stockage principale du bookie, qui est un stockage optimisé pour la lecture.

Source : https://medium.com/streamnative/why-apache-bookkeeper-part-1-consistency-durability-availability-ac697a3cf7a1 Source : https://medium.com/streamnative/why-apache-bookkeeper-part-1-consistency-durability-availability-ac697a3cf7a1

Pour garantir la disponibilité des données, les entrées d’un registre sont répliquées sur un ensemble de bookies (déterminé par le paramètre write quorum size). Chaque bookie va donc écrire la donnée puis envoyer un accusé de réception au client. Une fois que le client a reçu suffisamment d’accusés de réception (déterminé par le paramètre ack quorum size), le quorum est atteint et l’écriture est considérée comme réussie.

Ce mécanisme garantie la durabilité des données mais peut avoir un impact sur la latence d’écriture.

En cas de panne d’un bookie, BookKeeper est capable de le détecter et lancer un processus de réplication pour restaurer les registres présents vers d’autres bookies actifs et sains.

Tant qu’il y a suffisamment de bookies, BookKeeper est conçu pour être fiable et résistant à une grande variété de défaillances (pannes, corruptions…).

BookKeeper fournit un client (aka. une interface) permettant d’intéragir avec lui. Il permet la création et la suppression des registres, ainsi que la lecture et l’écriture d’entrées dans les registres, via 2 APIs : Ledger API et Distributed Log API.

C’est quoi ZooKeeper ?

Apache ZooKeeper est un système de coordination distribué, qui fournit des services comme la gestion de configuration, le nommage ou encore la synchronisation distribuée. Autrement dit, il permet aux applications distribuées de se coordonner et de travailler ensemble de manière fiable et efficace. Comme BookKeeper, il est hautement disponible et résilient aux pannes.

A l’origine un sous-projet d’Apache Hadoop, ZooKeeper est désormais un projet à part entière de la fondation Apache.

ZooKeeper fonctionne avec un nombre impair serveurs (aka. noeuds) pour permettre le mécanisme de vote. Au sein des différents noeuds serveurs de ZooKeeper, un leader est élu. Les autres serveurs sont appelés suiveurs (aka. followers).

Le leader est responsable de la coordination de l’ensemble des réplicas et c’est par lui que passe l’ensemble des écritures avant d’être propagées aux suiveurs (via le protocole Zab, ZooKeeper Atomic Broadcast). En cas de panne ou de dysfonctionnement du leader, un autre est immédiatement élu pour prendre sa place.

Le modèle de données est basé sur une hiérarchie de noeuds (aka. arborescence, comme la structure d’un système de fichier). Chaque noeud de cette hiérarchie est appelé Znode (aka. ZooKeeper Data Nodes) et possède des métadonnés (nom, numéro de version, ACL, horodatage…). Un Znode peut être persistant, éphémère (actif tant que le client est connecté) ou séquentiel.

Source : https://datascientest.com/apache-zookeeper-tout-savoir Source : https://datascientest.com/apache-zookeeper-tout-savoir

ZooKeeper est utilisé par Pulsar et BookKeeper pour stocker les métadonnées et la configuration, gérer la coordination des brokers et monitorer la santé des noeuds.

Messaging

Comme beaucoup de ses concurrents, il utilise le pattern pub/sub avec des topics. On va donc retrouver les traditionnels producteurs et consommateurs ainsi que les topics. Je ne vais pas refaire la présentation du pub/sub, j’en ai parlé dans l’article sur les plateformes de streaming de données.

Passons donc aux spécificités de Pulsar à ce niveau.

Les messages

L’unité de base est donc le message, qui est composé de données (aka. payload) et de métadonnées (key, nom du producteur, nom du topic, numéro de version du schéma, identifiant unique…).

Par défaut, la taille maximale d’un message est de 5 MB.

Par défaut, les messages sont envoyés de manière unitaire (aka. single message). Il est aussi possible d’envoyer des lots de messages (aka. batch message).

Si nécessaire, il est possible d’activer la compression des messages pour en réduire la taille (au détriment d’une consommation CPU un peu plus importante). Pulsar supporte 4 types de compression : LZ4, ZLIB, ZSTD et SNAPPY. Le type de compression est stocké dans les métadonnées du message. Le consommateur sera donc capable de s’adapter automatiquement.

Par défaut, un message acquitté est immédiatement supprimé par le broker et un message non-acquitté est stocké sur le disque. Il est possible de configurer des règles pour modifier ce comportement, via les politiques de rétention et d’expiration. On peut par exemple appliquer une rétention sur la taille (aka. size-based retention), pour persister l’ensemble des messages et supprimer les plus anciens lorsque le topic atteint une taille définie. On peut aussi jouer sur la durée (aka. time-based retention), en indiquant une durée maximale à partir de laquelle les messages seront supprimés.

Enfin, il est possible de spécifier un TTL (aka. time-to-live ou expiration), pour indiquer sur un message la durée maximale pendant laquelle il peut être acquitté. Si cette durée est atteinte, et qu’il n’a pas été acquitté, alors le message est supprimé.

Les topics

Dans Pulsar, les topics sont représentés par une URL : {persistent|non-persistent}://tenant/namespace/topic

On peut observer 4 informations :

  • le type de topic : un topic peut-être persistant (c’est le mode par défaut, les messages sont stockés sur le disque) ou non-persistant (les messages sont uniquement en mémoire et il existe donc un risque de perte de données en cas de défaillance d’un noeud, mais les performances sont meilleures)
  • le nom du tenant
  • le nom du namespace
  • le nom du topic

Multi-tenant

Pulsar a été nativement conçu pour gérer le multi-tenant.

<img class="fig-img" src="https://res.cloudinary.com/anceret-matthieu/image/upload/v1695391621/posts/apache-pulsar/pulsar-multi-tenant.png" >

Un tenant est une unité administrative permettant de gérer une capacité et un fournisseur d’authentification spécifique.

A l’intérieur d’un tenant, vient ensuite la notion de namespace. Il s’agit d’une unité administrative intermédiaire permettant de regrouper différents topics en fonction d’autres critères (une application particulière par exemple). On peut y associer différentes stratégies, notamment au niveau de la rétention et de l’expiration des messages. Les stratégies configurées au niveau d’un namespace sont appliquées sur l’ensemble des topics de ce namespace.

Pour maximiser les performances (aka. le débit), les topics peuvent être distribués à travers plusieurs noeuds du cluster (aka. broker), sous la forme de partition (aka. un morceau d’un topic).

Ce type de topic doit être créé à partir de l’API admin de Pulsar.

Avec ce type de topic, il est nécessaire de préciser un mode de routage. Ce mode détermine vers quelle partition chaque message doit être publié. Il existe 3 modes :

  • RoundRobinPartition (mode par défaut)
  • SinglePartition
  • CustomPartition

Pour aller plus loin sur la façon dont Pulsar distribue la charge à travers les différents brokers, je vous invite à consulter la documentation. Elle explique comment Pulsar s’assure de la bonne utilisation des ressources disponibles au niveau des brokers tout en maximisant le débit au niveau des topics, via un load balancer.

Les producteurs

Après avoir parlé des topics, on va désormais voir le premier type de client dans Pulsar : les producteurs.

Comme son nom l’indique, un producteur va publier un message à destination d’un topic. On va pouvoir spécifier un mode d’envoi et un mode d’accès.

Concernant le mode d’envoi, on peut le faire de manière synchrone (on est bloqué en attendant un accusé de réception de la part du broker) ou asynchrone.

Concernant le mode d’accès, on peut en citer quatre :

  • Shared : plusieurs producteurs peuvent publier sur le même topic. C’est le mode par défaut.
  • Exclusive : seulement un producteur peut publier sur un topic donné à la fois (aka. il pose un verrou). Si un autre producteur essaye de publier sur ce topic alors qu’un producteur est déjà connecté, il y aura un échec (aka. il faut que le verrou soit libéré).
  • ExclusiveWithFencing : même principe que le mode Exclusive, mais le verrou peut-être volé par un autre producteur.
  • WaitForExclusive : même comportement que le mode Exclusive, à la différence que le second producteur va attendre que la place se libère au lieu d’échouer.

Les consommateurs et les souscriptions

Après les producteurs, on va désormais voir le second type de client dans Pulsar : les consommateurs.

Comme son nom l’indique, un consommateur va consommer des messages à partir des topics, via une souscription.

Une souscription est une entité logique permettant de suivre le curseur (aka. offset) et de regrouper plusieurs consommateurs (pour distribuer la charge de consommation).

Le curseur est stocké dans un registre (aka. ledger) BookKeeper. Il est mis à jour à chaque fois qu’un consommateur va acquitter un message.

Par défaut, à la création d’une souscription, le curseur est positionné à la fin du topic.

Comme pour les producteurs, les consommateurs peuvent recevoir les messages de manière synchrone ou asynchrone.

Concernant la souscription (aka. comment les messages sont délivrés aux consommateurs), il en existe plusieurs types :

  • Exclusive : un seul et unique consommateur peut lire le topic à la fois. C’est le type par défaut.
  • Failover : un consommateur d’écoute et un autre de secours en cas de problème (aka. active/passive ou active/backup). Il peut y avoir plusieurs consommateurs de secours, auquel cas, ils sont déclenchés les uns après les autres en cas de défaillance d’un consommateur principal.
  • Shared : plusieurs consommateurs peuvent consommer les messages. Les messages sont délivrés via l’algorithme round-robin. Attention, ce mode ne garanti aucun ordre de livraison des messages.
  • Key_Shared : même principe que le type Sahred, avec la garantie que tous les messages avec la même clé seront livrés à un même consommateur.

Un consommateur peut évidemment souscrire à plusieurs topics.

Fonctionnalités

Après avoir vu les principaux concepts de base de Pulsar, nous allons passer en revue quelques fonctionnalités spécifiques.

Pulsar Instance (aka. géo-réplication)

Pour garantir une haute disponibilité et une résilience en cas de panne et pour offrir une latence optimale dans plusieurs régions, Pulsar permet gérer des clusters Pulsar dans différentes régions géographiques : il s’agit de Pulsar Instance.

Les messages vont donc être synchronisés, en arrière-plan, entre les différents clusters (de manière synchrone ou asynchrone).

Concernant le mode de réplication, Pulsar en supporte trois :

  • Actif-actif
  • Failover
  • Aggregated Replication

Dans le cas où vous utilisez la géo-réplication, il est important de garder un œil sur les différentes statistiques, et particulièrement le replication backlog. Il s’agit du nombre de messages qui ont été produits dans le cluster d’origine mais qui n’ont pas été répliqués vers les autres clusters. Une augmentation de cette valeur pourrait indiquer un problème au niveau de la réplication (problème de réseau entre les clusters ou de dimensionnement).

Pour aller plus loin, je vous invite à consulter la documentation d’Apache Pulsar à ce sujet.

Schéma

Une notion très intéressante dans Pulsar est le schéma. Il s’agit d’un ensemble de métadonnées permettant d’indiquer comment traduire un message brut dans un format structuré. Autrement dit, il sert à définir un langage commun entre le producteur et le consommateur, au moment de la sérialisation et de la désérialisation des messages.

Le schéma étant spécifique à un topic, cela permet de s’assurer que les producteurs et consommateurs ne peuvent s’y connecter que s’ils utilisent un schéma compatible.

En cas d’évolution, il est possible de versionner les schémas.

La documentation est très complète à ce sujet, et donne plusieurs scénarios et exemples.

Tiered Storage

Dans certains cas, on peut avoir besoin de conserver une grande quantité de message sur un topic pour une longue durée. Pour limiter les coûts, Pulsar propose une fonctionnalité permettant de déplacer ces données (à partir de BookKeeper donc) vers un stockage moins cher et plus adapté, tout en conservant la possibilité au client (aka. consommateur) d’y accéder : Tiered Storage.

Nativement, Apache Pulsar supporte les cibles suivantes :

Une fois le provider configuré, il suffit de configurer une stratégie au niveau du namespace ou du topic pour définir le seuil de déclenchement automatique de l’offloading. Ce seuil est basé sur la taille des données. Il est aussi possible de déclencher manuellement l’opération via la CLI de Pulsar.

Pulsar Functions

Les fonctions Pulsar sont des processus légers permettant de transformer les messages entrants, et donc d’obtenir une logique de traitement flux intégrée.

Voici les étapes :

  1. Consommation d’un message à partir d’un topic
  2. Application de la logique sur le message
  3. Eventuellement, publication du résultat (aka. le message modifié) dans un autre topic

Elles peuvent être écrites en Java, Python et Go. Elles sont déployées dans des function worker, soit directement dans les brokers, soit dans des conteneurs dédiés.

On retrouve plusieurs cas d’utilisation, comme le filtrage, la modification ou l’enrichissement des messages, l’aggrégation de données ou l’alerting . On peut en effet imaginer plusieurs scénarios :

  • pour des raisons de confidentialité, on a besoin de masquer ou remplacer certaines informations sensibles
  • seulement certaines informations du message sont pertinentes
  • réception d’un message dans un format que l’on souhaite convertir dans un autre
  • génération d’alertes en cas de détection d’erreurs ou d’anomalies dans les données

Les fonctions peuvent avoir un état persistant et distribué. Sous le capot, c’est BookKeeper qui est utilisé, et l’état est donc répliqué. Il existe une API permettant de stocker et lire l’état et celui-ci est isolé par fonction (on ne peut donc pas accéder à l’état d’une autre fonction).

Il est aussi possible de chainer les fonctions pour créer des workflows de traitement plus complexes.

Pulsar IO

Pulsar IO permet d’intégrer un cluster Pulsar à des systèmes externes via des connecteurs.

Il existe 2 types de connecteurs :

  • Source : capture d’une donnée externe pour l’écrire dans un topic Pulsar. Par exemple, à partir d’une base de données (MySQL, PostgreSQL, SQL Server, MongoDB…), d’un fichier, de Kafka ou encore de RabbitMQ.
  • Sink : consomme une donnée depuis un topic Pulsar pour l’écrire dans un système externe. Par exemple, vers ElasticSearch, InfluxDB, Redis, Kafka, RabbitMQ, une API HTTP ou encore des bases de données.

En plus de tous les connecteurs déjà existants, il est possible d’écrire des connecteurs personnalisés. Pour cela, il faut implémenter, en Java, les interfaces Source et *Sinkù de org.apache.pulsar.io.core.

Techniquement parlant, Pulsar IO s’appuie sur Pulsar Functions, et plus spécifiquement les functions workers.

Pulsar SQL

En s’appuyant sur la notion de schéma, il est possible de requêter les données d’Apache Pulsar via Trino (anciennement Presto SQL). Cette fonctionnalité s’appelle Pulsar SQL. Trino est un moteur de requêtes distribué haute performance, particulièrement adapté pour les gros volumes de données.

La syntaxe est compatible avec SQL, avec quelques extensions pour gérer les gros volumes de données et les requêtes distribuées. Par exemple, des types de données supplémentaires (ARRAY, MAP, ROW…) ou encore des fonctions de fenêtrage temporel (WINDOW) ou d’agrégations sur des fenêtres de temps glissantes.

Pulsar SQL est particulièrement adapté pour exécuter des requêtes en temps réel sur des flux de données. Voici quelques exemples de cas d’usage :

  • analyse en temps réel des flux de données (surveillance, alerting, alimentation de tableaux de bord…)
  • construction de rapports pour l’analyse de tendances, l’évaluation de performances ou la vérification de qualité des données
  • jointure entre des données provenant de différents topics ou d’autres sources de données

Il est possible d’exécuter des requêtes via la CLI ou via l’API REST de Trino.

Imaginons un scénario où des capteurs IoT envoient des données de température et d’humidité à un topic Pulsar. Il est possible d’écrire une requête pour surveiller ces données et détecter des conditions anormales :

SELECT sensorId, AVG(temperature) AS avg_temp
FROM sensor_data
WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY sensorId
HAVING avg_temp > 30;

Cette requête permet d’identifier les capteurs ayant une température moyenne supérieure à 30°C au cours de la dernière heure.

Pulsar Manager - WebUI

Enfin, terminons avec Pulsar Manager, qui est une UI web permettant de gérer et monitorer l’ensemble des éléments d’un environnement Pulsar (tenants, namespaces, topics, subscriptions, brokers, clusters…).

Il s’agit d’un composant distinct, qu’il est nécessaire de déployer à côté de Pulsar (via Docker par exemple).

En plus de l’outil officiel, il existe aussi d’autres solutions, comme Pulsar Express.

Si vous préférez la ligne de commande, il est toujours possible de passer par la CLI de Pulsar ou encore par Pulsar Shell ;)

Sécurité

Dans un système amené à être central/critique, la sécurité est bien évidemment cruciale. Pour cela, Pulsar s’appuie sur 3 piliers :

  • le chiffrement
  • l’authentification
  • l’autorisation

Attention, par défaut, Pulsar ne configure aucune sécurité particulière. Tout client peut donc communiquer avec Pulsar librement. C’est à vous de venir restreindre les accès en fonction de vos besoins.

Le premier pilier, le chiffrement, va permettre de s’assurer que si un attaquant arrive à avoir accès à vos données, il ne sera pas en mesure de les lire sans avoir aussi en sa possession les clés. Ce pilier repose sur la protection des données en transit (via TLS ou le chiffrement de bout-en-bout) et au repos (en s’appuyant sur le système de fichier ou les solutions de stockage sous-jacentes).

Concernant le second pilier, l’authentification, il s’agit de vérifier l’identité des clients et de leur associer un rôle (role token). Pour cela, Pulsar s’appuie sur un fournisseur d’authentification. Plusieurs fournisseurs sont supportés, via un système de plugins :

Enfin, le troisième pilier, l’autorisation, permet de déterminer ce que les clients ont le droit de faire. Cela passe par le role token, qui est simplement un nom permettant de faire le lien avec un ensemble de permissions. Le rôle le plus élevé est superusers, qui peut créer et détruire les tenants ainsi qu’avoir un accès complet à l’ensemble des ressources. En dessous, on trouvera le rôle admin, qui permet de gérer les namespaces ainsi que les permissions au sein de ce namespace.

Les autres rôles sont ensuite à définir en fonction des besoins et des politiques de sécurité. Quoi qu’il est en soit, il est recommandé de suivre le principe du moindre privilège et de n’attribuer que les autorisations strictement nécessaires.

Voici un exemple de rôle créé via la CLI Pulsar :

pulsar-admin namespaces grant-permission my-wonderful-tenant/namespace1 \
--role test-role \
--actions produce

Dans le code ci-dessus, nous avons créé un rôle test-role qui possède le droit de publier (produce) dans l’ensemble des topics du namespace namespace1 appartenant au tenant my-wonderful-tenant. Les droits disponibles sont produce et consume, sachant qu’ils sont cumulatifs.

Demo time !

La première chose à faire est de monter un environnement local avec un Apache Pulsar et un Pulsar Manager. Pour cela, on va passer par une stack docker-compose :

version: "3.7"
services:
  pulsar:
    image: apachepulsar/pulsar:3.1.1
    command: bin/pulsar standalone
    hostname: pulsar
    ports:
      - "8080:8080"
      - "6650:6650"
    restart: unless-stopped
    volumes:
      - "./data/:/pulsar/data"
  dashboard:
    image: apachepulsar/pulsar-manager:v0.2.0
    ports:
      - "9527:9527"
      - "7750:7750"
    depends_on:
      - pulsar
    links:
      - pulsar
    environment:
      SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties

Une fois démarré, vous allez retrouver l’UI web de Pulsar Manager sur http://127.0.0.1:9527. Pour pouvoir vous connecter, vous allez devoir créer un utilisateur via les commandes suivantes (à exécuter dans WSL si vous êtes sur Windows) :

CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
    -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
    -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
    -H 'Content-Type: application/json' \
    -X PUT http://localhost:7750/pulsar-manager/users/superuser \
    -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "[email protected]"}'

Une fois connecté, vous allez devoir configurer votre environnement (c’est l’équivalent de votre instance Pulsar). Pour l’URL, vous pouvez utiliser le nom du conteneur comme nom de domaine (par exemple, http://apache-pulsar-pulsar-1:8080).

On va maintenant ajouter une Pulsar Fonction.

La première étape est d’écrire la fonction. Pour ce faire, j’ai choisi de la langage Python. Ma fonction est (très) simple : elle vérifie si le message contient le texte alert et si c’est le cas, elle publie un message dans un topic spécifique.

from pulsar import Function

class AlertFunction(Function):
    def process(self, input, context):
        # Vérifier si le message contient "alert"
        if "alert" in input:
            # Le message à publier sur le topic d'alerte
            alertMessage = "Alert found in message: " + input

            # Publier le message d'alerte sur le topic de sortie configuré
            context.publish(context.get_output_topic(), alertMessage)

        # Pas besoin de retourner une valeur
        return None

Une fois notre code écrit, on va pouvoir déployer la fonction sur notre instance Pulsar. Pour cela, on va se connecter au conteneur Docker de Pulsar et exécuter les commandes suivantes :

# Création des topics
bin/pulsar-admin topics create persistent://public/default/sample-inputs
bin/pulsar-admin topics create persistent://public/default/alerts

# Déploiement de la fonction
bin/pulsar-admin functions localrun \
  --py myfunc.py \
  --classname myfunc.SomeFunction \
  --inputs persistent://public/default/sample-inputs \
  --output persistent://public/default/alerts

2 points importants :

  • N’oubliez pas de monter votre script Python dans le conteneur Pulsar via les volumes du fichier docker-compose
  • Pour la propriété classname, il est nécessaire de préciser le nom du fichier avant le nom de la classe

Une fois notre environnement en place, on va pouvoir créer notre consommateur et notre producteur. Pour cela, on va développer deux projets console en .NET, avec l’aide de la librairie DotPulsar.

Avec DotPulsar, tout commence par l’instanciation d’un client. Ce client pourra ensuite créer des consommateurs, des producteurs ou des lecteurs (un lecteur est un consommateur sans curseur, ce qui veut dire que Pulsar ne peut pas suivre sa progression et qu’il n’est pas nécessaire d’accuser réception des messages). Chaque client possède son propre pool de connexions, qui sont donc partagées entre les différents acteurs. Si vous avez des besoins de performances spécifiques, il est possible de créer un client dédié à un seul acteur.

Commençons par notre producteur :

using DotPulsar;
using DotPulsar.Extensions;

// Connecting to Pulsar
await using var client = PulsarClient.Builder()
  .ServiceUrl(new Uri("pulsar://localhost:6650"))
  .Build();

// Create a producer
await using var producer = client.NewProducer(Schema.String)
    .Topic("persistent://public/default/mytopic")
    .Create();

// Sending data
await producer.Send("My message !");

// Sending data with metadata
var messageId = await producer.NewMessage()
    .Property("MyKey", "MyValue")
    .Send("Hello, World!");

Une fois que l’on est capable de produire des messages, on peut désormais passer à leur consommation :

// Beginning is same as producer...

// Create a consumer
await using var consumer = client.NewConsumer(Schema.String)
    .SubscriptionName("MySubscription)
    .Topic("persistent://public/default/mytopic")
    .InitialPosition(SubscriptionInitialPosition.Earliest)
    .SubscriptionType(SubscriptionType.Shared)
    .Create();

// Receiving messages
await foreach(var message in consumer.Messages())
{
    Console.WriteLine($"Received: {message.Value()}");
    await consumer.Acknowledge(message);
}

Lors de la réception du message, celui-ci possède plusieurs propriétés comme son identifiant, sa clé de routage, le nom du producteur, l’identifiant de séquence…

DotPulsar offre plusieurs fonctionnalités intéressantes, comme le suivi de l’état des acteurs ou encore la gestion des exceptions. Je vous laisse consulter l’excellente documentation sur ces sujets.

Enfin, DotPulsar a l’excellente idée d’intégrer nativement le support des traces et des métriques via OpenTelemetry !

Conclusion

A travers cet article, nous avons exploré les mécanismes d’Apache Pulsar, son architecture innovante avec BookKeeper et ZooKeeper ainsi que ses fonctionnalités, comme le Tiered Storage, Pulsar SQL ou encore la notion de schéma. Il s’agit d’une excellente plateforme de streaming, conçue nativement pour le cloud en prenant en compte des problématiques complexes comme la géo-réplication et le multi tenant tout en offrant un fort débit avec une faible latence.

En regardant vers l’avenir, Pulsar se positionne comme un acteur clé dans l’évolution des systèmes de messagerie et de streaming de données, notamment dans les sujets d’IoT et de Big Data. Sa communauté, en croissance constante, contribue activement à son développement, avec l’aide d’une core team extrêmement réactive.

Au même titre que Kafka, plusieurs cloud provider propose la solution Pulsar en offre 100% managée (Pandio, StreamNative, Kesque…). J’ai choisi de mettre le focus sur l’offre de Clever Cloud. C’est un hébergeur européen qui propose des services PaaS de qualité (hébergement de site web sur différentes stacks, bases de données, conteneurs, stockage…). Clever Cloud vous permet de bénéficier d’un cluster Pulsar géo-répliqué mutualisé, dans lequel un tenant et des namespaces vous sont dédiés. Libre à vous de créer et gérer vos topics dans ces namespaces. Attention cependant, si l’offre permet de bénéficier rapidement d’une infrastructure performante et infogérée de Pulsar, avec une gestion du stockage nativement paramétrée pour utiliser le tiered-storage (sur le stockage S3-compatible de Clever Cloud), elle ne permet pas de tirer profit de toutes les fonctionnalités de Pulsar. A titre d’exemple, les Pulsar Function et Pulsar IO ne sont pas utilisables sur cette offre mutualisée. La facturation s’effectue sur le volume de stockage ainsi que sur le trafic entrant et sortant.