Créer une plateforme analytique temps-réel avec Kafka, ksqlDB et ClickHouse

Créer une plateforme analytique temps-réel avec Kafka, ksqlDB et ClickHouse
The Author's Avatar

Apache Kafka + ksqlDB + ClickHouse + Superset = Blazing Fast Analytic Platform

Récemment chez StreamThoughts, nous nous sommes intéressés à différentes solutions open-source de bases de données, de type OLAP, que nous pourrions expérimenter rapidement dans une architecture de streaming basée sur la plateforme Apache Kafka.

Notre objectif était de pouvoir répondre à des besoins analytiques, sur d’importants volumes de données ingérés en temps-réel. Pour cela, nous devions pouvoir exécuter des requêtes ad-hoc, de manière interactives, avec des latences “acceptables” (quelques secondes ou plus).

Dans le cadre de ce projet, la solution recherchée devait :

  • Être facilement utilisable out-of-the-box.

  • Proposer un langage de requête de type SQL (avec si possible un support JDBC).

  • Ne pas être couplée à l’écosystème Hadoop.

  • S’intégrer, éventuellement, à une solution de Data visualisation telle qu’Apache Superset.

Enfin, et de manière plus générale, nous souhaitions évaluer une solution qui soit, d’une part, élastique (pouvant passer facilement à l’échelle de plusieurs dizaines à plusieurs centaines de nœuds), et, d’autre part, qui dispose d’un mécanisme de réplication des données permettant de répondre à des besoins relativement classiques de haute-disponibilité et de tolérance à la panne.

Notre choix s’est finalement arrêté sur la solution ClickHouse.

Dans la suite de cet article, nous verrons comment intégrer, pas à pas, cette solution à l’écosystème Apache Kafka. Pour cela, nous prendrons pour exemple, une application simple de collecte et d’analyse, en quasi temps-réel, de tweets.

The schéma ci-dessous présente l’architecture globale de notre plateforme de streaming :

Global Architecture Overview

Étape 1 : Collecte et Ingestion des données

La première étape consiste à déployer notre plateforme d’ingestion de données ainsi que le service qui sera responsable de la collecte et de la publication des tweets dans un topic Kafka.

Pour cela, nous allons utiliser Docker pour rapidement démarrer les différents services qui composent notre plateforme de streaming : Zookeeper, Kafka, Schema-Registry et ksqlDB.

  • Cloner ou télécharger le projet de démo depuis GitHub:
$ git clone https://github.com/streamthoughts/demo-twitter-ksqldb-clickhouse.git
  • Compiler le module Maven qui contient les fonctions ksqlDB que nous utiliserons dans la suite de cet article.
$ cd demo-twitter-ksqldb-clickhouse
$ (cd ./ksql-custom-udfs; mvn clean package)
  • Démarrer ksqlDB et les services dépendants via docker-compose :
$ docker-compose up -d ksqldb-server
  • Vous pouvez facilement lister l’ensemble des services (containers) actuellement démarrés :
$ docker ps --format "{{.ID}}/{{.Names }} ({{.Status}}"

(output)

afd0d835c91d/ksqldb-server (Up 6 minutes)
2ae4b0560bc7/kafka-connect (Up 6 minutes (health: starting))
53dd5a1a8c0b/schema-registry (Up 6 minutes)
3c105ef1eb5d/kafka (Up 6 minutes)
bfecffd69ae4/zookeeper (Up 6 minutes)
  • Enfin, pour vérifier si ksqlDB fonctionne correctement, exécutez la commande suivante :
$ curl -sX GET http://localhost:8088/info | jq .
{
 "KsqlServerInfo": {
 "version": "0.8.1",
 "kafkaClusterId": "tlsv3OECQDucoA6-ZdpkxQ",
 "ksqlServiceId": "ksql-docker"
 }
}

ksqlDB & Kafka Connect

A présent, nous pouvons directement utiliser ksqlDB pour démarrer un connecteur Apache Kafka Connect et ainsi récupérer les Tweets qui nous intéressent. Pour cela, nous allons utiliser le connecteur Twitter source connector disponible en open-source depuis Confluent-Hub.

Si vous jetez un coup d’œil au fichier docker-compose.yml, vous remarquerez que nous avons configuré ksqlDB avec l’adresse de notre worker Kafka Connect. Celui-ci est déployé à partir d’une image Docker “custom” qui contient l’ensemble des connecteurs nécessaires à notre démo.

Avant de pouvoir déployer une nouvelle instance du connecteur, assurez-vous de disposer d’un accès à l’API Twitter Developer . Pour cela, vous devez créer un token et une clé d’accès à partir de votre page d’applications twitter.

  • Créer le topic nommé tweets via la commande suivante :
$ docker exec -it kafka kafka-topic \
--zookeeper zookeeper:2181 \
--create — topic tweets \
-- partitions 4 \
-- replication-factor 1
  • Démarrer un client ksql.
$ docker exec -it ksqldb-server ksql
KSQL, Copyright 2017–2019 Confluent Inc.

CLI v5.4.1, Server v5.4.1 located at http://localhost:8088

Having trouble? Type ‘help’ (case-insensitive) for a rundown of how things work!

ksql>
  • Créer un connecteur Twitter
ksql> CREATE SOURCE CONNECTOR `tweeter-connector` WITH ( 'connector.class'='com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector',
 'twitter.oauthw.accessTokenSecret'='********',
 'twitter.oauth.consumerSecret'='********',
 'twitter.oauth.accessToken'='********',
 'twitter.oauth.consumerKey'='********',
 'kafka.status.topic'='tweets',
 'process.deletes'=false,
 'filter.keywords'='coronavirus,2019nCoV,SARSCoV2,covid19,cov19'
);

Message 
 — — — — — — — — — — — — — — — — — — -
 Created connector tweeter-connector 
 — — — — — — — — — — — — — — — — — — -
ksql>

Note: Dans la configuration ci-dessus, vous devez modifier les quatres propriétés préfixées twitter.oauth.* avec vos identifiants Twitter.

  • Vérifions que le connecteur fonctionne correctement. Pour cela, nous pouvons utiliser l’API REST de Kafka Connect.
$ curl \
-H "Content-Type:application/json" \
-X GET \
 [http://localhost:8083/connectors/tweeter-connector/status](http://localhost:8083/connectors/tweeter-connector/status) | jq

(output)

{
  "name": "tweeter-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "source"
}

Une fois déployé, le connecteur Kafka doit commencer à produire des records Avro dans le topic tweets.

  • Pour afficher les tweets ingérés, définissons un nouveau STREAM KSQL à partir du topic tweets.
ksql> CREATE STREAM tweets WITH (KAFKA_TOPIC = 'tweets', VALUE_FORMAT='AVRO');
  • Enfin, exécutons la requête KSQL suivante pour afficher les 5 premiers tweets disponibles:
ksql> SELECT Text FROM tweets EMIT CHANGES LIMIT 5;

Étape 2 : Transformation des données

Le connecteur est à présent déployé et nous commençons à ingérer des Tweets, en temps-réel. Cependant, le schéma associé aux données publiées par le TwitterSourceConnector contient un certain nombre de structures de données complexes qu’il sera, par la suite, difficile de requêter.

  • Pour inspecter le schéma des records Avro, vous pouvez exécuter la déclaration KSQL suivante :
ksql> DESCRIBE EXTENDED TWEETS;

En règle générale, il est plus facile de travailler sur une structure de données à plat contenant uniquement des types primitifs. Puisque seul un sous-ensemble des champs est pertinent pour ce projet, il est plus simple de filtrer les records avant de les insérer dans la base de données cible. Pour cela, nous allons utiliser ksqlDB pour transformer facilement les tweets au fur et à mesure qu’ils sont ingérés.

ksqlDB & Push Queries

ksqlDB définit un concept de push query qui va nous permettre de consommer le STREAM TWEETS, précédemment créé, afin d’appliquer une transformation sur chaque message reçu, puis de pousser le résultat dans un nouveau STREAM matérialisé sous la forme d’un topic Kafka tweets-normalized. Il est important de noter qu’une push query s’exécute indéfiniment.

  • Créer le topic tweets-normalized via la commande suivante :
$ docker exec -it kafka kafka-topic \
--zookeeper zookeeper:2181 \
--create — topic tweets-normalized \
-- partitions 4 \
-- replication-factor 1
  • Exécuter la requête KSQL suivante afin de créer un STREAM dérivé nommé TWEETS_NORMALIZED :
ksql>
CREATE STREAM TWEETS_NORMALIZED
  WITH (kafka_topic = 'tweets-normalized') AS
  SELECT
    Id,
    CreatedAt / 1000 as CreatedAt,
    Text,
    Lang,
    Retweeted,
    User->Id as UserId,
    User->Name as UserName,
    IFNULL(User->Description, '') as UserDescription,
    IFNULL(User->Location, '') as UserLocation,
    ARRAY_TO_STRING( EXTRACT_ARRAY_FIELD(UserMentionEntities, 'Name'), ',', '') as Mentions,
    ARRAY_TO_STRING( EXTRACT_ARRAY_FIELD(HashtagEntities, 'Text'), ',', '') as Hashtags
FROM tweets EMIT CHANGES;
  • Affichons le résultat de la transformation en souscrivant au STREAM TWEETS_NORMALIZED:
ksql> SELECT * FROM TWEETS_NORMALIZED EMIT CHANGES;

User-Defined Function (UDF)

Il est relativement facile d’étendre ksqlDB via l’implémentation de User-Defined Function (UDF). Ainsi, pour pouvoir faciliter l’extraction des hashtags et des mentions, présents dans chaque tweet, nous avons créé les deux UDFs suivantes :

  • EXTRACT_ARRAY_FIELD : Extrait un champ spécifique depuis un champs de type Array.

  • ARRAY_TO_STRING : Joint l’ensemble des éléments d’un tableau en utilisant un séparateur donné.

Vous trouverez le code-source associé aux deux UDFs sur le repository GitHub.

Étape 3 : Stockage et requêtage des données

Clickhouse Logo

Avant de pouvoir analyser les tweets ingérés, nous allons au préalable les stocker dans ClickHouse.

ClickHouse

ClickHouse est une base de données, open-source (Apache License 2.0), de type OLAP (Online Analytical Processing) développée à l’origine par la société Yandex, pour les besoins de sa solution Metrica (similaire à Google Analytics). Yandex n’est autre que le premier moteur de recherche utilisé en Russie.

ClickHouse a été développée avec un objectif simple : filtrer et agréger le plus rapidement possible le plus grande nombre de données. Similaire à d’autres solutions du même type (exemples: Druid, Pinot), ClickHouse utilise un modèle orienté colonnes pour le stockage des données. Il s’appuie également sur divers mécanismes de parallélisation et de vectorisation qui permettent de tirer pleinement partie des architectures multi-cœurs. Clickhouse supporte ainsi des volumes de données de plusieurs petabytes.

Déploiement d’une instance

Une base de données ClickHouse peut-être déployée aussi bien sous la forme d’un unique nœud que d’un cluster de plusieurs nœuds permettant l’implémentation de différentes stratégies de sharding et de réplication. Pour le stockage des méta-données liées à la réplication, ClickHouse s’appuie sur Zookeeper.

Remarque : Dans ClickHouse, chaque insertion de données dans une table répliquée entraînera l’exécution de plusieurs transactions dans Zookeeper. Zookeeper peut donc rapidement devenir un bottleneck. Pour un environnement de production, il sera recommandé de ne pas mutualiser le cluster Zookeeper utilisé pour Apache Kafka pour les besoins de ClickHouse.

Insertion des data dans ClickHouse

Dans le cadre de notre PoC, nous utiliserons une unique instance ClickHouse déployée via Docker.

  • Démarrer une instance ClickHouse
$> docker-compose up -d clickhouse
  • Ensuite, créer la table tweets après voir démarré un nouveau clickhouse-client de la manière suivante :
$ docker exec -it clickhouse bin/bash -c "clickhouse-client --multiline"

(ClickHouse CLI)

clickhouse :) CREATE TABLE IF NOT EXISTS default.tweets
(
    ID UInt64,
    CREATEDAT DateTime,
    TEXT String,
    LANG String,
    RETWEETED UInt8,
    USERID UInt64,
    USERNAME String,
    USERDESCRIPTION String,
    USERLOCATION String,
    HASHTAGS String,
    MENTIONS String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(CREATEDAT)
ORDER BY (CREATEDAT, LANG);

Pour consommer les données de Kafka et les intégrer directement dans ClickHouse nous allons, de nouveau, utiliser Kafka Connect et déployer, cette fois-ci, une instance du connecteur Kafka Connect JDBC (Sink) via ksqlDB. Au préalable, nous avons pris soin de télécharger et d’installer le driver JDBC de ClickHouse dans le répertoire classpath du Worker Kafka Connect.

  • Créer une instance Connect JDBC via ksql.
ksql> CREATE SOURCE CONNECTOR `clickhouse-jdbc-connector` WITH (
    'connector.class'='io.confluent.connect.jdbc.JdbcSinkConnector',  
    'topics'='tweets-normalized',
    'tasks.max'='1',
    'connection.url'='jdbc:clickhouse://clickhouse:8123/default',
    'table.name.format'='tweets'
);
  • Vous devriez normalement pouvoir requêter la table ClickHouse tweets :
$ docker exec -it clickhouse bin/bash -c "clickhouse-client -q 'SELECT COUNT(*) AS COUNT, LANG FROM tweets GROUP BY LANG ORDER BY (COUNT) DESC LIMIT 10;'"

Exemple de résultat :

┌─COUNT─┬─LANG─┐
│ 66896 │ en   │
│ 20577 │ es   │
│  4076 │ fr   │
│  3070 │ und  │
│  2555 │ pt   │
│  1443 │ it   │
│  1393 │ in   │
│  1284 │ ja   │
│   970 │ ca   │
│   867 │ hi   │
└───────┴──────┘

10 rows in set. Elapsed: 0.013 sec. Processed 107.24 thousand rows, 1.18 MB (8.50 million rows/s., 93.74 MB/s.)

Quelques limitations

Bien que cette solution fonctionne, elle est loin d’être suffisante, en l’état, pour un contexte de production. En effet, ClickHouse supporte, en réalité, relativement mal l’ingestion de données en temps-réel, c.à.d record par record. La documentation recommande de réaliser des insertions par lot de 1000 records, au minimum, ou alors de ne pas dépasser plus d’une insertion par seconde.

Il est donc primordial de bien configurer le connecteur de manière à maximiser le nombre de records par insertion, notamment via la propriété batch.size (défaut: 3000).

De plus, il peut-être nécessaire de modifier la configuration, par défaut, des consumers internes au connecteur afin de récupérer un maximum de records depuis les brokers en une seule requête (fetch.min.bytes, fetch.max.bytes, max.poll.records, max.partition.fetch.bytes).

Malheureusement, selon votre cas d’utilisation et le débit des données à ingérer, la modification seule de la configuration peut ne pas être suffisante pour optimiser les écritures dans ClikHouse.

Solutions alternatives

Plusieurs solutions alternatives, à celle décrite précédemment, peuvent-être envisagées pour réaliser l’insertion en temps-réel des données dans ClickHouse.

Alternative n°1 : Buffer Table Engine

ClickHouse offre un mécanisme dit de Table Engine qui permet de définir, où et comment sont stockées les données d’une table mais aussi de définir les mécanismes d’accès, d’indexation ou encore de réplication.

Parmi l’ensemble des familles des moteurs disponibles, ClickHouse dispose également de moteurs spéciaux dont fait partie le type “Buffer”.

De manière simple, une table de type Buffer permet, comme son nom le sous-entend, de bufferiser en RAM des raws avant de les flusher périodiquement dans une autre table.

Dans le cas présent, cette solution semble idéale puisqu’elle permettrait de garantir les performances de Clickhouse, dans le temps, quel que soit le nombre d’insertions en entrée.

Malheureusement, cette solution n’est actuellement pas compatible avec le connecteur JDBC de Confluent qui ne reconnait pas l’existence d’une table de type Buffer.

L’usage d’une telle table nécessiterait donc soit l’implémentation d’un nouveau connecteur, soit la modification du connecteur JDBC existant pour y inclure le type Buffer.

Alternative n°2 : Built-in Kafka Integration

ClickHouse intègre également nativement un Table Engine spécial pour encapsuler nativement un topic Kafka en tant que “Table SQL”.

La déclaration suivante illustre comment créer une table avec le moteur Kafka.

$ docker exec -it clickhouse bin/bash -c "clickhouse-client --multiline"
clickhouse :) CREATE TABLE kafka_tweets_stream (
    ID UInt64,
    CREATEDAT DateTime,
    TEXT String,
    LANG String,
    RETWEETED UInt8,
    USERID UInt64,
    USERNAME String,
    USERLOCATION String,
    HASHTAGS String,
    MENTIONS String
  ) ENGINE = Kafka SETTINGS 
    kafka_broker_list = 'kafka:29092',
    kafka_topic_list = 'tweets-normalized-json',
    kafka_group_name = 'ch-tweet-group',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 1,
    kafka_num_consumers = 1;

Vous remarquerez que nous avons créé, ci-dessus, une table à partir d’un topic contenant les tweets au format JSON (JSONEachRow). La raison à cela est que, bien que Clickhouse support le format Avro avec l’utilisation du SchemaRegistry Confluent, les types UNION d’Avro ne sont pour le moment pas supportés. Par simplification, nous allons donc au préalable convertir notre STREAM Avro en JSON via la query KSQL suivante :

ksql > CREATE STREAM TWEETS_NORMALIZED_JSON 
WITH (KAFKA_TOPIC='tweets-normalized-json',VALUE_FORMAT='JSON') 
AS SELECT * FROM TWEETS_NORMALIZED;

Il important de noter que la table créée ne stocke pas à proprement parler de données mais permet plutôt la création en background d’un ou de plusieurs consumers associés à un Consumer Group. De ce fait, la table kafka_tweets_stream représente davantage un data stream temps-réel qu’une table SQL.

Pour illustrer cela, vous pouvez exécuter plusieurs fois la requête SQL suivante :

$ docker exec -it clickhouse bin/bash -c "clickhouse-client --multiline"
clickhouse :) SELECT * FROM kafka_tweets_stream ;

Vous remarquerez alors que Clickhouse ne retourne que les derniers messages consommés depuis le topic. En effet, la table prend la forme d’un data stream temps-réel dans lequel les messages ne peuvent être consommés qu’une seule fois.

Ainsi, pour pouvoir tirer pleinement parti de cette table nous allons créer une seconde table qui sera alimentée par celle-ci au travers d’une Materialized View qui va jouer le rôle de fetcher de manière similaire à notre SELECT.

Le schéma ci-dessous illustre comment s’articule les différentes tables entres elles :

ClickHouse — Apache Kafka Integration — Consumer

  • Tout d’abord, créons une table nommée kafka_tweets utilisée pour stocker les records qui seront récupérés depuis la table kafka_tweets_stream. Notons que ces deux tables ont le même schéma.
clickhouse :) CREATE TABLE kafka_tweets AS kafka_tweets_stream
ENGINE = MergeTree() 
PARTITION BY toYYYYMM(CREATEDAT)
ORDER BY (CREATEDAT, LANG);
  • Ensuite, nous pouvons créer la Materialized View qui va se charger de consommer les messages Kafka pour alimenter notre table kafka_tweets .
clickhouse :) CREATE MATERIALIZED VIEW kafka_tweets_consumer
TO kafka_tweets
AS SELECT * FROM kafka_tweets_stream;

Note : En interne, ClickHouse repose sur librdkafka la librairie C++ pour Apache Kafka. Il est possible de configurer un certain nombre de paramètres afin d’optimiser les clients.

  • Pour finir, vous pouvez tester cette solution alternative en ré-exécutant la requête suivante :
clickhouse :) SELECT COUNT(*) AS COUNT, LANG FROM kafka_tweets GROUP BY LANG ORDER BY (COUNT) DESC LIMIT 10;

L’intégration Kafka offerte par Clickhouse ouvre des perspectives très intéressantes en terme de manipulation de données, notamment du fait qu’il est possible d’utiliser une table pour produire des données dans Kafka. Nous pourrions, par exemple, créer une Materialized View pour agréger en temps-réel les messages entrants, insérer les résultats dans une table qui se chargerait alors de publier les rows dans Kafka.

Le schéma ci-dessous illustre l’utilisation des Materialized View de ClickHouse pour transformer des données Kafka.

ClickHouse — Apache Kafka Integration — Producer

Alternative n°3 : ClickHouse Sinker

Une nouvelle et dernière solution possible serait d’utiliser ClickHouse Sinker, un outil développé en Go pour intégrer facilement des messages depuis des topics Kafka vers ClickHouse. Cependant, nous n’avons pas pris le temps de tester cette solution.

Étape 4 : Visualisation

Enfin, il ne nous reste plus qu’à visualiser nos données. Pour cela, nous pouvons utiliser Apache Superset pour explorer les données, pour identifier des requêtes pertinentes et pour construire un ou plusieurs dashboards.

  • Vous pouvez démarrer et initialiser une instance Superset via Docker :
$ docker-compose up -d superset
$ docker exec -it superset superset-init
  • Ensuite, vous pouvez accéder à la UI en utilisant les user/password renseignés lors de l’initialisation : http://localhost:8080.
$ docker exec -it superset superset-init
  • Enfin, il est possible de créer une nouvelle source de données “ClickHouse” en configurant l’url SQLAlchemy suivante : clickhouse://clickhouse:8123

Superset CLickhouse Datasource

Superset apporte une interface facile d’utilisation pour requêter notre base et créer des charts.

Voici quelques exemples :

  • SQL Labs

Superset — SQL Lab

  • Charts

Superset — Explore — Charts

Conclusion

Il existe de plus en plus de solutions possibles pour construire des plateformes analytiques temps-réel qui ne s’appuient pas sur Hadoop pour le stockage de données. ClickHouse est une solution OLAP intéressante qu’il est possible d’intégrer relativement facilement dans une plateforme de streaming telle que Apache Kafka.

Sources / Ressources

ksqlDB & Kafka Connect

Clickhouse & Kafka

  • Introduction to the-mysteries of clickhouse replication by Robert Hodges & Altinity Engineering Team (slides)

  • Fast insight from fast data integrating Clickhouse and Apache Kafka by Altinity (slides).

  • The Secrets of ClickHouse Performance Optimizations (video)

  • ClickHouse Primary Keys (blog)

  • Comparison of the Open Source OLAP Systems for Big Data: ClickHouse, Druid, and Pinot (blog)

  • ClickHouse Data Distribution (blog)

  • Circular Replication Cluster Topology in ClickHouse (blog)

Autres :

  • CMU Advanced Database Systems — 20 Vectorized Query Execution (Spring 2019) by Andy Pavlo (video)
The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.