Apache Kafka Rebalance Protocol: La magie qui se cache derrière votre application de streaming

Apache Kafka Rebalance Protocol: La magie qui se cache derrière votre application de streaming
The Author's Avatar

Cet article est disponible en anglais sur : Medium

Depuis Apache Kafka 2.3.0, le protocole interne de Rebalancing, qui est particulièrement utilisé par Kafka Connect et l’API Client Consumer, a subi plusieurs changements majeurs.

Le protocole de Rebalancing n’est pas quelque chose de simple et peut parfois ressembler à de la magie. Dans cet article, je propose de revenir sur les fondements de ce protocole, qui est au cœur du mécanisme de consommation Apache Kafka. Ensuite, nous parlerons de ses limites et des améliorations actuelles.

Apache Kafka & The Rebalance Protocol 101

Revenons aux bases

Apache Kafka est une plateforme distribuée de streaming d’événements basée sur un modèle de publish/subscribe. Tout d’abord, des applications appelées producers envoient des messages dans des topics, qui sont gérés et stockés par un cluster de brokers. Ensuite, des applications appelées consumers s’abonnent à ces topics pour aller consommer et traiter les messages publiés.

Un topic est réparti sur plusieurs brokers, de sorte que chaque broker gère des sous-ensembles de messages pour chaque topic - ces sous-ensembles sont appelés partitions. Le nombre de partitions est défini lors de la création d’un topic et peut être augmenté au fil du temps (mais attention à cette opération).

Ce qu’il faut comprendre, c’est qu’une partition représente l'unité de parallélisation aussi bien pour les producers que pour les consumers Kafka.

Du côté des producers, les partitions permettent d’écrire des messages en parallèle. Si un message est publié avec une clé, alors, par défaut, le producer calculera un hash de la clé donnée pour déterminer la partition de destination (c.à.d: hash(key) % num_partitons). Cela permet de garantir que tous les messages ayant la même clé seront envoyés vers la même partition. D’un point de vue consumer, cela garantit également l’ordre de consommation des messages pour cette partition.

Du côté des consumers, le nombre de partitions pour un topic limite le nombre maximum de consumers actifs au sein d’un consumer group. Un consumer group est le mécanisme fournit par Kafka pour regrouper plusieurs clients consumers, en un groupe logique, afin de distribuer la consommation des partitions. Kafka garantit qu’une partition est attribuée à un seul consumer au sein d’un groupe.

Par exemple, l’illustration ci-dessous représente un consumer group nommé A qui est composé de trois consumers. Les consumers sont abonnés au topic A et l’affectation des partitions est : P0 à C1, P1 à C2, P2 à C3 et P1.

Apache Kafka — Consumer Group

Si un consumer quitte le groupe après un arrêt contrôlé ou un crash, toutes ses partitions seront automatiquement réassignées parmi les autres consumers. De la même manière, si un consumer (ré)intègre un groupe existant, toutes les partitions seront également redistribuées entre l’ensemble des membres du groupe.

La capacité des clients consumers à coopérer au sein d’un groupe dynamique est possible grâce à l’utilisation du protocole appelé: Kafka Rebalance Protocol., on parle aussi de rééquilibrage.

Plongeons en profondeur dans ce protocole pour comprendre son fonctionnement.

Le “Rebalance Protocol”, en bref et en images

Tout d’abord, donnons une définition de la signification du terme “rebalance” dans le contexte Apache Kafka:

Rebalance/Rebalancing: the procedure that is followed by a number of distributed processes that use Kafka clients and/or the Kafka coordinator to form a common group and distribute a set of resources among the members of the group (source : Incremental Cooperative Rebalancing: Support and Policies).

La définition, ci-dessus, ne fait en réalité aucune référence à une notion de consumers ou de partitions. Elle utilise plutôt un concept de membres et de ressources. La raison principale est que le protocole de rebalance ne se limite pas à la gestion des consumers, mais peut également être utilisé pour coordonner n’importe quel groupe de processus.

Voici quelques usages du protocole au sein de la plateforme Kafka :

  • Confluent Schema Registry s’appuie sur le rebalance protocol pour élire un nœud leader.

  • Kafka Connect l’utilise pour répartir les tâches et les connecteurs entre les workers.

  • Kafka Streams l’utilise pour attribuer des tâches et des partitions aux instances d’une application KafkaStreams.

Apache Kafka Rebalance Protocol and components

Enfin, il est important de comprendre que ce mécanisme de Rebalancing s’articule autour de deux protocoles : Group Membership Protocol et Client Embedded Protocol.

Le Group Membership Protocol, comme son nom l’indique, est chargé de la coordination des membres au sein d’un groupe. Les clients participant à un groupe exécuteront une séquence de requêtes/réponses avec un broker Kafka qui agira en tant que coordinator.

Le second protocole est exécuté du côté du client et permet d’étendre le premier en s’y intégrant. Par exemple, le protocole utilisé par les consumers permet d’assigner les partitions d’un ou plusieurs topics aux membres.

Maintenant que nous avons une meilleure compréhension de ce qu’est le protocole de rebalancing, illustrons sa mise en œuvre avec l’assignation des partitions au sein d’un consumer group.

JoinGroup

Lorsqu’un consumer démarre, il envoie une première requête de type FindCoordinator pour obtenir l’adresse du broker Kafka qui est responsable du groupe à rejoindre. A la suite de cela, le protocole de rebalancing peut commencer via l’envoie d’une requête de type JoinGroup à destination du coordinator.

Consumer — Rebalance Protocol — SyncGroup Request

Comme nous pouvons le voir, la requête JoinGroup contient certaines configurations des clients consumers telles que le session.timeout.ms et le max.poll.interval.ms. Ces propriétés sont utilisées par le coordinator pour exclure du groupe les membres qui ne répondraient plus.

La requête contient également deux champs très importants : la liste des protocoles clients, qui sont pris en charge par les membres, et les métadonnées qui seront utilisées pour exécuter l’un des protocoles clients intégrés. Dans notre cas, les protocoles-clients correspondent à la liste des stratégies d’assignation des partitions (assignors) configurés au niveau du consommateur (cf: partition.assignment.strategy). Enfin, les métadonnées contiennent la liste des topics auxquels le consommateur s’est abonné.

Notez que si vous ne savez pas à quoi servent ces propriétés, je vous invite à lire la documentation officielle.

La requête de JoinGroup agit comme une barrière, ce qui signifie que le coordinateur n’envoie pas de réponses tant que toutes les demandes des consommateurs ne sont pas reçues (c.f: group.initial.rebalance.delay.m) ou que le timeout de rebalancing n’est pas atteint.

Consumer — Rebalance Protocol — JoinGroup Response

Le premier consumer, au sein du groupe, reçoit la liste des membres actifs ainsi que la stratégie d’assignation choisie. Il agit alors en tant que group leader tandis que les autres reçoivent une réponse vide. Le group leader est alors responsable de l’exécution locale de la stratégie d’assignation des partitions.

SyncGroup

Ensuite, tous les membres envoient une requête de type SyncGroup au coordinator. Le group leader y attache les assignations calculées tandis que les autres répondent simplement par une requête vide.

Consumer — Rebalance Protocol — SyncGroup Request

Une fois que le coordinator a répondu à toutes les requêtes de synchronisation, chaque consumer reçoit ses partitions assignées, invoque la méthode onPartitionsAssigned() sur le ConsumerRebalanceListener configuré, puis commence à récupérer des messages.

Consumer — Rebalance Protocol — SyncGroup Response

Heartbeat

Pour finir, chaque consumer envoie périodiquement une requête de Heartbeat au broker coordinator afin de maintenir sa session en vie (c.f: heartbeat.interval.ms).

Si un rebalance est déclenché, le coordinator utilise la réponse de la requête de Heartbeat pour indiquer aux autres consumers qu’ils doivent rejoindre à nouveau le groupe.

Consumer — Rebalance Protocol — Heartbeat

Jusqu’à présent, tout va bien, mais comme vous le savez surement, dans la vraie vie et plus particulièrement dans un système distribué, des défaillances se produiront inévitablement. Le matériel peut tomber en panne et le réseau ou un consumer peuvent connaître des défaillances temporaires. Malheureusement, pour toutes ces situations, un rebalance peut également être déclenché.

Quelques limitations

La première limite du protocole de Rebalancing est que nous ne pouvons pas simplement rééquilibrer un membre sans arrêter l’ensemble du groupe (effet stop-the-world).

Prenons l’exemple, d’arrêter proprement une de nos instances. Dans ce premier scénario de rebalancing, le consumer enverra une requête de type LeaveGroup au broker coordinator, avant de s’arrêter.

Consumer — Rebalance Protocol — LeaveGroup

Les consumers restants seront informés qu’un rebalancing doit être effectué lors du prochain Heartbeat et lanceront une nouvelle séquence de requêtes/réponses JoinGroup/SyncGroup afin de se voir réassigner les partitions.

Consumer — Rebalance Protocol — Rejoin

Pendant tout le processus de rebalancing, c’est-à-dire tant que les partitions ne sont pas réattribuées, les consumers ne traitent plus aucune donnée. Par défaut, le délai de rebalancing est fixé à 5 minutes, ce qui peut constituer une très longue période pendant laquelle le retard (lag) croissant du consommateur peut devenir un problème.

Mais que se passerait-il si, par exemple, le consumer ne faisait que de redémarrer après une erreur passagère ? Eh bien, le consumer, en rejoignant le groupe, déclencherait un nouveau rebalance qui ferait que tous les consumers seraient arrêtés (une fois de plus).

Consumer — Rebalance Protocol — Restart

Une autre raison qui peut conduire au redémarrage d’un consumer est un rolling-upgrade du groupe. Ce scénario est malheureusement désastreux pour le consumer group. En effet, avec un groupe de trois consumers, une telle opération déclencherait 6 rebalancing qui pourraient potentiellement avoir un impact important sur le traitement des messages.

Enfin, un problème courant lors de l’exécution des consumers Kafka, en Java, est soit de manquer une requête de Heartbeart, en raison d’une panne réseau ou d’une pause GC trop longue, soit de ne pas invoquer la méthode KafkaConsumer#poll(), périodiquement, en raison d’un temps de traitement excessif. Dans le premier cas, le coordinator ne reçoit pas de Heartbeat pendant plus de session.timeout.ms millisecondes et considère le consumer comme mort. Dans le second cas, le temps nécessaire au traitement des records est supérieur à max.poll.inteval.ms.

Consumer — Rebalance Protocol — Timeout

Static Membership

Pour réduire le nombre de rebalance causé par des défaillances temporaires des consumers, Apache Kafka 2.3 introduit le concept de Static Membership avec la KIP-345.

L’idée principale derrière le Static Membership est que chaque instance consumer est attachée à un identifiant unique configuré via la propriété group.instance.id. Le protocole de Rebalancing a été étendu de sorte que les ids sont propagés au broker coordinator via la requête JoinGroup.

Si un consumer est redémarré ou kill en raison d’une défaillance temporaire, le broker coordinator n’informera pas les autres consumers du même groupe qu’un rebalancing est nécessaire avant que session.timeout.ms ne soit atteint. La raison à cela est que les consumers n’enverront pas de requête de LeaveGroup lorsqu’ils seront arrêtés.

Lorsque le consumer rejoindra enfin le groupe, le broker coordinator lui retournera ses assignations mises en cache, sans procéder à aucun rebalancing.

Lors de l’utilisation du Static Membership, il est recommandé d’augmenter suffisamment la propriété session.timeout.ms des consumers pour que le broker coordinator ne déclenche pas trop fréquemment le rebalance.

D’une part, le Static Membership peut être très utile pour limiter le nombre de rebalance indésirables et ainsi minimiser l’effet “stop-the-world”. D’autre part, cela présente l’inconvénient d’augmenter l’indisponibilité des partitions, car le broker coordinator ne pourra détecter un consumer défaillant qu’après quelques minutes (en fonction du session.timeout.ms). Malheureusement, il s’agit de l’éternel compromis entre disponibilité et tolérance aux pannes qu’il faut avoir dans un système distribué.

Incremental Cooperative Rebalancing

À partir de la version 2.3, Apache Kafka introduit également de nouveaux protocoles embarqués pour améliorer la disponibilité des ressources de chaque membre tout en minimisant l’effet “stop-the-world”.

L’idée de base de ces nouveaux protocoles est d’effectuer un rebalancing progressivement et en coopération. En d’autres termes, il s’agit d’exécuter plusieurs cycles de rebalance plutôt qu’un seul à l’échelle du groupe.

Le protocole Incremental Cooperative Rebalancing a, dans un premier temps, été implémenté pour Kafka Connect par le biais de la KIP-415 (partiellement mis en œuvre dans Kafka 2.3). Pour KafkaStreams et l’API Consumer, il sera disponible uniquement à partir de Kafka 2.4 via la KIP-429.

Kafka Connect Limitations

Kafka Connect utilise le protocole Group Membership pour répartir les connecteurs et les tâches de manière homogène entre les workers qui composent un cluster Connect. Ainsi, les workers se coordonnent entre eux pour se distribuer les connecteurs et les tâches lorsqu’un nœud est défaillant ou est redémarré, lorsque le nombre de tâches augmente ou diminue et lorsqu’une configuration est mise à jour.

Toutefois, avant Kafka 2.3, chaque fois que l’un de ces scénarios se produisait, l’exécution de tous les connecteurs existants était interrompue (effet stop-the-world). Il était donc difficile de faire évoluer un cluster mutualisé avec plusieurs dizaines de connecteurs.

Le protocole Incremental Cooperative Rebalancing tente de résoudre ce problème de deux manières :

1 ) arrêter uniquement les tâches/membres qui sont concernés par les ressources à révoquées.

2 ) gérer les déséquilibres temporaires dans l’assignation des ressources entre les membres: soit immédiatement, soit de manière différée (utile lors des rolling-upgrade).

Pour cela, le protocole Incremental Cooperative Rebalancing se décline en trois implémentations concrètes :

  • Design I: Simple Cooperative Rebalancing

  • Design II: Deferred Resolution of Imbalance

  • Design III: Incremental Resolution of Imbalance

Pour vous permettre de mieux comprendre le fonctionnement de ce protocole, nous allons illustrer le Design II dans le contexte de Kafka Connect.

Deferred Resolution of Imbalance

Commençons par un cluster Kafka Connect simple composés de trois Worker avec cette assignation de tâches/connectors initiale :

1 — Initial assignment

Maintenant, imaginons que le W2 échoue sans raison particulière et quitte le groupe après un certain timeout. Un rebalance est déclenché et les workers W1 et W3 restants rejoignent de nouveau le groupe. Lors de l’envoi d’une requête de JoinGroup, les workers incluent leur assignation précédente. Les assignations sont partagées en utilisant le champ existant member_metadata du protocole Group Membership.

2 — W2 leaves the group and rebalance is triggered (W1, W3 join).

W1 est élu group leader et effectue les assignations des tasks/connectors en calculant la différence avec les affectations précédentes. Ici, le leader group détecte que certaines tâches et certains connecteurs ne sont pas présents dans les assignations précédentes.

3 — W1 becomes leader and computes assignments

W1 envoie les nouveaux tasks/connectors assignés ainsi que ceux qui ont été révoqués. Vous pouvez noter que W1 n’essaiera pas réellement de résoudre l’absence (ou le déséquilibre) d’affectation immédiate. Au lieu de cela, il reportera la résolution en planifiant un prochain rebalancing pour donner une chance au membre défaillant de réapparaître. Le délai de scheduling est fixé par une nouvelle configuration scheduled.rebalance.max.delay.ms (par défaut égale à 5 minutes).

Note: Avec le protocole Incremental Cooperative Rebalancing, lorsqu’un membre reçoit une nouvelle affectation, il commence par traiter toutes les nouvelles partitions (ou tasks/connectors). En outre, si l’assignation contient également des partitions révoquées, il arrête le traitement, commit et lance immédiatement une autre requête JoinGroup. Cela a pour effet d’augmenter le nombre de rebalancing, mais d’arrêter seulement les ressources dont l’affectation a changé.

4 — W1, W3 receive assignments

W2 rejoint le groupe avant que le délai n’expire et qu’un autre rebalance ne soit déclenché. W1 et W2 rejoignent également le groupe.

5 — B rejoins the group before delay expire and a rebalance is triggered

Toutefois, W1 ne réassignera pas les tasks/connectors manquants avant l’expiration du délai de rebalance prévu.

6 — W1 becomes leader and computes assignments

À l’expiration du délai restant, un rebalance final est déclenché et tous les workers rejoignent le groupe.

7 — W1, W2, W3 receive assignments

Enfin, le group leader réaffecte la task A-1 et le connector B à W2. Pendant toute la séquence de rééquilibrage, W1 et W3 n’ont jamais arrêté les tasks qui leur avaient été assignées.

8 — After delay, all members join

Conclusion

Le protocole de Rebalancing est une composante essentielle du mécanisme de consommation de Apache Kafka. Mais il sert également de protocole générique pour coordonner les membres d’un groupe et répartir des ressources entre eux (par exemple Kafka Connect). Le Static Membership et le Incremental Cooperative Rebalancing sont deux caractéristiques importantes qui apportent une amélioration considérable à Apache Kafka en rendant ce protocole plus robuste et plus évolutif.

Pour en savoir plus sur ce protocole et son fonctionnement, consultez les liens suivants.

Sources :

The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.