Cet article est disponible en anglais sur : Medium
Dans un précédent article, j’ai présenté le fonctionnement du protocole de “Rebalance” d’Apache Kafka et la manière dont il est utilisé en interne. Du point de vue des consumers Kafka, ce protocole est utilisé à la fois pour coordonner les membres d’un même groupe et pour répartir entre eux l’assignation des partitions.
L’un des aspects clés de ce protocole est que, en tant que développeur, nous pouvons intégrer notre propre protocole pour personnaliser la manière dont les partitions sont attribuées aux membres du groupe.
Dans cet article, nous verrons quelles stratégies peuvent être configurées pour un Kafka Client Consumer et comment écrire un PartitionAssignor
personnalisé mettant en œuvre une stratégie de failover.
Les stratégies d’assignation (PartitionAssignor
)
Lors de la création d’un nouveau consumer, nous pouvons configurer la stratégie qui sera utilisée pour l’attribution des partitions entre les membres du groupes.
La stratégie d’assignation est configurable via la propriété partition.assignment.strategy
.
L’extrait de code suivant illustre la manière de spécifier un Assignor
spécifique :
Properties props = new Properties();
...
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//...
Tous les consommateurs qui appartiennent à un même groupe doivent avoir une stratégie commune déclarée. Si un consommateur tente de rejoindre un groupe ayant une configuration incompatible avec les autres membres du groupe, vous vous retrouverez avec cette exception :
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member’s supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
Cette propriété accepte une liste de stratégies séparées par des virgules. Par exemple, elle permet de mettre à jour un groupe de consommateurs en spécifiant une nouvelle stratégie tout en conservant temporairement la précédente. Dans le cadre du protocole de Rebalance, le broker coordinator choisira le protocole qui est supporté par tous les membres.
Une stratégie représente simplement le nom d’une classe implémentant l’interface PartitionAssignor
.
Kafka Clients propose trois stratégies intégrées: RangeAssignor, RoundRobinAssignor and StickyAssignor.
RangeAssignor
La classe RangeAssignor
est la stratégie par défaut. Cette stratégie permet de co-localiser les partitions de plusieurs topics. Cela est utile, par exemple, pour joindre des messages de deux topics, ayant le même nombre de partitions et la même logique de partitionnement des clés.
Pour cela, la stratégie commence par mettre tous les consommateurs dans l’ordre lexicographique en utilisant le member_id attribué par le broker coordinator. Ensuite, elle ordonne également les topic-partitions disponibles. Enfin, pour chaque topics, les partitions sont attribuées à partir du premier consumer.
Comme vous pouvez le voir, les partitions 0 des topics A et B sont attribuées à la même instance.
Dans l’exemple, ci-dessus, nous utilisons au maximum deux consumers, car nous n’avons pas plus de deux partitions par topic. Si vous prévoyez de consommer plusieurs topics d’entrée et que vous n’effectuez pas une opération de colocalisation des partitions, il est préférable de ne pas utiliser la stratégie par défaut.
RoundRobinAssignor
La classe RoundRobinAssignor
peut être utilisée pour répartir les partitions disponibles de manière égale entre tous les membres. Comme précédemment, la stratégie classera les partitions et les consommateurs dans l’ordre lexicographique avant d’attribuer chaque partition.
Même si le RoundRobinAssignor
présente l’avantage de maximiser le nombre de consommateurs utilisés, il présente un inconvénient majeur. En effet, il ne cherche pas à réduire les mouvements de partition lorsque le nombre de consommateurs change (c’est-à-dire lorsqu’un Rebalance se produit).
Pour illustrer ce comportement, retirons le consommateur 2 du groupe. Dans ce scénario, la partition B-1 est révoquée de C1 pour être réaffectée à C3. Inversement, le thème B-0 est retiré de C3 pour être réaffecté à C1.
Dans l’hypothèse où le consommateur initialise des caches internes, ouvre des ressources ou des connexions pendant l’affectation d’une partition, ce mouvement de partition inutile peut avoir un impact sur les performances de ce dernier.
StickyAssignor
La classe StickyAssignor
est assez similaire au RoundRobin, sauf qu’il tente de minimiser les mouvements de partitions entre deux Rebalances, tout en assurant une distribution uniforme.
Ainsi, dans l’exemple précédent, si le consommateur C2 quitte le groupe, seule l’affectation de la partition A-1 passe à C3.
StreamsPartitionAssignor
Kafka Streams possède sa propre stratégie d’assignation. Il est utilisé pour assigner des partitions entre les instances d’application tout en assurant leur co-localisation et en maintenant les états des tâches actives et passives.
Généralement, les trois stratégies de base conviennent à la plupart des cas d’utilisation. Cependant, il se peut que vous ayez un contexte de projet spécifique ou une politique de déploiement qui vous oblige à mettre en œuvre votre propre stratégie.
Pour cela, voyons comment implémenter l’interface org.apache.kafka.clients.consumer.internals.PartitionAssignor
Implémenter une stratégie personnalisée
L’interface PartitionAssignor
L’interface PartitionAssignor n’est pas très complexe et ne contient que quatre méthodes.
public interface PartitionAssignor {
Subscription subscription(Set<String> topics);
Map<String, Assignment> assign(
Cluster metadata,
Map<String, Subscription> subscriptions);
void onAssignment(Assignment assignment);
String name();
}
Tout d’abord, la méthode subscription()
est invoquée sur l’ensemble des consommateurs, qui sont alors responsables de créer un objet Subscription
qui sera envoyé au broker coordinator. Une Subscription contient l’ensemble des topics auxquels le consommateur s’abonne et, éventuellement, certaines données utilisateurs qui peuvent être utilisées par l’algorithme d’assignation.
Ensuite, dans le cadre de l’exécution du protocole de Rebalance, le consommateur group leader recevra les Subscrition
de tous les consommateurs et sera chargé d’effectuer l’assignation des partitions via la méthode assign()
.
Par la suite, tous les consommateurs recevront leur Assignment
calculé par le group leader et la méthode onAssignment()
sera invoquée sur chacun d’eux. Cette méthode peut être utilisée par les consommateurs pour maintenir un état interne.
Enfin, un PartitionAssignor
doit être assigné à un nom unique qui est renvoyé par la méthode name()
(par exemple “range” ou “roundrobin” ou “sticky").
Failover strategy
Avec les stratégies par défaut, tous les consommateurs d’un groupe peuvent être assignés à des partitions. Nous pouvons comparer cette stratégie à un modèle actif/actif. Cela signifie que toutes les instances sont susceptibles de consommer des messages en même temps. Mais, pour certains scénarios de production, il peut être nécessaire d’effectuer une consommation active/passive. Je vous propose donc de mettre en place un FailoverAssignor qui est, en fait, une stratégie que l’on peut trouver dans d’autres solutions de messaging.
L’idée de base de la stratégie de Failover est que plusieurs consommateurs peuvent rejoindre un même groupe. Cependant, toutes les partitions sont attribuées à un seul consommateur à la fois. Si ce consommateur échoue ou est arrêté, toutes les partitions sont alors attribuées au consommateur disponible suivant. En général, les partitions sont attribuées au premier consommateur mais, dans notre exemple, nous allons attribuer une priorité à chacune de nos instances. Ainsi, l’instance ayant la priorité la plus élevée sera préférée aux autres.
Illustrons cette stratégie. Dans l’exemple ci-dessous, C1 a la plus haute priorité, toutes les partitions lui sont donc attribuées.
En cas de défaillance du consommateur, toutes les partitions sont alors attribuées au consommateur suivant (c’est-à-dire C2).
Implémentation
Tout d’abord, créons une nouvelle classe Java appelée FailoverAssignor
. Au lieu d’implémenter directement l’interface PartitionAssignor
, nous allons étendre la classe abstraite AbstractPartitionAssignor
. Celle-ci implémente pour nous la méthode assign(Cluster, Map<String, Subscription>)
et réalise toute la logique pour obtenir les partitions disponibles pour chaque abonnement. Elle déclare également la méthode abstraite suivante que nous allons implémenter :
Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions);
Mais avant cela, nous devons rendre notre FailoverAssignor
configurable, afin de pouvoir attribuer une priorité à chaque consommateur. Heureusement, Kafka fournit pour cela l’interface Configurable
que nous pouvons implémenter pour récupérer la configuration de l’instance KafkaConsumer
.
Le code complet jusqu’ici est :
public class FailoverAssignor extends AbstractPartitionAssignor implements Configurable {
@Override
public String name() {
return "failover";
}
@Override
public void configure(final Map<String, ?> configs) {
// TODO
}
@Override
public Subscription subscription(final Set<String> topics) {
// TODO
}
@Override
Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// TODO
}
}
Dans le code ci-dessus, la méthode configure()
est invoquée juste après l’initialisation de l’instance FailoverAssignor
par l’instance KafkaConsumer
.
Afin de suivre la convention de code Kafka, nous allons créer une seconde classe appelée FailoverAssignorConfig
qui étendra la classe AbstractConfig
de l’API Client Kafka.
public class FailoverAssignorConfig extends AbstractConfig {
public static final String *CONSUMER_PRIORITY_CONFIG *= "assignment.consumer.priority";
public static final String *CONSUMER_PRIORITY_DOC *= "The priority attached to the consumer that must be used for assigning partition. " +
"Available partitions for subscribed topics are assigned to the consumer with the highest priority within the group.";
private static final ConfigDef *CONFIG*;
static {
*CONFIG *= new ConfigDef()
.define(*CONSUMER_PRIORITY_CONFIG*, ConfigDef.Type.*INT*, Integer.*MAX_VALUE*,
ConfigDef.Importance.*HIGH*, *CONSUMER_PRIORITY_DOC*);
}
public FailoverAssignorConfig(final Map<?, ?> originals) {
super(*CONFIG*, originals);
}
public int priority() {
return getInt(*CONSUMER_PRIORITY_CONFIG*);
}
}
Maintenant, la méthode configure()
peut-être simplement implémentée de la façon suivante :
public void configure(final Map<String, ?> configs) {
this.config = new FailoverAssignorConfig(configs);
}
Ensuite, nous avons besoin d’implémenter la méthode subscription()
afin de partager la priorité de chaque consommateur à travers le champ user-data.
Remarque : Les user-data doivent être encapsulés dans un ByteBuffer
.
@Override
public Subscription subscription(final Set<String> topics) {
ByteBuffer userData = ByteBuffer.*allocate*(4)
.putInt(config.priority())
.flip();
return new Subscription(
new ArrayList<>(topics),
ByteBuffer.*wrap*(userData)
);
}
Puis, nous pouvons implémenter la méthode assign()
:
@Override
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// Generate all topic-partitions using the number
/ of partitions for each subscribed topic.
final List<TopicPartition> assignments = partitionsPerTopic
.entrySet()
.stream()
.flatMap(entry -> {
final String topic = entry.getKey();
final int numPartitions = entry.getValue();
return IntStream.*range*(0, numPartitions)
.mapToObj( i -> new TopicPartition(topic, i));
}).collect(Collectors.*toList*());
// Decode consumer priority from each subscription and
Stream<ConsumerPriority> consumerOrdered = subscriptions.entrySet()
.stream()
.map(e -> {
int priority = e.getValue().userData().getInt();
String memberId = e.getKey();
return new ConsumerPriority(memberId, priority);
})
.sorted(Comparator.*reverseOrder*());
// Select the consumer with the highest priority
ConsumerPriority priority = consumerOrdered.findFirst().get();
final Map<String, List<TopicPartition>> assign = new HashMap<>();
subscriptions.keySet().forEach(memberId -> assign.put(memberId, Collections.*emptyList*()));
assign.put(priority.memberId, assignments);
return assign;
}
Enfin, nous pouvons utiliser notre classe comme ceci :
Properties props = new Properties();
...
props.put(
ConsumerConfig.*PARTITION_ASSIGNMENT_STRATEGY_CONFIG*,
FailoverAssignor.class.getName()
);
props.put(FailoverAssignorConfig.*CONSUMER_PRIORITY_CONFIG*, "10");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Conclusion
L’API Kafka Clients vous permet d’implémenter vos propres stratégies pour assigner les partitions aux consommateurs. Cela peut être très utile pour s’adapter à des scénarios de déploiement spécifiques, comme l’exemple dit de failover que nous avons utilisé dans ce post. En outre, la capacité de transmettre les données utilisateurs au consommateur group leader lors d’un Rebalance peut être mise à profit pour mettre en œuvre des algorithmes plus complexes, comme celui développé pour Kafka Streams.
Vous pouvez trouver le code source complet sur GitHub.