Cet article est le troisième de la série “Streaming data into Kafka “. Dans les deux premiers, nous avons vu comment utiliser Kafka Connect pour charger des données depuis des fichiers CSV et XML dans Apache Kafka, sans avoir à écrire une seule ligne de code.
Pour ce faire, nous avons utilisé la solution Kafka Connect FilePulse, un connecteur Kafka Connect qui offre de nombreuses fonctionnalités intéressantes pour analyser et transformer les données.
- Streaming data into Kafka S01/E01 - Loading CSV file
- Streaming data into Kafka S01/E02 - Chargement d’un fichier XML
Voyons maintenant comment intégrer des données JSON, un autre format de fichier très largement utilisé sur la plupart des projets (et beaucoup plus apprécié que le XML pour les applications web).
Kafka Connect File Pulse connector
Si vous avez déjà lu les articles précédents, passez directement à la section suivante (Ingestion de données).
Le connecteur Kafka Connect FilePulse facilite l’analyse, la transformation et l’envoi de données dans Kafka depuis des fichiers. Celui-ci permet de traiter nativement divers formats de fichiers tels que CSV, XML, JSON, AVRO, LOG4J, etc.
Pour une introduction du projet Kafka Connect FilePulse, je vous suggère de lire cet article :
Pour plus d’informations, vous pouvez aussi consulter la documentation ici.
Comment utiliser le connecteur
La façon la plus simple et la plus rapide de commencer avec Kafka Connect FilePulse est d’utiliser l’image Docker disponible sur Docker Hub.
$ docker pull streamthoughts/kafka-connect-file-pulse:latest
Vous pouvez télécharger le fichier docker-compose.yml
disponible sur le dépôt GitHub du projet pour démarrer rapidement une plate-forme Confluent avec Kafka Connect et le connecteur FilePulse préinstallé.
$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/docker-compose.yml
$ docker-compose up -d
Une fois que tous les conteneurs Docker sont démarrés, vous pouvez vérifier que le connecteur est bien installé sur le worker Kafka Connect accessible sur http://localhost:8083
.
$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
Note : Vous pouvez également installer/télécharger le connecteur directement depuis le dépôt GitHub ou à partir de Confluent Hub.
Ingestion de données
Pour lire un fichier contenant un seul document JSON, nous utiliserons le BytesArrayInputReader
. Ce Reader
permet de créer un seul record par fichier source. Chaque record aura un unique champ de type byte[] nommé “message”. La valeur du champ correspond au contenu du fichier source (c’est-à-dire le document JSON).
Ensuite, pour parser le champ JSON, nous utiliserons le mécanisme Processing Filters fourni par le connecteur FilePulse et plus particulièrement le JSONFilter.
Nous pouvons créer le connecteur avec la configuration suivante :
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",
"skip.headers": "1",
"topic":"tracks-filepulse-json-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
"filters": "ParseJSON",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"tasks.max": 1
}'
Note: Kafka Connect FilePulse scanne périodiquement le répertoire d’entrée que nous définissons en utilisant la propriété fs.scan.directory.path
. Ensuite, il recherchera les fichiers correspondant au modèle .*\\.json$
. Chaque fichier est identifié et suivi de manière unique en fonction de propriété “offset.strategy”. Ici, la configuration précise qu’un fichier est identifié par son nom.
Créez un fichier JSON valide qui ressemble à ceci :
$ cat <<EOF > track.json
{
"track": {
"title":"Star Wars (Main Theme)",
"artist":"John Williams, London Symphony Orchestra",
"album":"Star Wars",
"duration":"10:52"
}
}
EOF
Copiez ensuite ce fichier dans le conteneur Docker qui exécute Kafka Connect avec les commandes suivantes :
// Create the target directory
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
// Copy host file to docker-container
$ docker cp track.json connect://tmp/kafka-connect/examples/track-00.json
Enfin, consommez le Topic tracks-filepulse-json-00
et vérifiez que le connecteur a détecté et traité le fichier JSON :
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"message": {
"bytes": "{ \n \"track\": {\n \"title\":\"Star Wars (Main Theme)\",\n \"artist\":\"John Williams, London Symphony Orchestra\",\n \"album\":\"Star Wars\",\n \"duration\":\"10:52\"\n }\n}\n"
},
"track": {
"Track": {
"title": {
"string": "Star Wars (Main Theme)"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars"
},
"duration": {
"string": "10:52"
}
}
}
}
Note : Dans l’exemple ci-dessus, nous utilisons kafkacat pour consommer les messages. L’option o-1
est utilisée pour ne consommer que le dernier message disponible sur le topic.
Exclure un champ
Le JSONFilter
ne supprime pas automatiquement le champ original contenant la chaîne JSON brute (c’est-à-dire le champ message
). Si vous ne souhaitez pas conserver ce champ, vous pouvez le supprimer en utilisant le ExcludeFilter
:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",
"skip.headers": "1",
"topic":"tracks-filepulse-json-01",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
"filters": "ParseJSON, ExcludeFieldMessage",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
"filters.ExcludeFieldMessage.fields":"message",
"tasks.max": 1
}'
Copiez le fichier JSON dans le conteneur Docker comme précédemment :
$ docker cp track.json \
connect://tmp/kafka-connect/examples/track-01.json
Ensuite, consommez le Topic de sortie tracks-filepulse-json-01
en exécutant la commande suivante :
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-01 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"track": {
"Track": {
"title": {
"string": "Star Wars (Main Theme)"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars"
},
"duration": {
"string": "10:52"
}
}
}
}
Et c’est tout ! Nous avons réussi à produire un message structuré propre, similaire à celui contenu dans notre fichier d’entrée.
Maintenant, essayons d’aller un peu plus loin.
Gestion des valeurs null
Parfois, vous devrez traiter des documents JSON dont certaines valeurs sont nulles. Par défaut, si nous prenons la configuration utilisée jusqu’à présent, les valeurs nulles seront ignorées lors de la sérialisation.
La principale raison est que le connecteur ne peut pas déduire le type d’un champ contenant une valeur “nulle”.
Cependant, il est possible de combiner le AppendFilter et une Simple Connect Expression Language (SCEL) pour définir le type de valeur nulle et fixer une valeur par défaut.
Note: Simple Connect Expression Language (SCEL) est un langage d’expression de base fourni par le connecteur Connect FilePulse pour accéder et manipuler les champs des records.
Créons un nouveau connecteur avec la configuration suivante:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-02/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",
"skip.headers": "1",
"topic":"tracks-filepulse-json-02",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
"filters": "ParseJSON, ExcludeFieldMessage, SetDefaultRank",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
"filters.ExcludeFieldMessage.fields":"message",
"filters.SetDefaultRank.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetDefaultRank.field":"$value.track.rank",
"filters.SetDefaultRank.value":"{{ converts(nlv($value.track.rank, 0), '\''INTEGER'\'') }}",
"filters.SetDefaultRank.overwrite": "true",
"tasks.max": 1
}'
Créons un second fichier JSON avec le contenu suivant:
$ cat <<EOF > track-with-null.json
{
"track": {
"title":"Duel of the Fates",
"artist":"John Williams, London Symphony Orchestra",
"album":"Star Wars",
"duration":"4:14",
"rank": null
}
}
EOF
Puis, copions-le dans le conteneur Docker comme précédemment.
$ docker cp track-with-null.json \
connect://tmp/kafka-connect/examples/track-02.json
Enuiste, consommons le topic de sortie tracks-filepulse-json-01
:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-02 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"track": {
"Track": {
"title": {
"string": "Duel of the Fates"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars"
},
"duration": {
"string": "4:14"
},
"rank": {
"int": 0
}
}
}
}
Enfin, vous devriez obtenir un message de sortie contenant le champ rank
de type int
et initialisé avec la valeur par défaut 0
.
Créer un record par élément dans un JSON Array
Enfin, il est également courant de devoir traiter des fichiers JSON contenant un ensemble d’objets JSON sous la forme d’un array
.
Pour produire un record par élément dans un tableau, il est possible de configurer la propriété explode.array
du JSONFilter
à true
.
Publions donc une nouvelle configuration en exécutant la commande suivante:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",
"skip.headers": "1",
"topic":"tracks-filepulse-json-03",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
"filters": "ParseJSON, ExcludeFieldMessage",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"filters.ParseJSON.explode.array":"true",
"filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
"filters.ExcludeFieldMessage.fields":"message",
"tasks.max": 1
}'
Créons un fichier contenant deux objets JSON :
$ cat <<EOF > tracks.json
[
{
"track": {
"title": "Star Wars (Main Theme)",
"artist": "John Williams, London Symphony Orchestra",
"album": "Star Wars",
"duration": "10:52"
}
},
{
"track": {
"title": "Duel of the Fates",
"artist": "John Williams, London Symphony Orchestra",
"album": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)",
"duration": "4:14"
}
}
]
EOF
Puis, copions-le dans le conteneur Docker comme précédemment.
$ docker cp tracks.json \
connect://tmp/kafka-connect/examples/tracks-00.json
Ensuite, il ne nous reste plus qu’à consommer le topic de sortie tracks-filepulse-json-02
:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-03 \
-C -J -q -o0 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"track": {
"Track": {
"title": {
"string": "Star Wars (Main Theme)"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars"
},
"duration": {
"string": "10:52"
}
}
}
}
{
"track": {
"Track": {
"title": {
"string": "Duel of the Fates"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
},
"duration": {
"string": "4:14"
}
}
}
}
Et voilà! Vous savez maintenant comment traiter les fichiers JSON avec Kafka Connect.
Conclusion
Nous avons vu dans cet article qu’il est relativement facile de charger un fichier JSON dans Kafka sans écrire une seule ligne de code en utilisant le framework Kafka Connect. De plus, le connecteur Connect File Pulse est une solution complète qui permet de manipuler facilement vos données avant de les envoyer dans Apache Kafka.
N’hésitez pas à partager cet article. Si ce projet vous plaît, vous pouvez également nous soutenir en ajoutant une ⭐ au repositoy GitHub pour nous soutenir.