Kafka Streams est une librairie Java/Scala permettant de développer des applications, orientées microservices, de type stream-processing basées sur Apache Kafka.
Lors du développement d’une application KafkaStreams, les développeurs doivent non seulement définir leur topologie, c’est-à-dire le graphe d’opérations à appliquer aux messages consommés, mais aussi l’ensemble du code nécessaire à son exécution.
De plus, avant de pouvoir déployer une application KafkaStreams en production, il faudra savoir comment gérer les erreurs de traitement et les messages indésirables/corrompus puis comment surveiller et exploiter les instances. Enfin, si vous prévoyez d’exposer certains états internes, en utilisant la fonction intégrée de Kafka Streams appelée “Interactive Queries “, vous devrez également écrire le code nécessaire pour accéder à vos données (exemple : APIs REST).
Par conséquent, votre application peut rapidement devenir complexe avec du code passe-partout qui n’a pas (ou peu) de valeur métier directe mais que vous devrez maintenir et potentiellement dupliquer sur d’autres projets.
AzkarraStreams est un micro-framework open-source Java qui permet de développer et d’exploiter facilement les applications KafkaStreams (Azkarra est le mot basque pour “Rapide").
Principales caractéristiques
AzkarraStreams fournit un ensemble de fonctionnalités pour développer rapidement des applications KafkaStreams. Cela comprend, entre autres, les éléments suivants :
- Gestion du cycle de vie des instances KafkaStreams (plus besoin de
KafkaStreams#start()
). - Externalisation facile de la configuration (en utilisant Typesafe Config).
- Serveur HTTP embarqué pour l’interrogation des StateStores (Undertow).
- APIs REST pour exposer les métriques des instances KafkaStreams (JSON, Prometheus).
- WebUI pour la visualisation des topologies.
- **Authentification via SSL/TLS ou BasicAuth.
- Etc.
Pour commencer
Depuis la v0.5.0, il est possible d’utiliser l’image Docker (streamthoughts/azkarra-streams-worker) qui permet de démarrer un worker Azkarra standalone pour exécuter une ou plusieurs applications KafkaStreams.
Azkarra Worker suit le même mécanisme que celui utilisé par le projet Kafka Connect, c’est-à-dire que les topologies Kafka Streams sont fournies sous-forme de composants externes qui peuvent être démarrés et arrêtés soit par des appels REST, soit via la WebUI intégrée.
Pour commencer, démarrons une instance Azkarra Worker et un cluster Kafka mono-broker en utilisant le fichier docker-compose.yml
disponible sur le dépôt GitHub.
1 ) Exécutez la commande suivante pour télécharger et exécuter les conteneurs :
$ curl -s https://raw.githubusercontent.com/streamthoughts/azkarra-streams/master/docker-compose.yml --output \
docker-compose.yml && docker-compose up -d
2 ) Vérifiez que le worker Azkarra est bien démarré :
$ curl -sX GET http://localhost:8080 | jq
{
"azkarraVersion": "0.5.0",
"commitId": "d2bc2fdc24e68eb143f4388960881974604093ca",
"branch": "master"
}
3 ) Enfin, vous pouvez accéder à l’interface utilisateur d’Azkarra qui est disponible sur http://localhost:8080/ui.
Comme nous pouvons peut le voir, pour l’instant notre worker ne fait absolument rien, puisque nous n’avons pas encore déployé de topologie.
Développons notre première topologie KafkaStreams
Afin de démontrer l’usage de l’API Azkarra, nous allons réécrire l’exemple classique : WordCountTopology.
Tout d’abord, créons un projet Java simple et ajoutons les AzkarraStreams aux dépendances de notre projet.
Exemple pour Maven (pom.xml
) :
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>azkarra-streams</artifactId>
<version>0.5.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
Notez que lorsque vous utilisez Azkarra Worker, votre projet ne doit jamais contenir les dépendances déjà fournies au runtime par Azkarra (c’est-à-dire azkarra-*, kafka-streams).
Ensuite, définissons notre topologie des flux Kafka en créant un nouveau fichier WordCountTopology.java
.
package azkarra;
import io.streamthoughts.azkarra.api.annotations.*;
import io.streamthoughts.azkarra.api.streams.TopologyProvider;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
@Component
@TopologyInfo(description = "WordCount topology example")
public class WordCountTopology implements TopologyProvider {
@Override
public Topology get() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("streams-plaintext-input");
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.as("WordCount"))
.toStream()
.to(
"streams-wordcount-output",
Produced.with(Serdes.String(), Serdes.Long())
);
return builder.build();
}
@Override
public String version() {
return "1.0";
}
}
Dans le code, ci-dessus, nous avons implémenté l’interface TopologyProvider
afin de retourner notre Topology
via la méthode get()
. Azkarra vous oblige à versionner chaque topologie. Ceci est utile, par exemple, pour exécuter plusieurs versions d’une même topologie ou pour générer automatiquement un application.id
.
L’annotation @Component
est ici nécessaire pour permettre à Azkarra de détecter automatiquement notre classe.
Et c’est tout ! Azkarra sera chargée de créer et de gérer l’instance KafkaStreams
nécessaire pour exécuter notre “Topologie”.
Déployer une topologie
Maintenant que nous avons défini notre topologie (WordCountTopology
) nous devons la rendre disponible au worker Azkarra.
Pour cela, nous devons packager et installer la topologie WordCountTopology
en tant que composant dans un des répertoires configurés via la propriété : azkarra.component.paths
.
Si vous jetez un coup d’oeil au fichier docker-compose.yml
, vous verrez que la valeur de cette propriété est /tmp/azkarra/components
. Cette propriété est configurée via une variable d’environnement.
La propriété azkarra.component.paths
doit définir la liste des répertoires (séparés par une virgule) à partir desquels les composants seront scannés.
Chaque répertoires peut contenir:
un uber JAR contenant toutes les classes ou autres dépendances nécessaires à notre composant.
un sous-répertoire contenant tous les JARs dont dépendent notre composant.
Généralement, avec Maven, vous utiliserez les plugins maven-assembly-plugin
ou maven-shade-plugin
pour packager votre application (topologie) en un Uber JAR.
Après avoir packagé notre application, nous pouvons copier le .jar
dans le répertoire /tmp/azkarra/components puis /tmp/azkarra/components
puis redémarrer le conteneur Docker de la façon suivante :
$ docker-compose restart
Vous pouvez maintenant lister les topologies disponibles via l’API REST suivante :
curl -sX GET http://localhost:8080/api/v1/topologies | jq
[
{
"name": "azkarra.WordCountTopology",
"version": "1.0",
"description": "WordCount topology example",
"aliases": [
"WordCount",
"WordCountTopology"
],
"config": {}
}
]
Pour finir, démarrons une nouvelle instance KafkaStreams
en publiant la configuration JSON suivante :
curl -H "Content-Type:application/json" \
-X POST http://localhost:8080/api/v1/streams \
--data '{"type": "azkarra.WordCountTopology", "version": "1.0", "env": "__default", "config": {} }'
Dans la commande ci-dessus, nous spécifions le type et la version de la topologie à déployer ainsi que l’environnement cible.
En effet, Azkarra a un concept de “StreamExecutionEnvironment
” qui agit comme un conteneur pour l’exécution des instances KafkaStreams
. Par défaut, un environnement nommé “_default” est créé.
Notez qu’Azkarra créera automatiquement tous les topics source et sink définis par la topologie (azkarra.context.auto.create.topics.enable=true
).
Explorer l’usage de Azkarra WebUI
Azkarra est livré avec une interface Web intégrée qui vous permet d’obtenir des informations sur les applications Kafka Streams en cours d’exécution.
Par exemple, vous pouvez :
Obtenir des informations sur les StreamThreads et les Tasks d’une instance KafkaStreams :
Visualiser le DAG d’une topologie :
Lister les métriques d’une instance KafkaStreams :
De plus, l’interface Web d’Azkarra vous permet d’arrêter, de redémarrer et de supprimer des instances KafkaStreams locales.
Interroger les State Stores
Enfin, AzkarraStreams dispose d’un excellent mécanisme pour interroger les états matérialisés par les applications de KafkaStreams via des appels REST.
Pour illustrer cette fonctionnalité, nous allons produire quelques messages :
$ docker exec -it broker /usr/bin/kafka-console-producer \
--topic streams-plaintext-input \
--broker-list broker:9092
Azkarra Streams
WordCount
I Heart Logs
Kafka Streams
Making Sense of Stream Processing
Vous pouvez maintenant, requêter le state WordCount
qui est géré par notre application.
curl -sX POST http://localhost:8080/api/v1/applications/word-count-topology-1-0/stores/WordCount \
--data '{"query":{"get":{"key": "streams"}},"type":"key_value", "set_options":{}}' | jq
{
"took": 1,
"timeout": false,
"server": "azkarra:8080",
"status": "SUCCESS",
"result": {
"success": [
{
"server": "azkarra:8080",
"remote": false,
"records": [
{
"key": "streams",
"value": 2
}
]
}
],
"total": 1
}
}
Vous pouvez également interroger un State Store directement via le WebUI d’Azkarra.
Pour aller plus loin
Si vous souhaitez en savoir plus sur l’utilisation de AzkarraStreams, la documentation se trouve sur la page GitHub. La documentation contient un guide étape par étape pour apprendre les concepts de bases d’Azkarra.
Le projet contient également quelques exemples.
Conclusion
Azkarra Streams est une initiative visant à enrichir l’écosystème d’Apache Kafka Streams et à faciliter son adoption par les développeurs.
Nous espérons que ce projet sera bien accueilli par la communauté open-source et Apache Kafka. AzkarraStreams est encore en cours d’évolution et certaines fonctionnalités doivent être améliorées.
Pour soutenir le projet Azkarra Streams, vous pouvez ⭐ le projet Github ou retweeter si ce projet vous aide !