Azkarra Streams v0.5: Les nouvelles fonctionnalités

Azkarra Streams v0.5: Les nouvelles fonctionnalités
The Author's Avatar

Nous avons le plaisir de vous annoncer la version 0.5.0 du projet AzkarraStreams.

Cette version comprend non seulement de nouvelles fonctionnalités importantes, mais aussi quelques changements au niveau des APIs publiques.

Lorsque nous avons démarré le projet Azkarra, il ne s’agissait que de quelques classes utilitaires utilisées pour encapsuler les classes KafkaStreams et Topology. Mais le projet a rapidement évolué pour devenir un framework plus élaboré.

Depuis sa sortie en open-source, en Novembre dernier, nous avons reçu les premiers retours de la communauté sur la façon dont le projet pouvait-être utilisé et sur les fonctionnalités à améliorer ou à ajouter. Nous avons donc considéré que, tant que le projet n’en était qu’à ses débuts, il était encore temps de remanier certaines APIs internes.

Azkarra Worker

Tout d’abord, cette nouvelle version d’AzkarraStreams ajoute un nouveau module appelé azkarra-worker qui permet de déployer rapidement une instance autonome d’Azkarra. AzkarraWorker s’inspire fortement des principes et des mécanismes de Kafka Connect. En effet, dans de nombreuses situations, vous pourriez souhaiter partager et réutiliser certaines topologies KafkaStreams au sein de votre organisation. Par exemple, il peut s’agir d’une topologie pour convertir des messages entre différents formats (JSON en Avro) ou encore pour calculer le nombre de messages dont l’un des champs est égal à une valeur spécifique, etc.

Composantes externes

Le mécanisme de componant-scan a été enrichi et peut maintenant charger des composants (ex: une classe implémentant l’interface TopologyProvider) depuis des répertoires externes au classpath Java. Pour cela, une nouvelle propriété azkarra.component.paths a été ajouté. Elle permet de configurer la liste des répertoires (séparés par une virgule) à partir desquels les composants seront scannés.

Azkarra suit la même logique que celle utilisée par KafkaConnect pour le chargement des connecteurs-plugins. Ainsi, chaque répertoire configuré peut contenir :

  • un uber JAR contenant toutes les classes et les dépendances tierces du composant.
  • un répertoire contenant tous les JARs pour le composant.

Classloading Isolation

De plus, pour chaque composant externe, Azkarra utilise également un ClassLoader Java dédié. Cela a pour but, d’une part, d’éviter les conflits de versions entre librairies et, d’autre part, de permettre l’exécution de plusieurs versions d’une même topologie KafkaStreams au sein d’un worker.

Image Docker

Enfin, une image Docker est également disponible sur Docker Hub pour faciliter l’usage de Azkarra Worker:

$> docker run --net host streamthoughts/azkarra-streams-worker
--mount type=bind,src=/tmp/azkarra/application.conf,dst=/etc/azkarra/azkarra.conf \
--mount type=bind,src=/tmp/components,dst=/usr/share/azkarra-components/ \
streamthoughts/azkarra-streams-worker

Notez que l’image Docker d’AzkarraStreams ne contient aucune topologie. Vous devez donc, comme dans l’exemple ci-dessus, monter le répertoire contenant les composants externes à charger.

Vous pouvez retrouver sur le projet GitHub, un exemple de configuration pour docker-compose.

StreamsLifecycleInterceptor

Une nouvelle interface appelée StreamsLifecycleInterceptor permet maintenant d’effectuer des opérations avant et après le démarrage ou l’arrêt d’une instance de KafkaStreams.

Azkarra fournit deux intercepteurs intégrés pour les opérations courantes :

  • AutoCreateTopicsInterceptor : Azkarra peut désormais être configuré pour créer automatiquement les topics source et sink déclarés par la topologie avant de lancer l’instance KafkaStreams. Cet intercepteur peut également être utilisé pour supprimer automatiquement les topics après l’arrêt de l’instance (uniquement pour le développement).

  • WaitForSourceTopicsInterceptor : Avant la version 0.5.0, Azkarra fournissait déjà un mécanisme permettant d’attendre que les topics sources aient été créés avant de lancer la topologie. Cette fonctionnalité s’appuie maintenant sur le concept d’intercepteurs.

Documentation : StreamsLifecycleInterceptor

KafkaStreamsFactory

Dans la version précédente, il n’était pas possible de fournir sa propre instance KafkaStreams pour les différentes topologies définies. Dans certains cas, cela pouvait-être une limitation importante notamment si vous souhaitiez fournir une instance KafkaClientSupplier spécifique (exemple: kafka-opentracing).

L’API Azkarra expose maintenant l’interface KafkaStreamsFactory qu’il est possible d’implémenter pour personnaliser la manière dont une instance KafkaStreams est créée.

L’exemple suivant montre comment configurer une factory :

StreamsExecutionEnvironment env = DefaultStreamsExecutionEnvironment.create();
env.setKafkaStreamsFactory(() -> new KafkaStreamsFactory() {
    @Override
    public KafkaStreams make(Topology topology, 
                             Conf streamsConfig) {
        KafkaClientSupplier clientSupplier = //...
        return new KafkaStreams(
          topology, 
          streamsConfig.getConfAsProperties(), clientSupplier);
    }
})

Documentation : KafkaStreamsFactory

Components Definition

Depuis plusieurs versions, Azkarra fournit un mécanisme relativement simple pour déclarer des composants et permettre l’injection de dépendance. Ce mécanisme a fait l’objet de nombreuses tâches de refactoring afin d’offrir davantage de fonctionnalités.

Component Factories

Azkarra v0.5.0 ajoute la nouvelle annotation Factory qui peut être utilisée pour annoter une classe qui fournit un ou plusieurs composants définis par des méthodes annotées avec Component.

Voici un exemple simple d’utilisation:

@Factory
public class TopicsFactory {

    @Component
    @Singleton
    public NewTopic streamsInputTopic() {
        return new NewTopic(
          "streams-plaintext-input", 6, (short)1);
    }

    @Component
    @Singleton
    public NewTopic streamsOuputTopic() {
        return new NewTopic(
           "streams-wordcount-output", 6, (short)1);
    }
}

JSR-330

Azkarra exploite désormais certaines des annotations Java définies par la spécification JSR-330 (javax.inject) — Dependency Injection for Java specification:

  • Singleton
  • Named

Cependant, vous devez noter qu’Azkarra ne supporte pas l’annotation Inject.

Restricted Qualifier

Cette version ajoute une nouvelle annotation @Restricted qui peut être utilisée pour limiter/qualifier le scope d’utilisation d’un composant. De cette manière, il est possible de restreindre l’usage d’un composant à un environnement ou à une topologie spécifique.

Dans l’exemple ci-dessous, la classe CustomStreamsFactory ne sera utilisée que par la topologie nommée wordCountTopology.

@Component
@Restricted(type = Restriction.TYPE_STREAMS, 
            names = "wordCountTopology"
)
public class CustomStreamsFactory implements KafkaStreamsFactory {

    @Override
    public KafkaStreams make(Topology topology, 
                             Conf streamsConfig) {
        return new KafkaStreams(
               topology, 
               streamsConfig.getConfAsProperties()
        );
    }
}

Autres améliorations et Bug fixés

Conclusion

Merci à toute la communauté open-source pour ses commentaires et ses encouragements.

The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.