Cet article est disponible en anglais sur : Medium
Les personnes qui me connaissent savent que je suis un fan inconditionnel d’Apache Kafka depuis longtemps, et il en est peut-être de même pour vous.
Mais en tant que consultant, je me dois de garder l’œil ouvert sur les autres plateformes de streaming concurrentes et Apache Pulsar est l’une d’entre elles.
Dans cet article, je vais essayer de vous donner un aperçu général de ce qu’est Apache Pulsar et de comment démarrer avec.
Avertissement : Cet article n’est pas une comparaison entre Apache Pulsar et une autre plateforme de streaming bien connue :)
Yet Another Streaming Platform, Yahoo !
Apache Pulsar est une plateforme de streaming distribuée, open-source qui a été créée à l’origine par Yahoo. Il fait partie des projets Top-Level de la fondation Apache depuis septembre 2018. La plateforme Apache Pulsar est principalement développée par StreamNative, la société fondée par les développeurs originaux d’Apache Pulsar et d’Apache BookKeeper.
Au moment de la rédaction de cet article, la dernière version de Apache Pulsar est la 2.4.1, le projet compte 4,3K étoiles et environ 170 contributeurs sur GitHub.
Apache Pulsar, Concepts et Architecture
Pulsar est une plateforme distribuée, multi-tenante, hautement performante et tolérante aux pannes, basée sur le modèle publish-subscribe.
Dans ce type d’architecture, on distingue deux types d’applications : les producteurs et les consommateurs. Nous trouvons également des concepts de topics, de messages et de souscriptions.
Définissons ces concepts dans le contexte d’Apache Pulsar.
Producer
Les Producers sont des applications qui publient des messages dans un ou plusieurs topics.
Les messages peuvent être publiés de manière synchrone ou asynchrone. Dans le premier cas, le Producer bloque et attend un accusé de réception (acknowlegement) de la part d’Apache Pulsar. Dans le second cas, le message est placé dans une file d’attente et envoyé ultérieurement.
Pour des raisons de performances, les messages peuvent être envoyés par batch et compressés afin d’optimiser la bande passante réseau utilisée. Actuellement, Apache Pulsar prend en charge les codecs de compression suivants : LZ4, ZLIB, ZSTD et SNAPPY.
Topic
Un topic est une abstraction utilisée pour regrouper des messages qui appartiennent à un même contexte métier ou technique. Les topics sont nommés sous la forme d’une URI ayant la structure suivante :
{type}://{tenant}/{namespace}/{topic}
Pour le moment, nous pouvons ignorer les notions de “tenant” et de “namespace” mais décrivons la notion de “type”.
Apache Pulsar supporte deux types de topics :
persistent (default) : tous les messages sont stockés de manière durable et répliqués sur disques.
non-persistent : tous les messages résident uniquement en mémoire et ne sont jamais persistés sur disques. En d’autres termes, cela signifie que des messages peuvent-être perdus en cas de perte d’un nœud.
Par défaut, Apache Pulsar crée automatiquement un topic s’il n’existe pas lorsqu’un client (producer ou consumer) tente de produire ou de consommer.
Enfin, un topic est distribué à travers l’ensemble des nœuds d’un cluster Apache Pulsar. Ainsi, chaque nœud qui compose un cluster possède un sous-ensemble d’un topic appelé partition.
Message
Un message représente l’unité de base d’Apache Pulsar. Il est composé d’une paire clé-valeur associée à : un ensemble de propriétés qui sont optionnelles et définies par l’utilisateur, de l’identifiant du Producer, de l’identifiant de séquence du message dans le topic et enfin de deux timestamps : event-time et processing-time.
Il est également important de noter que la clé est optionnelle et que l’identifiant de séquence est attribué par le Producer.
Routing Modes
Les Producers peuvent déterminer comment les messages sont distribués à travers les nœuds en spécifiant un mode de routage (routing mode). Un mode de routage détermine la partition cible pour chaque message et donc leur ordre de livraison.
Apache Pulsar propose trois modes de routage :
RoundRobinPartition (par defaut) : Le producer publie des messages ou des batchs de messages, qui n’ont pas de clé associées, sur l’ensemble des partitions, selon le principe du round-robin. Sinon, si une clé est spécifiée pour un message, un hash est généré et le message est envoyé sur une partition spécifique.
SinglePartition : Le Producer choisit aléatoirement une partition et y envoie tous les messages qui n’ont pas de clé associées. A l’inverse, si une clé est spécifiée pour un message, un hash est généré et le message est envoyé sur une partition spécifique.
CustomPartition : Si aucun des modes de routage précédents ne vous convient, le mode CustomPartition vous donne la possibilité d’implémenter votre propre
MessageRouter
.
Consumer, Subscription et Cursor
Les consumers peuvent s’abonner à un ou plusieurs topics pour consommer et traiter les messages publiés en créant ou en s’inscrivant à une subscription.
Une subscription est un mécanisme utilisé pour regrouper plusieurs consumers afin de répartir la charge de consommation. Chaque subscription est identifiée à l’aide d’un nom défini par l’utilisateur.
De plus, une subscription est utilisée pour suivre la progression de chaque consommateur au sein d’un groupe. Pour cela, Apache Pulsar utilise un concept de cursor. Ainsi, chaque subscription est associée, pour chaque topic, à un cursor qui est mis à jour à chaque fois qu’un consumer accède à un message.
Les messages peuvent être acquittés soit un par un, soit de manière cumulative (ce qui signifie que seul le dernier message reçu fait l’objet d’un accusé de réception).
Subscription Modes
Apache Pulsar définit quatre modes d’abonnement (subscription mode) à un topic qui peuvent être configurés pour définir la manière dont les messages sont délivrés aux consommateurs.
Il est important de choisir le bon mode d’abonnement, qui correspond à votre besoin. En effet, certains modes peuvent avoir un impact significatif sur l’ordre de consommation des messages ou sur le comportement des consommateurs en cas de crash.
Exclusive
Le mode exclusive vous permet de n’avoir qu’un seul consumer attaché à un abonnement donné. L’instance de consommateur consommera alors tous les messages de toutes les partitions des topics. Les autres consommateurs qui tentent de s’abonner seront simplement rejetés. Il s’agit du mode d’abonnement par défaut.
Failover
Le mode failover permet l’abonnement de plusieurs consumers à un même abonnement. Les consumers sont ordonnés en fonction de leur nom qui est configuré par le développeur. Le premier consumer reçoit tous les messages tandis que les autres sont en attente. En cas de défaillance du premier consommateur, tous les messages (non acquittés et qui suivent) seront envoyés au consumer suivant.
Shared
Le mode shared vous permet d’avoir un ou plusieurs consumers rattachés à un même abonnement. Les messages sont transmis à tous les consumers de l’abonnement en round-robin. Lorsqu’un consommateur se déconnecte ou crash, tous les messages non acquittés sont envoyés aux autres consumers.
Il est important de noter que ce mode ne garantit aucun ordre de livraison.
Key Shared
Le mode de Key Shared est identique au mode Shared précédent, sauf qu’Apache Pulsar vous donne la garantie que tous les messages qui sont associés à une même clé seront délivrés à un unique consumer.
Durée de stockage et politique d’expiration
Apache Pulsar vous permet de configurer la durée de stockage sur disque des messages, qu’ils soient acquittés ou non par les consumers.
En mixant les propriétés de rétention et d’expiration, un topic peut être configuré avec quatre types distincts de politique de persistance.
- Acknowledgement-based retention : Les messages sont immédiatement supprimés d’un topic dès lors qu’ils ont été consommés et acquittés par tous les subscriptions. Dans le cas contraire, tous les messages non accusés de réception sont stockés sur disque. C’est le comportement par défaut !
Size-based retention : Les messages acquittés et non acquittés sont conservés sur disque. Les messages les plus anciens sont automatiquement supprimés lorsque le topic atteint une taille limite configurée.
Time-based retention : Les messages acquittés et non acquittés sont conservés sur disque. Les messages antérieurs à une durée de rétention configurée sont automatiquement supprimés.
- Time-To-Live retention : Les messages sont automatiquement supprimés des topics s’ils ne sont pas acquittés après le TTL configuré.
Enfin, Apache Pulsar fournit également un mécanisme de compaction qui est un type particulier de rétention. Lorsqu’une compaction est déclenchée manuellement ou lorsqu’un topic atteint une certaine taille, seul le message le plus récent pour une clé associée est conservé.
L’architecture à deux couches d’Apache Pulsar
Un cluster Apache Pulsar peut être vu comme une architecture à deux couches, l’une est appelée la couche de service et l’autre est appelée la couche de persistance.
Brokers
La couche de service est composée d’un ou de plusieurs nœuds, chacun hébergeant un broker, qui sont chargés de traiter et de load-balancer les requêtes entrantes des producers et des consumers.
La couche de service est stateless, ce qui signifie que les brokers ne stockent pas directement de données localement. Au lieu de cela, les brokers s’appuient sur la couche de persistance qui, comme son nom l’indique, a pour responsabilité de persister les messages sur disque via l’utilisation de Apache BookKeeper.
Log Streams, Ledgers and Bookies
Apache BookKeeper est un autre service open-source qui fournit un stockage durable, fiable et tolérant à la panne pour des entrées de logs de données, appelé log streams. Un log stream peut être défini comme une séquence illimitée d’enregistrements ordonnés et immuables et est généralement mis en œuvre en utilisant une structure dite append-only (également appelée Write-a-Head-Log).
Dans BookKeeper, les entrées des mog streams sont composés de ledgers qui sont gérés par des nœuds individuels appelés des bookies. Apache Pulsar utilise les ledgers pour stocker les messages écrits au sein des partitions de chaque topic. Chaque topic-partition est affectée à plusieurs ledgers et les messages sont écrits dans ces “ledgers”. Ensuite, les ledges sont répartis sur un ensemble de bookies de sorte que chaque bookie stocke des fragments de ledger.
Apache Zookeeper
Enfin, Pulsar et BookKeeper utilisent tous deux Apache Zookeeper comme métastore. Pulsar l’utilise pour conserver des informations spécifiques sur le cluster, telle que la configuration des topics, et comme un service de coordination pour gérer par exemple des élections distribuées.
Le diagramme ci-dessous illustre l’architecture d’Apache Pulsar.
Les fonctionnalités avancées
Pulsar Instance
Plusieurs clusters Apache Pulsar peuvent être regroupés pour former une seule instance Pulsar géo-répliquée.
Cela permet de facilement répliquer les messages entre différents data-centers localisés dans des régions distantes.
Multi-Tenant
Apache Pulsar expose également un concept de tenant et de namespace pour faciliter l’administration de clusters multi-tenant. Chaque tenant peut avoir ses propres quotas et ses propres configurations pour gérer l’authentification et les autorisations. Un tenant peut également être distribué sur plusieurs clusters via les instances Pulsar.
Un namespace est l’unité de base pour la configuration des topics. Plusieurs namspaces peuvent être créés dans un tenant et plusieurs topics peuvent être créés dans un namespace. De plus, la configuration définie au niveau d’un namespace s’applique à l’ensemble des topics qui le compose.
Tiered-Storage
Parce que les données peuvent croître indéfiniment, il est souvent intéressant de transférer les données les plus anciennes vers des systèmes de stockage moins coûteux. Une façon d’y parvenir est d’utiliser ce que nous appelons un stockage à plusieurs niveaux (tiered-storage).
Au moment de la rédaction de cet article, Apache Pulsar supporte nativement AWS S3 et Google Cloud Storage pour le stockage à long terme.
Apache Pulsar expose une API REST simple pour déclencher le transfert de données.
Démarrer avec Apache Pulsar
Maintenant que nous avons une meilleure compréhension de ce qu’est Apache Pulsar et de son fonctionnement, amusons-nous un peu avec.
1 ) Tout d’abord, nous allons installer et déployer un cluster autonome sur notre machine.
$ wget [https://archive.apache.org/dist/pulsar/pulsar-2.4.1/apache-pulsar-2.4.1-bin.tar.gz](https://archive.apache.org/dist/pulsar/pulsar-2.4.1/apache-pulsar-2.4.1-bin.tar.gz)
$ tar -xzvf [apache-pulsar-2.4.1-bin.tar.gz](https://archive.apache.org/dist/pulsar/pulsar-2.4.1/apache-pulsar-2.4.1-bin.tar.gz) && cd apache-pulsar-2.4.1
$ ./bin/pulsar standalone
2 ) Ensuite, ouvrons un nouveau terminal et lançons un nouveau consumer en utilisant pulsar-client
.
$ ./bin/pulsar-client consume -s "my-first-subscription" my-first-topic -n 10
La commande ci-dessus crée un consumer exclusif qui attend 10 messages avant de s’arrêter.
3) Enfin, dans un autre terminal, nous allons produire quelques messages.
$> bin/pulsar-client produce my-first-topic — messages "Hello Streams World, Make sense of streams processing"
Finalement, si vous retournez sur le terminal du consumer, vous devriez obtenir un résultat comme celui-ci :
[pulsar-client-io-1–1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum — Failed to load Circe JNI library. Falling back to Java based CRC32c provider
— — — got message — — -
**Hello Streams World**
— — — got message — — -
**Make sense of streams processing**
[pulsar-timer-4–1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl — [my-first-topic] [my-first-subscription] [e81d3] Prefetched messages: 0 — — Consume throughput received: 0,03 msgs/s — — 0,00 Mbit/s — — Ack sent rate: 0,03 ack/s — — Failed messages: 0 — — Failed acks: 0
** Les commandes de bases de pulsar-admin
**
Apache Pulsar fournit un outil, relativement riche, en ligne de commande bin/pulsar-admin
. Celui-ci vous permet, entre autre, d’obtenir de nombreuses informations sur l’état du cluster, les topics, les subscriptions, etc.
Examinons quelques commandes disponibles:
- Lister les cluster disponibles
$ ./bin/pulsar-admin clusters list
standalone
- Obtenir des informations sur un cluster
$ ./bin/pulsar-admin clusters get standalone
{
"serviceUrl" : "[http://localhost:8080](http://oroborus:8080)",
"brokerServiceUrl" : "pulsar://locahost:6650"
}
- Lister tous les topics créés pour un tenant/namespace :
Par défaut, un topic est créé comme un topic unique et persistant sous un tenant “public” et un namespace “default”. Vous pouvez lister tous les topics créés en utilisant la commande suivante :
$ ./bin/pulsar-admin topics list public/default
persistent://public/default/my-first-topic
- Créer un nouveau topic partionné :
$ ./bin/pulsar-admin topics create-partitioned-topic --partitions 3 my-partitioned-topic
- Lister les topics partitionnés :
*./bin/pulsar-admin topics list-partitioned-topics public/default
- Lister toutes les subscriptions pour un topic :
$ ./bin/pulsar-admin topics subscriptions persistent://public/default/my-first-topic
- Obtenir des statistiques sur un topic :
$ ./bin/pulsar-admin topics stats persistent://public/default/my-first-topic
Ce n’était qu’un aperçu des commandes existantes. Pour en savoir plus sur les commandes disponibles, je vous recommande vivement de lire la documentation officielle.
Java Clients
Dans la partie précédente, nous avons produit/consommé certains messages en utilisant pulsar-client
. Apache Pulsar fournit également des API clients pour Java, Go et C++ afin de développer des tâches de production, de consommation et d’administration.
Créons un simple projet Maven et ajoutons la dépendance du client Apache Pulsar :
<dependencies>
<dependency>
<groupId> org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
Ecrire un premier Producer
Tout d’abord, avant d’instancier un Producer
ou un Consumer
, il est nécessaire de créer une instance PulsarClient
:
PulsarClient client = PulsarClient.*builder*()
.serviceUrl("pulsar://localhost:6650")
.build();
Ensuite, un nouveau Producer
peut être instancié à partir du client précédemment créé. Notez qu’un Producer
est attaché à un topic.
Producer<byte[]> producer = client.newProducer()
.topic("my-first-topic")
.create();
Par défaut, un Producer
s’attend à ce que les valeurs soient envoyées sous forme de tableaux de bytes. Mais vous pouvez également spécifier un Schema
pour produire des messages sous différents types.
Producer<String> producer = client.newProducer(Schema.*STRING*)
.topic("my-first-topic")
.create();
Nous pouvons ensuite commencer à produire quelques messages. La méthode d’envoi send
est ici bloquante tant que nous n’avons pas reçu d’acquittement depuis le broker cible.
producer.send("Hello Streams Word!");
Les messages peuvent également être envoyés de manière asynchrone en utilisant la méthode sendAsync
:
CompletableFuture<MessageId> future = producer.sendAsync("Make sense of streams processing");
future.thenAccept(msgId -> {
System.*out*.printf("Message with ID %s successfully sent asynchronously\n", msgId);
});
Dans l’exemple précédent, nous avons envoyé des messages en passant une simple valeur aux méthodes send
/sendAsync
. Mais, il est également possible de construire des messages avec une clé et des propriétés données :
TypedMessageBuilder<String> message = producer.newMessage()
.key("my-key")
.value("value-message")
.property("application", "pulsar-java-quickstart")
.property("pulsar.client.version", "2.4.1");
message.send();
De plus, pour des raisons de performances, il est généralement judicieux d’envoyer les messages sous forme de batchs afin d'économiser, en fonction de votre débit, une partie de la bande passante réseau. Le traitement par batch peut être activé lors de la création du client producteur.
Producer<String> producer = client.newProducer(Schema.*STRING*)
.topic("my-first-topic")
.compressionType(CompressionType.*SNAPPY*)
.enableBatching(true)
.batchingMaxPublishDelay(100, TimeUnit.*MILLISECONDS*)
.batchingMaxMessages(1000)
.create();
Notez que lorsque vous activez l’envoie par batchs, il est également recommandé d’activer la compression.
Enfin, vous ne devez jamais oublier de fermer à la fois le client et le producer.
producer.close();
client.close();
Ecrire un premier Consumer
Dans l’exemple ci-dessous, nous allons créer un consommateur exclusif. Cela signifie que seul le premier consommateur (pour l’abonnement configuré) sera assigné aux topic-partitions. Les autres consommateurs qui tenteront d’utiliser l’abonnement recevront une erreur.
La création d’une nouvelle instance Consumer
se fait simplement en utilisant notre instance PulsarClient
.
Consumer<String> consumer = client.newConsumer(Schema.*STRING*)
.topic("my-first-topic")
.subscriptionName("my-first-subscription")
.subscriptionType(SubscriptionType.*Exclusive*)
.subscribe();
Ensuite, vous pouvez invoquer la méthode receive()
dans une boucle while
pour consommer tous les messages produits dans le topic auquel nous avons souscrit.
while (true) {
// *blocks until a message is available*
Message<String> msg = consumer.receive();
try {
System.*out*.printf("Message received: %s", msg);
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
Pulsar Functions
Les Functions
d’Apache Pulsar sont des processus légers qui peuvent être soumis à un cluster Apache Pulsar pour effectuer une opération dite : consume-transform-produce.
Une Function
consomme les messages d’un ou plusieurs topics d’entrée, applique une fonction sur chaque message et produit ensuite un résultat dans un ou plusieurs topics de sortie.
Voici un exemple simple de Function
:
public class SplitSentenceIntoWords implements Function<String, Void> {
@Override
public Void process(String input, Context context)
throws Exception
{
String[] words = input.split(" ");
for (String word : words) {
context.newOutputMessage("split-words-topic", Schema.*STRING*)
.value(word)
.send();
}
return null;
}
}
Notez qu’une Function
peut parfaitement effectuer des opérations dites stateful
.
Une Function
est exécutée par des composantes appelées “function-workers” qui peuvent être exécutées directement par les brokers du cluster ou par des brokers dédiés.
Enfin, il est possible de développer des Function
s en Java, Python et Go.
Pulsar IO
Pulsar IO est une fonctionnalité intégrée qui permet d’intégrer un cluster Apache Pulsar à des systèmes externes tels que des bases de données ou d’autres technologies de messaging, via l’utilisation de connecteurs.
Pulsar IO définit deux types de connecteurs :
Source : Les connecteurs de type source capturent les données d’un système externe et les écrivent dans les topics Pulsar.
Sink : Les connecteurs de type sink consomment des données depuis des topics Pulsar et les écrivent dans un système externe.
Sous le capot, Pulsar IO s’appuie sur Pulsar Functions pour la mise en œuvre et la gestion des connecteurs.
Apache Pulsar fournit déjà des connecteurs pour Cassandra, Aerospike, RabbitMQ, etc.
Web UI
Pour les développeurs qui commencent avec Apache Pulsar, je recommande également le projet pulsar-express, développé par Bruno Bonnin(@bruno_b), qui fournit une interface Web simple pour explorer les topics, les souscriptions et les consumers, etc.
Pour démarrer avec pulsar-express, vous pouvez utiliser l’image Docker fournie comme suit :
$ docker run -it -p 3000:3000 --network=host bbonnin/pulsar-express
Vous devez ensuite créer une nouvelle connexion pour votre cluster: http://localhost:3000/connections
Enfin, vous pouvez explorer les détails du topic créé sur votre cluster :
Conclusion
Apache Pulsar est une plateforme de streaming bien conçue qui offre des fonctionnalités intégrées adaptées à l’entreprise telles que: la géo-replication, la gestion du multi-tenant et le tiered-stockage.
Enfin, Pulsar Functions et Pulsar IO vous fournissent tous les outils nécessaires à la mise en œuvre d’applications complexes de traitement de flux et de pipelines d’intégration de données.