Présentation de Azkarra Streams: Le micro-framework dédié à Kafka Streams

Présentation de Azkarra Streams: Le micro-framework dédié à Kafka Streams
The Author's Avatar

Cet article est disponible en anglais sur : Medium

Kafka Streams est une librairie très efficace pour développer des applications complexes de stream-processing basées sur Apache Kafka. Cependant, avec le temps et après plusieurs projets, nous avons eu le sentiment de nous répéter et de réécrire encore et encore le même code pour exécuter et interagir avec nos applications déployées en production.

Nous étions alors convaincus que le fait de développer un micro-service simple avec KafkaStreams et avec le minimum des fonctionnalités nécessaires pour aller en production sereinement devait-être une question de jours et non de semaines.

C’est pourquoi, nous avons décidé d’implémenter notre propre framework avec pour objectif de faciliter et d’accélérer le développement d’applications KafkaStreams.

Aujourd’hui, nous sommes heureux de pouvoir vous annoncer la première version d’Azkarra Streams, un nouveau micro-framework open-source, écrit en java, qui vous permet de vous concentrer sur l’écriture du code de vos topologies Kafka Streams et non sur le code passe-partout nécessaire à leur exécution.

AzkarraStreams Logo

Les fonctionnalités clés

Azkarra Streams fournit un ensemble de fonctionnalités permettant de débuguer et de construire rapidement des applications Kafka Streams prêtes à la production. Cela comprend, entre autres, fonctionnalités suivantes :

  • Gestion du cycle de vie des instances KafkaStreams (plus de KafkaStreams#start()).
  • Externalisation facile de la configuration (en utilisant Typesafe Config).
  • Serveur HTTP embarqué pour l’interrogation des StateStores (Undertow).
  • APIs REST pour exposer les métriques des instances KafkaStreams (JSON, Prometheus).
  • WebUI pour la visualisation des topologies.
  • Authentification via SSL/TLS ou BasicAuth.
  • Etc.

Pourquoi Azkarra ?

Avant de développer notre première application avec Azkarra, prenons le temps de décrire les différentes parties d’une application standard Kafka Streams afin de mieux comprendre les avantages de ce nouveau framework.

Pour ce faire, nous allons utiliser le fameux exemple de word-count qui est disponible sur la documentation officielle de Kafka Streams.

Tout d’abord, nous devons déclarer et construire une Topology.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder
 .stream("streams-plaintext-input");

 textLines.flatMapValues(value ->
  Arrays.asList(value.toLowerCase().split("\\W+"))
 )
.groupBy((key, value) -> value)
.count(Materialized.as("WordCount"))
.toStream()
.to(
  "streams-wordcount-output",       
  Produced.with(Serdes.String(), Serdes.Long())
);
Topology topology = builder.build();

Ensuite, nous devons définir la configuration de notre application.

Properties props = new Properties();
props.put(APPLICATION_ID_CONFIG, "streams-word-count");
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

puis créer notre instance KafkaStreams

final KafkaStreams streams = new KafkaStreams(topology, props);

Et pour finir, nous devons gérer la partie runtime de l’application Kafka Streams. Cela signifie que nous devons de démarrer l’instance KafkaStreams et gérer proprement l’arrêt de l’application à l’aide d’un shutdown-hook Java.

final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(
new Thread("streams-shutdown-hook") {
  @Override
  public void run() {
    streams.close();
    latch.countDown();
  }
});

  try {
     streams.start();
     latch.await();
  } catch (Throwable e) {
     System.exit(1);
  }
  System.exit(0);

N.B : Voici le code complet : GitHub Gist

Cependant, une application Kafka est rarement aussi simple ! Vous devrez, par exemple, ajouter la gestion des erreurs et surveiller l’état et les métriques de l’instance KafkaStreams. Vous pourriez également vouloir interroger les états internes (StateStores) à l’aide de requêtes interactives, etc.

Enfin, vous devrez, à de rares occasions, faire face à certains problèmes de l’API (par exemple : https://issues.apache.org/jira/browse/KAFKA-7380).

Bref, au lieu de concentrer vos efforts de développement sur la définition et l’optimisation de votre topologie. C’est à dire, la partie qui apporte le plus de valeur à votre entreprise, vous passer une grande partie de votre temps à gérer des aspects purement techniques.

Premiers pas avec Azkarra

Dans un premier temps, nous avons cherché à résoudre le problème lié à la séparation de responsabilité entre la construction de la topologie et son exécution. En effet, pour nous, la création et le démarrage d’une nouvelle instance KafkaStreams ne devraient pas être gérés directement par les développeurs.

Tout d’abord, réécrivons donc l’exemple de word-count en utilisant l’API AzkarraStreams.

Pour cela, nous allons utiliser un archetype Maven de AzkarraStreams pour créer une structure de projet simple.

  • Vous pouvez exécuter la commande suivante :
$ mvn archetype:generate -DarchetypeGroupId=io.streamthoughts \
-DarchetypeArtifactId=azkarra-quickstart-java \
-DarchetypeVersion=0.3 \
-DgroupId=azkarra.streams \
-DartifactId=azkarra-getting-started \
-Dversion=1.0-SNAPSHOT \
-Dpackage=azkarra \
-DinteractiveMode=false

Le fichier pom.xml, du projet généré, contient déjà les dépendances pour Azkarra Streams et Kafka Streams :

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>io.streamthoughts</groupId>
        <artifactId>azkarra-streams</artifactId>
        <version>0.3</version>
    </dependency>
</dependencies>

En utilisant votre IDE ou éditeur préféré, ouvrez le projet Maven. Pour cette introduction, nous allons retirer l’exemple déjà présent et démarrer notre première application à partir de zéro.

$ cd azkarra-getting-started
$ rm -rf src/main/java/azkarra/*.java

Après cela, créons un nouveau fichier SimpleStreamsApp.java dans le répertoire src/main/java/azkarra, avec une méthode de base java main(String[] args) et notre définition de topologie comme suit :

package azkarra;
@AzkarraStreamsApplication
public class SimpleStreamsApp {

    public static void main(String[] args) {
        AzkarraApplication.run(SimpleStreamsApp.class, args);
    }

    @Component
    public static class WordCountTopology implements TopologyProvider {

        @Override
        public Topology get() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> textLines = 
                          builder.stream("streams-plaintext-input");

            textLines
                .flatMapValues(value ->
                    Arrays.asList(value.toLowerCase().split("\\W+"))
                )
                .groupBy((key, value) -> value)
                .count(Materialized.as("WordCount"))
                .toStream()
                .to(
                   "streams-wordcount-output",
                   Produced.with(Serdes.String(), Serdes.Long())
                );
            return builder.build();
        }

        @Override
        public String version() {
            return "1.0";
        }
    }
}

Comme vous pouvez le voir, nous avons uniquement implémenté une interface appelée TopologyProvider afin de construire notre Topology. Azkarra Streams est ensuite responsable de configurer et de démarrer automatiquement l’instance KafkaStreams.

Ensuite, nous devons configurer notre application. Pour cela, nous allons créer un fichier application.conf dans le répertoire src/main/resources avec le contenu ci-dessous :

N.B: Vous pouvez également laisser le fichier déjà présent dans le projet tel quel.

azkarra {
  context {
    streams {
      bootstrap.servers = "localhost:9092"
      default.key.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde"
      default.value.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde"
    }
  }
}

Félicitations ! Vous venez d’écrire votre première application de streaming en utilisant Azkarra.

Déployer votre application avec Docker

Pour exécuter notre application, nous allons, tout d’abord, démarrer un cluster Kafka. Pour ce faire, nous utiliserons les images Docker officielles mises à disposition par Confluent.Inc. Pour plus de facilité, vous pouvez utiliser le fichier docker-compose.yml contenu dans le projet.

$ cd azkarra-getting-started
$ docker-compose up -d

Créons ensuite les deux topics (source et sink) utilisés par la topologie.

$ chmod u+x quickstart-create-wordcount-topics.sh
$ ./quickstart-create-wordcount-topics.sh

Enfin, il ne nous reste plus qu’à packager et à exécuter notre application.

$ mvn clean package && java -jar target/azkarra-quickstart-java-0.3.jar

Pour vérifier que l’application KafkaStreams est en cours d’exécution, vous pouvez vérifier le endpoint HTTP health :

$ curl -sX GET ‘http://localhost:8080/health' | grep 'UP'

Enfin, nous allons produire quelques messages dans le topic plaintext-input:

$ docker exec -it azkarra-cp-broker /usr/bin/kafka-console-producer --topic streams-plaintext-input --broker-list kafka:9092
Azkarra Streams
WordCount
I Heart Logs   
Kafka Streams
Making Sense of Stream Processing

Vous pouvez également consommer le topic de sortie streams-wordcount-output :

$ docker exec -it azkarra-cp-broker /usr/bin/kafka-console-consumer --from-beginning --property print.key=true --property key.separator="-" --topic streams-wordcount-output --bootstrap-server kafka:9092

Un serveur HTTP embarqué

L’une des principales caractéristiques d’Azkarra est le serveur web intégré qui expose des services RESTs pour gérer et monitorer vos applications Kafka Streams locales.

Par exemple, vous pouvez :

  • Lister les instances KafkaStreams qui s’exécutent localement (c’est-à-dire les instances exécutées au sein de la JVM).
$ curl -sX GET http://localhost:8080/api/v1/streams | jq .
[
 "word-count-topology-1–0"
]
  • Récupérer la description d’une topologie spécifique:
$ curl -sX GET http://localhost:8080/api/v1/streams/word-count-topology-1-0/ | jq . 
{
 "id": "word-count-topology-1–0",
 "since": "2019–11–26T13:48:17.35+01:00[Europe/Paris]",
 "name": "WordCountTopology",
 "version": "1.0",
 "state": {
 "state": "RUNNING",
 "since": "2019–11–26T13:48:18.772+01:00[Europe/Paris]"
 }
}
  • Exporter les métriques d’une instance au format Prometheus:
$ curl -sX GET ‘http://localhost:8080/api/v1/streams/word-count-topology-1-0/metrics?format=prometheus'
# HELP streams_incoming_byte_rate The number of incoming bytes per second
# TYPE streams_incoming_byte_rate counter
streams_incoming_byte_rate{group=”admin-client-node-metrics”,id=”word-count-topology-1–0",client_id=”word-count-topology-1–0-e04c076a-7d46–4bdc-9bc2–14d181370762-admin”,node_id=”node — 1",} 0.0
# HELP streams_incoming_byte_total The total number of incoming bytes
# TYPE streams_incoming_byte_total counter
streams_incoming_byte_total{group=”admin-client-node-metrics”,id=”word-count-topology-1–0",client_id=”word-count-topology-1–0-e04c076a-7d46–4bdc-9bc2–14d181370762-admin”,node_id=”node-1",} 1066.0
....

Azkarra Streams expose d’autres API RESTs. Pour plus d’informations, consultez la documentation de référence.

Azkarra WebUI

Une autre fonctionnalité intéressante d’Azkarra Streams est l’interface web intégrée qui, par défaut, est disponible sur : http://localhost:8080/ui. Cette WebUI vous permet, entre autre, de gérer votre application de streaming.

Azkarra WebUI a d’abord été conçu pour faciliter le développement mais a rapidement évolué vers une mini-interface d’administration.

AzkarraStreams WebUI Overview

Par exemple, vous pouvez arrêter, redémarrer une instance KafkaStreams en utilisant le bouton “Available Actions”, mais aussi explorer les métriques disponibles, la configuration, etc.

Azkarra WebUI permet aussi de visualiser le DAG d’une topologie.

AzkarraStreams Topology DAG

Requêtes Interactives (Interactive Queries, a.k.a IQ)

Enfin, Kafka Streams dispose d’un excellent mécanisme pour requêter les états matérialisés par votre application.

Habituellement, en tant que développeurs, nous construisons des terminaux HTTP pour exposer ces états en utilisant l’API publique de Kafka Streams.

Azkarra Streams fournit un point d’extrémité par défaut à cette fin, qui est accessible directement via l’interface utilisateur Web d’Azkarra.

Le digramme ci-dessous montre une requête “all” sur le store “WordCount”.

AzkarraStreams Web UI Query All

Sous le capot, l’interface web exécute la requête HTTP suivante :

curl -sX POST 'http://localhost:8080/api/v1/applications/word-count-demo-1-0/stores/count' — data '{ "type":"key_value”, "query" : {"all":{} } }' | jq

Voici un dernier exemple pour requêter une clé spécifique :

AzkarraStreams WebUI Query GetKey

Pour aller plus loin

Si vous souhaitez en savoir plus sur l’utilisation de AzkarraStreams, la documentation se trouve sur la page GitHub. La documentation contient un guide étape par étape pour apprendre les concepts de base d’Azkarra.

Le projet contient également quelques exemples.

Conclusion

Azkarra Streams est une initiative visant à enrichir l’écosystème d’Apache Kafka Streams et à faciliter son adoption par les développeurs.

Nous espérons que ce projet sera bien accueilli par la communauté open-source et Apache Kafka. AzkarraStreams est encore en cours d’évolution et certaines fonctionnalités doivent être améliorées.

Pour soutenir le projet Azkarra Streams, vous pouvez ⭐ le projet Github ou retweeter si ce projet vous aide !

The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.