Kafka Connect FilePulse - Un connecteur pour tous les ingérer!

Kafka Connect FilePulse - Un connecteur pour tous les ingérer!
The Author's Avatar

La plupart des projets sur lesquels j’ai eu l’occasion de travailler pendant ces dernières années, ont consisté à ingérer des données dans des systèmes tels que Apache Kafka® et Apache Hadoop® pour effectuer des traitements et des enrichissements de données en temps réel et en batch.

L’un des défis récurrents de chacun de ces projets a toujours été de gérer la complexité des systèmes existants dans lesquels les données étaient fréquemment exportées, partagées et intégrées via l’utilisation de fichiers.

Je suis convaincu que les organisations qui fonctionnent aujourd’hui avec des applications qui reposent sur l’usage de fichiers d’export continueront, probablement, à le faire dans le futur. La principale raison à cela est que ces systèmes fonctionnent parfaitement bien depuis des années et qu’il n’y a aucune raison valable (ou aucun budget) pour les faire évoluer.

Cependant, toutes ces organisations doivent construire de nouveaux systèmes plus agiles, plus évolutifs et plus réactifs pour fournir des informations en temps réel. Et, tôt ou tard, ces données (c’est-à-dire ces fichiers) devront être collectées.

Kafka Connect

Apache Kafka® est désormais une solution standard pour construire des plates-formes de streaming d’événements avec pour but de centraliser et de distribuer des données en temps réel dans toute votre organisation.

Faisant partie de la plate-forme Apache Kafka®, Kafka Connect fournit un framework distribué et résilient pour diffuser des données entre Apache Kafka et des systèmes externes (par exemples : Hadoop, Cassandra, MongoDB, AWS S3, etc.).

Apache Kafka Connect — Overview

Kafka Connect utilise un concept de connecteurs source et de connecteurs sink pour faire le travail d’intégration entre tous vos systèmes. L’un des principaux avantages de Kafka Connect est que des dizaines de connecteurs sont déjà disponibles librement (ou avec License) pour les systèmes les plus courants.

Vous pouvez trouver la plupart des connecteurs existants sur Confluent Hub.

Cependant, lorsqu’il s’agit d’ingérer et de transformer efficacement des données en provenance de fichiers, il est généralement difficile d’utiliser Kafka Connect comme alternative à d’autres solutions telles que Logstash ou Apache Nifi. En effet, la plupart des connecteurs existants ne supportent, le plus souvent, qu’un ensemble limité de fonctionnalités ou ne sont pas adaptés à certains cas d’utilisation courants (par exemple, la collecte de logs).

Voici une liste, non-exhaustive, de certaines limitations.

Les connecteurs :

  • ne sont pas conçus pour être exécutés avec plusieurs tâches en parallèles.

  • ne peuvent pas être exécutés en mode distribué.

  • ne supportent que les formats de fichiers de type CSV.

  • ne peuvent pas être utilisés pour regrouper plusieurs lignes de texte.

  • ne peuvent pas être étendus facilement.

Note : Si vous souhaitez en savoir plus sur Kafka Connect et ses utilisations, je vous recommande vivement de lire les articles du blog de Robin Moffat - https://rmoff.net/.

Introduction Kafka Connect File Pulse

Connect File Pulse est un plugin Kafka Connect polyvalent pour l’ingestion et la transformation de fichiers qui est disponible en open-source.

Les fonctionnalités clés.

Connect File Pulse offre la possibilité de scanner périodiquement un répertoire local (de manière récursive ou non) à la recherche de nouveaux fichiers, de les lire et de transformer les données en messages fortement typés. Les messages (ou records) sont envoyés dans Apache Kafka au fur et à mesure que de nouveaux fichiers sont ajoutés dans le répertoire d’entrée ou que des données sont ajoutées à un fichier existant.

Décrivons maintenant certains des concepts propres à Connect File Pulse.

Readers

Connect File Pulse prend en charge plusieurs formats d’entrée via la notion de FileInputReader.

Au moment d’écrire cet article, Connect File Pulse supporte les implémentations suivantes:

  • RowFileInputReader: permet de lire un fichier ligne par ligne pour créer un record par ligne. Il peut être utilisé pour traiter des fichiers de type CSV, TSV ou des fichiers de logs applicatifs (par exemple log4j).

  • BytesArrayInputReader: peut être utilisé pour créer un unique record de type byte-array à partir d’un fichier source.

  • AvroFileInputReader: permet de lire des fichiers Avro.

  • XMLFileInputReader: permet de lire des fichiers XML simples. Ce Reader infère le schéma des records depuis le DOM XML.

Filters

Connect File Pulse permet aussi de configurer des chaînes de filtrage complexes pour analyser, transformer et enrichir les données grâce à l’utilisation de Filtres.

Voici quelques exemples de filtres disponibles :

  • AppendFilter: Ajoute une ou plusieurs valeurs à un champ existant ou non existant.

  • DelimitedRowFilter: Extrait des données délimitées par un séparateur depuis un champ texte.

  • GrokFilter: Extrait des données depuis un champ texte non-structuré en combinant des patterns Grok (similaire à Logstash).

  • GroupRowFilter: Fusionne plusieurs messages consécutifs en utilisant un ou plusieurs champs comme clé de regroupement.

  • JSONFilter: Transforme un champs JSON en structure complexe.

Les filtres peuvent être comparés au mécanisme intégré de Kafka Connect appelé Single Message Transformation (SMTs). Cependant, ils permettent de construire des pipelines plus complexes pour structurer les données des fichiers avant l’envoi dans Kafka.

Par exemple, vous pouvez combiner certains filtres pour diviser un message d’entrée en plusieurs messages ou pour bufferiser temporairement des messages consécutifs afin de les regrouper par champs ou par patterns.

Un cas d’usage classique consiste à utiliser les filtres pour analyser des fichiers de logs applicatifs. Vous trouverez ci-dessous un exemple de configuration utilisant les deux filtres : MultiRowFilter et GrokFilter.

Comme vous pouvez le voir, dans l’exemple ci-dessus, Connect File Pulse suit le même style de configuration pour la définition des filtres que celui utilisé pour la configuration des Transformers :

  • filtres - Liste d’alias pour la chaîne de filtres, précisant l’ordre dans lequel les filtres seront appliqués.

  • filters.$alias.type - Nom de la classe Java du filtre.

  • filters.$alias.$filterSpecificConfig - Les propriétés pour configurer le filtre

Enfin, Connect File Pulse vous permet de configurer facilement un filtre de manière à ce qu’il ne soit appliqué qu’aux messages d’entrée correspondant à une condition spécifique.

Par exemple, si nous souhaitons ajouter une balise à tous les records contenant un message d’erreur pour une exception spécifique, alors nous pourrions configurer un filtre de la manière suivante:

Simple Connect Expression Language

Dans l’exemple ci-dessus, la propriété if accepte une Simple Expression.

En effet, Connect File Pulse définit un langage d’expression simple appelé Simple Connect Expression Language (ScEL en abrégé), basé sur des regex. Celui-ci permet d’accéder et de manipuler les champs et les métadonnées des records. Vous pouvez utiliser ScEL pour configurer la plupart des filtres intégrés.

Enfin, vous pouvez configurer chaque filtre soit pour ignorer les erreurs, soit pour basculer sur une sous-chaîne en cas d’erreur.

Les Stratégies de Cleanup

Par défaut, Connect File Pulse propose trois stratégies de cleanup qui peuvent être appliquées dès lors qu’un fichier a été traité avec succès ou a échoué :

  • DeleteCleanupPolicy: supprime tous les fichiers, quel que soit leur statut final (terminé ou échoué).

  • LogCleanPolicy: log les méta-données sur les fichiers complétés.

  • MoveCleanupPolicy: déplace des fichiers vers des répertoires cibles configurables.

Conçu pour être extensible

Dès le début, Connect File Pulse a été conçu pour être facilement extensible. Vous pouvez implémenter et configurer des classes personnalisées pour les interfaces: FileInputReader, Filter ou FileCleanupPolicy.

Vous pouvez même aller plus loin en implémentant votre propre classe pour scanner les répertoires d’entrée (par exemple FSDirectoryWalker).

Synchronisation entre le connecteur et les tâches

Kafka Connect ne fournit pas de mécanisme pour synchroniser les tâches et le connecteur déployés. Pour cette raison, Connect File Pulse utilise directement un topic Kafka interne pour suivre la progression du traitement des fichiers et pour garantir que chaque fichier est traité par une unique tâche (par défaut : connect-file-pulse-status).

Le connecteur est responsable de scanner périodiquement le répertoire d’entrée. Lorsque de nouveaux fichiers sont détectés, le connecteur déclenche une reconfiguration des tâches et chaque tâche est alors attribuée à un ensemble de fichiers. Lors du traitement des fichiers, les tâches envoient des informations sur le fichier en cours de traitement dans ce topic interne. Enfin, le connecteur consomme ce même topic et exécute la stratégie de cleanup pour les fichiers terminés.

Ce mécanisme permet de déployer en toute sécurité Connect FilePulse en mode distribué avec de multiples tâches.

De plus, il est possible d’utiliser le topic interne pour des besoins d’historisation des fichiers traités.

Example d’utilisation

Maintenant que vous avez une meilleure compréhension des utilisations et des caractéristiques de Kafka Connect FilePulse, nous allons voir comment déployer un connecteur pour parser des fichiers de logs applicatifs.

Pour cet exemple, nous allons simplement analyser et diffuser le fichier de logs généré par notre Connect Worker (c.à.d: kafka-connect.log) dans un topic Kafka.

Démarrer un cluster Kafka Cluster via Docker

Tout d’abord, nous devons déployer une plateforme Kafka. Pour ce faire, vous pouvez télécharger le fichier docker-compose.yml disponible sur le projet GitHub :

$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/docker-compose.yml

Le fichier docker-compose.yml déploie un broker Kafka, un nœud Zookeeper et une instance de Schema Registry en utilisant les images Docker officielles de Confluent.Inc.

En plus, il démarre également un container exécutant un worker Kafka Connect avec le plugin Connect FilePulse préinstallé (https://hub.docker.com/r/streamthoughts/kafka-connect-file-pulse).

Notez que Connect FilePulse est également disponible sur confluent-Hub et peut donc être téléchargé et installé directement en utilisant la commande suivante :

$ confluent-hub install streamthoughts/kafka-connect-file-pulse:1.2.1

Pour démarrer les containers Docker, exécuter simplement la commande:

$ docker-compose up -d

Attendez quelques secondes que les conteneurs démarrent, puis pour vérifier que connect-file-pulse est bien disponible vous pouvez interroger l’API REST suivante:

$ curl -sX GET http://localhost:8083/connector-plugins | grep FilePulseSourceConnector

Configurer un nouveau connecteur

Créons un fichier de configuration JSON nommé connect-file-pulse-quickstart-log4j.json avec le contenu suivant:

Ensuite, vous pouvez démarrer une nouvelle instance du connecteur en exécutant la commande suivante:

$ curl -sX POST http://localhost:8083/connectors \
-d @connect-file-pulse-quickstart-log4j.json \
--header "Content-Type: application/json" | jq

Puis, vérifiez que le connecteur fonctionne correctement avec :

$ curl -sX GET http://localhost:8083/connectors/connect-file-pulse-quickstart-log4j | jq

Tracker la progression du traitement des fichiers

Pour suivre la progression du traitement des fichiers, vous pouvez consommer le topic interne utilisé pour synchroniser les tâches et le connecteur. Les messages sont sérialisés au format JSON.

$ docker exec -it -e KAFKA_OPTS="" connect kafka-console-consumer --topic connect-file-pulse-status --from-beginning --bootstrap-server broker:29092

Consommation des messages Kafka

Finalement, il ne nous reste plus qu’à consommer les messages écrits dans le topic logs-kafka-server :

$ docker exec -it -e KAFKA_OPTS="" connect kafka-avro-console-consumer --topic logs-kafka-connect \
--from-beginning --bootstrap-server broker:29092 \
--property schema.registry.url=http://schema-registry:8081

(output)

Vous devriez obtenir des messages de la forme suivante :

{“logdate”:{“string”:”2020–01–22 18:14:36,648"},”loglevel”:{“string”:”INFO”},”message”:{“string”:”[pool-15-thread-3] Finished initializing local filesystem scanner (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner)”}}
{“logdate”:{“string”:”2020–01–22 18:14:36,649"},”loglevel”:{“string”:”INFO”},”message”:{“string”:”[FileSystemMonitorThread] Starting thread monitoring filesystem. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread)”}}
{“logdate”:{“string”:”2020–01–22 18:14:36,649"},”loglevel”:{“string”:”INFO”},”message”:{“string”:”[FileSystemMonitorThread] Scanning local file system directory ‘/var/log/kafka/’ (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner)”}}
{“logdate”:{“string”:”2020–01–22 18:14:36,650"},”loglevel”:{“string”:”INFO”},”message”:{“string”:”[pool-15-thread-3] Finished creating connector connect-file-pulse-quickstart-log4j (org.apache.kafka.connect.runtime.Worker)”}}

Finalement, nous pouvons arrêter l’ensemble des containers :

$ docker-compose down

Comparaison entre Connect File Pulse et d’autres connecteurs

Voici une comparaison relativement simple entre Connect FilePulse et d’autres solutions de connecteur Kafka : Connect Spooldir and Connect FileStreams.

Conclusion

Kafka Connect File Pulse est un nouveau connecteur qui peut être utilisé pour ingérer facilement des données en provenance de fichiers dans Apache Kafka. Connect File Pulse offre la possibilité de définir des pipelines complexes pour transformer les données à l’aide d’une riche collection de filtres intégrés. Enfin et surtout, il définit un ensemble d’interfaces qui peuvent être implémentées pour s’adapter facilement à votre contexte projet.

The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.