Créer des applications Kafka Streams plus rapidement que jamais avec Azkarra Streams

Créer des applications Kafka Streams plus rapidement que jamais avec Azkarra Streams
The Author's Avatar

Kafka Streams est une librairie Java/Scala permettant de développer des applications, orientées microservices, de type stream-processing basées sur Apache Kafka.

Lors du développement d’une application KafkaStreams, les développeurs doivent non seulement définir leur topologie, c’est-à-dire le graphe d’opérations à appliquer aux messages consommés, mais aussi l’ensemble du code nécessaire à son exécution.

De plus, avant de pouvoir déployer une application KafkaStreams en production, il faudra savoir comment gérer les erreurs de traitement et les messages indésirables/corrompus puis comment surveiller et exploiter les instances. Enfin, si vous prévoyez d’exposer certains états internes, en utilisant la fonction intégrée de Kafka Streams appelée “Interactive Queries “, vous devrez également écrire le code nécessaire pour accéder à vos données (exemple : APIs REST).

Par conséquent, votre application peut rapidement devenir complexe avec du code passe-partout qui n’a pas (ou peu) de valeur métier directe mais que vous devrez maintenir et potentiellement dupliquer sur d’autres projets.

AzkarraStreams est un micro-framework open-source Java qui permet de développer et d’exploiter facilement les applications KafkaStreams (Azkarra est le mot basque pour “Rapide").

Principales caractéristiques

AzkarraStreams fournit un ensemble de fonctionnalités pour développer rapidement des applications KafkaStreams. Cela comprend, entre autres, les éléments suivants :

  • Gestion du cycle de vie des instances KafkaStreams (plus besoin de KafkaStreams#start()).
  • Externalisation facile de la configuration (en utilisant Typesafe Config).
  • Serveur HTTP embarqué pour l’interrogation des StateStores (Undertow).
  • APIs REST pour exposer les métriques des instances KafkaStreams (JSON, Prometheus).
  • WebUI pour la visualisation des topologies.
  • **Authentification via SSL/TLS ou BasicAuth.
  • Etc.

Pour commencer

Depuis la v0.5.0, il est possible d’utiliser l’image Docker (streamthoughts/azkarra-streams-worker) qui permet de démarrer un worker Azkarra standalone pour exécuter une ou plusieurs applications KafkaStreams.

Azkarra Worker suit le même mécanisme que celui utilisé par le projet Kafka Connect, c’est-à-dire que les topologies Kafka Streams sont fournies sous-forme de composants externes qui peuvent être démarrés et arrêtés soit par des appels REST, soit via la WebUI intégrée.

Pour commencer, démarrons une instance Azkarra Worker et un cluster Kafka mono-broker en utilisant le fichier docker-compose.yml disponible sur le dépôt GitHub.

1 ) Exécutez la commande suivante pour télécharger et exécuter les conteneurs :

$ curl -s https://raw.githubusercontent.com/streamthoughts/azkarra-streams/master/docker-compose.yml --output \
docker-compose.yml && docker-compose up -d

2 ) Vérifiez que le worker Azkarra est bien démarré :

$ curl -sX GET http://localhost:8080 | jq
{
  "azkarraVersion": "0.5.0",
  "commitId": "d2bc2fdc24e68eb143f4388960881974604093ca",
  "branch": "master"
}

3 ) Enfin, vous pouvez accéder à l’interface utilisateur d’Azkarra qui est disponible sur http://localhost:8080/ui.

Azkarra WebUI - Overview

Comme nous pouvons peut le voir, pour l’instant notre worker ne fait absolument rien, puisque nous n’avons pas encore déployé de topologie.

Développons notre première topologie KafkaStreams

Afin de démontrer l’usage de l’API Azkarra, nous allons réécrire l’exemple classique : WordCountTopology.

Tout d’abord, créons un projet Java simple et ajoutons les AzkarraStreams aux dépendances de notre projet.

Exemple pour Maven (pom.xml) :

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>io.streamthoughts</groupId>
        <artifactId>azkarra-streams</artifactId>
        <version>0.5.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Notez que lorsque vous utilisez Azkarra Worker, votre projet ne doit jamais contenir les dépendances déjà fournies au runtime par Azkarra (c’est-à-dire azkarra-*, kafka-streams).

Ensuite, définissons notre topologie des flux Kafka en créant un nouveau fichier WordCountTopology.java.

package azkarra;

import io.streamthoughts.azkarra.api.annotations.*;
import io.streamthoughts.azkarra.api.streams.TopologyProvider;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

import java.util.Arrays;

@Component
@TopologyInfo(description = "WordCount topology example")
public class WordCountTopology implements TopologyProvider {

    @Override
    public Topology get() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("streams-plaintext-input");
        textLines
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, value) -> value)
        .count(Materialized.as("WordCount"))
        .toStream()
        .to(
            "streams-wordcount-output", 
             Produced.with(Serdes.String(), Serdes.Long())
        );
        return builder.build();
    }

    @Override
    public String version() {
        return "1.0";
    }
}

Dans le code, ci-dessus, nous avons implémenté l’interface TopologyProvider afin de retourner notre Topology via la méthode get(). Azkarra vous oblige à versionner chaque topologie. Ceci est utile, par exemple, pour exécuter plusieurs versions d’une même topologie ou pour générer automatiquement un application.id.

L’annotation @Component est ici nécessaire pour permettre à Azkarra de détecter automatiquement notre classe.

Et c’est tout ! Azkarra sera chargée de créer et de gérer l’instance KafkaStreams nécessaire pour exécuter notre “Topologie”.

Déployer une topologie

Maintenant que nous avons défini notre topologie (WordCountTopology) nous devons la rendre disponible au worker Azkarra.

Pour cela, nous devons packager et installer la topologie WordCountTopology en tant que composant dans un des répertoires configurés via la propriété : azkarra.component.paths.

Si vous jetez un coup d’oeil au fichier docker-compose.yml, vous verrez que la valeur de cette propriété est /tmp/azkarra/components. Cette propriété est configurée via une variable d’environnement.

La propriété azkarra.component.paths doit définir la liste des répertoires (séparés par une virgule) à partir desquels les composants seront scannés.

Chaque répertoires peut contenir:

  • un uber JAR contenant toutes les classes ou autres dépendances nécessaires à notre composant.

  • un sous-répertoire contenant tous les JARs dont dépendent notre composant.

Généralement, avec Maven, vous utiliserez les plugins maven-assembly-plugin ou maven-shade-plugin pour packager votre application (topologie) en un Uber JAR.

Après avoir packagé notre application, nous pouvons copier le .jar dans le répertoire /tmp/azkarra/components puis /tmp/azkarra/components puis redémarrer le conteneur Docker de la façon suivante :

$ docker-compose restart

Vous pouvez maintenant lister les topologies disponibles via l’API REST suivante :

curl -sX GET http://localhost:8080/api/v1/topologies | jq 
[
  {
    "name": "azkarra.WordCountTopology",
    "version": "1.0",
    "description": "WordCount topology example",
    "aliases": [
      "WordCount",
      "WordCountTopology"
    ],
    "config": {}
  }
]

Pour finir, démarrons une nouvelle instance KafkaStreams en publiant la configuration JSON suivante :

curl -H "Content-Type:application/json" \
-X POST http://localhost:8080/api/v1/streams \
--data '{"type": "azkarra.WordCountTopology", "version": "1.0",  "env": "__default", "config": {} }'

Dans la commande ci-dessus, nous spécifions le type et la version de la topologie à déployer ainsi que l’environnement cible.

En effet, Azkarra a un concept de “StreamExecutionEnvironment” qui agit comme un conteneur pour l’exécution des instances KafkaStreams. Par défaut, un environnement nommé “_default” est créé.

Notez qu’Azkarra créera automatiquement tous les topics source et sink définis par la topologie (azkarra.context.auto.create.topics.enable=true).

Explorer l’usage de Azkarra WebUI

Azkarra est livré avec une interface Web intégrée qui vous permet d’obtenir des informations sur les applications Kafka Streams en cours d’exécution.

Par exemple, vous pouvez :

  • Obtenir des informations sur les StreamThreads et les Tasks d’une instance KafkaStreams : Azkarra WebUI Streams Overview

  • Visualiser le DAG d’une topologie : Azkarra WebUI Streams DAG

  • Lister les métriques d’une instance KafkaStreams : Azkarra WebUI Streams Metrics

De plus, l’interface Web d’Azkarra vous permet d’arrêter, de redémarrer et de supprimer des instances KafkaStreams locales.

Interroger les State Stores

Enfin, AzkarraStreams dispose d’un excellent mécanisme pour interroger les états matérialisés par les applications de KafkaStreams via des appels REST.

Pour illustrer cette fonctionnalité, nous allons produire quelques messages :

$ docker exec -it broker /usr/bin/kafka-console-producer \
--topic streams-plaintext-input \
--broker-list broker:9092

Azkarra Streams
WordCount
I Heart Logs   
Kafka Streams
Making Sense of Stream Processing

Vous pouvez maintenant, requêter le state WordCount qui est géré par notre application.

curl -sX POST http://localhost:8080/api/v1/applications/word-count-topology-1-0/stores/WordCount \
--data '{"query":{"get":{"key": "streams"}},"type":"key_value", "set_options":{}}' | jq
{
  "took": 1,
  "timeout": false,
  "server": "azkarra:8080",
  "status": "SUCCESS",
  "result": {
    "success": [
      {
        "server": "azkarra:8080",
        "remote": false,
        "records": [
          {
            "key": "streams",
            "value": 2
          }
        ]
      }
    ],
    "total": 1
  }
}

Vous pouvez également interroger un State Store directement via le WebUI d’Azkarra.

Azkarra Web UI - Interactive Query

Pour aller plus loin

Si vous souhaitez en savoir plus sur l’utilisation de AzkarraStreams, la documentation se trouve sur la page GitHub. La documentation contient un guide étape par étape pour apprendre les concepts de bases d’Azkarra.

Le projet contient également quelques exemples.

Conclusion

Azkarra Streams est une initiative visant à enrichir l’écosystème d’Apache Kafka Streams et à faciliter son adoption par les développeurs.

Nous espérons que ce projet sera bien accueilli par la communauté open-source et Apache Kafka. AzkarraStreams est encore en cours d’évolution et certaines fonctionnalités doivent être améliorées.

Pour soutenir le projet Azkarra Streams, vous pouvez ⭐ le projet Github ou retweeter si ce projet vous aide !

The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.