Dans le précédent article Streaming data into Kafka S01/E01- Loading CSV file, Nous avons vu comment il pouvait-être facile d’intégrer des données dans Apache Kafka en utilisant le framework Kafka Connect.
Plus particulièrement, nous avons vu comment analyser et transformer des fichiers CSV pour produire des enregistrements propres dans Kafka en utilisant le connecteur Kafka Connect FilePulse.
XML (Extensible Markup Language) est un autre format de données bien connu. Habituellement, le format XML n’est pas très apprécié par la plupart des développeurs en raison de sa lourdeur (ou de sa complexité). Cependant, il est encore utilisé par de nombreuses organisations pour faire interagir des systèmes entre eux.
Dans ce deuxième article, nous verrons comment lire des données à partir de fichiers XML pour les envoyer dans Kafka. Pour ce faire, nous utiliserons une fois de plus le connecteur Kafka Connect FilePulse, qui offre un support natif pour la lecture des fichiers XML.
Kafka Connect File Pulse connector
Si vous avez déjà lu les articles précédents, passez directement à la section suivante (Ingestion de données).
Le connecteur Kafka Connect FilePulse facilite l’analyse, la transformation et l’envoi de données dans Kafka depuis des fichiers. Celui-ci permet de traiter nativement divers formats de fichiers tels que CSV, XML, JSON, AVRO, LOG4J, etc.
Pour une introduction du projet Kafka Connect FilePulse, je vous suggère de lire cet article :
Pour plus d’informations, vous pouvez aussi consulter la documentation ici.
Comment utiliser le connecteur
La façon la plus simple et la plus rapide de commencer avec Kafka Connect FilePulse est d’utiliser l’image Docker disponible sur Docker Hub.
$ docker pull streamthoughts/kafka-connect-file-pulse:latest
Vous pouvez télécharger le fichier docker-compose.yml
disponible sur le dépôt GitHub du projet pour démarrer rapidement une plate-forme Confluent avec Kafka Connect et le connecteur FilePulse préinstallé.
$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/docker-compose.yml
$ docker-compose up -d
Une fois que tous les conteneurs Docker sont démarrés, vous pouvez vérifier que le connecteur est bien installé sur le worker Kafka Connect accessible sur http://localhost:8083
.
$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
Note : Vous pouvez également installer/télécharger le connecteur directement depuis le dépôt GitHub ou à partir de Confluent Hub.
Ingestion de données
Commençons par créer un premier connecteur avec la configuration suivante. Celle-ci précise que les Tasks
du connecteur doivent utiliser le XMLFileInputReader
pour lire les fichiers qui seront detectés par le connecteur.
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"offset.strategy":"name",
"topic":"playlists-filepulse-xml-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Note: Kafka Connect FilePulse scanne périodiquement le répertoire d’entrée que nous définissons en utilisant la propriété fs.scan.directory.path
. Ensuite, il recherchera les fichiers correspondant au modèle .*\\.xml$
. Chaque fichier est identifié et suivi de manière unique en fonction de propriété “offset.strategy”. Ici, la configuration précise qu’un fichier est identifié par son nom.
Une fois que le connecteur est créé, vous nous pourvons vérifier qu’il est correctement démarré via la commande suivante :
$ curl -s \
localhost:8083/connectors/source-xml-filepulse-00/status \
| jq '.connector.state'
"RUNNING"
Maintenant, créons un fichier XML appelé playlists.xml
avec le contenu suivant :
$ cat <<EOF > playlists.xml
<?xml version="1.0" encoding="UTF-8"?>
<playlists>
<playlist name="BestOfStarWars">
<track>
<title>Duel of the Fates</title>
<artist>John Williams, London Symphony Orchestra</artist>
<album>Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)</album>
<duration>4:14</duration>
</track>
<track>
<title>Star Wars (Main Theme)</title>
<artist>John Williams, London Symphony Orchestra</artist>
<album>Star Wars: The Empire Strikes Back (Original Motion Picture Soundtrack)</album>
<duration>10:52</duration>
</track>
</playlist>
</playlists>
EOF
Ensuite, copiez ce fichier de votre hôte dans le conteneur Docker qui fait fonctionner le connecteur. Vous pouvez exécuter les commandes suivantes :
// Copy XML file to docker-container
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
$ docker cp playlists.xml connect://tmp/kafka-connect/examples/playlists-00.xml
Enfin, vous pouvez consommer le topic playlists-filepulse-xml-00
pour vérifier que le connecteur a détecté et traité le fichier XML :
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t playlists-filepulse-xml-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"playlists": {
"Playlists": {
"playlist": {
"Playlist": {
"name": {
"string": "BestOfStarWars"
},
"track": {
"array": [
{
"Track": {
"title": {
"string": "Duel of the Fates"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
},
"duration": {
"string": "4:14"
}
}
},
{
"Track": {
"title": {
"string": "Star Wars (Main Theme)"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars: The Empire Strikes Back (Original Motion Picture Soundtrack)"
},
"duration": {
"string": "10:52"
}
}
}
]
}
}
}
}
}
}
Note : Dans l’exemple ci-dessus, nous utilisons kafkacat pour consommer les messages. L’option o-1
est utilisée pour ne consommer que le dernier message disponible sur le topic.
Comme vous pouvez remarquer, dans l’exemple ci-dessus, Kafka Connect FilePulse déduit automatiquement le schéma à partir du fichier XML d’entrée.
Vous pouvez vérifier le schéma Avro généré en utilisant le endpoint HTTP du Schema Registry:
$ curl -X GET \
http://localhost:8081/subjects/playlists-filepulse-xml-00-value/versions/latest/schema \
| jq
Forcer un type Array
Créons un deuxième fichier XML nommé single-track-playlist.xml
avec le contenu suivant :
$ cat <<EOF > single-track-playlists.xml
<?xml version="1.0" encoding="UTF-8"?>
<playlists>
<playlist name="BestOfJWilliams">
<track>
<title>Theme From Jurassic Park</title>
<artist>John Williams, London Symphony Orchestra</artist>
<album>Jurassic Park</album>
<duration>3:27</duration>
</track>
</playlist>
</playlists>
EOF
Puis, copions celui-ci dans le conteneur Docker comme précédemment.
$ docker cp single-track-playlists.xml \
connect://tmp/kafka-connect/examples/playlists-01.xml
Maintenant, consommons le topic playlists-filepulse-xml-00
. Vous verrez que le champ nommé track
est de type Track
, alors que dans le premier exemple il était de type array.
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t playlists-filepulse-xml-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"playlists": {
"Playlists": {
"playlist": {
"Playlist": {
"name": {
"string": "BestOfJWilliams"
},
"track": {
"Track": {
"title": {
"string": "Theme From Jurassic Park"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
}
}
}
}
}
}
}
}
Il s’agit d’un problème récurrent lorsqu’on essaie de déduire un schéma de données à partir d’un fichier XML. En effet, il peut-être difficile de déterminer qu’un champ doit être un tableau lorsqu’un seul élément est présent.
Pour résoudre ce problème, il est possible de configurer le XMLFileInputReader
pour forcer certains champs à être de type tableau.
Mettons à jour le connecteur avec la configuration donnée:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"force.array.on.fields": "track",
"offset.strategy":"name",
"topic":"playlists-filepulse-xml-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Recopions ensuite le fichier XML avec un nouveau nom, afin qu’il soit à nouveau scanné par le connecteur.
$ docker cp single-track-playlists.xml connect://tmp/kafka-connect/examples/playlist-03.xml
Enfin, nous pouvons consommer le topic de sortie pour observer l’effet de cette nouvelle configuration.
docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t playlists-filepulse-xml-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"playlists": {
"Playlists": {
"playlist": {
"Playlist": {
"name": {
"string": "BestOfJWilliams"
},
"track": {
"array": [
{
"Track": {
"title": {
"string": "Theme From Jurassic Park"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
}
}
}
]
}
}
}
}
}
}
Le champ track
est maintenant de type array
.
Découper un fichier XML en plusieurs records
Jusqu’à présent, nous n’avons utilisé que le reader XMLFileInputReader
pour lire un seul record depuis chaque fichier XML.
Mais nous pourrions tout aussi bien produire un record pour chaque élément de track
présent dans les fichiers XML d’entrée. Pour ce faire, nous allons simplement remplacer la propriété xpath.expression
, dont la valeur par défaut est /
Pour commencer, créons un nouveau connecteur avec la configuration suivante:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-01/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"xpath.expression": "//playlist/track",
"offset.strategy":"name",
"topic":"tracks-filepulse-xml-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Ensuite, consommons les messages du topic tracks-filepulse-xml-00
:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-xml-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"title": {
"string": "Theme From Jurassic Park"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
}
}
Vous devriez ainsi obtenir un record par track
.
Extraction des valeurs type texte
Par défaut, le reader XMLFileInputReader
s’attend à ce que l’expression XPath renvoie un résultat de type NodeSet
.
Si votre objectif est d’extraire un seul élément de type texte depuis un fichier XML, vous devrez définir la propriété xpath.result.type=STRING
.
Par exemple, si nous configurons un connecteur avec l’expression XPath suivante //playlist/track/title/text()
alors le connecteur produira des messages sous la forme:
{
"message": {
"string": "Theme From Jurassic Park"
}
}
Ensuite, nous pourrions utiliser une Single Message Transformations (SMTs) comme ExtractField pour remplacer la valeur entière par le champ extrait.
Renommer des champs
Il est parfois utile de pouvoir changer le nom d’un champ. Cela peut être dû au fait que vous avez besoin de contextualiser davantage un champ ou que vous n’êtes pas satisfait des noms de champ présents dans le fichier d’entrée XML.
Par exemple, nous pouvons renommer le champ artist
parce qu’il contient une liste d'artists
séparés par des virgules.
Kafka Connect FilePulse nous permet de définir des pipelines complexes pour analyser, transformer et enrichir les données grâce à l’utilisation de filtres de traitement.
Ainsi, pour renommer le champ artist
en artists
, nous utiliserons le RenameFilter.
Créons un nouveau connecteur avec la configuration suivante:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-02/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"force.array.on.fields": "track",
"xpath.expression": "//playlist/track",
"filters": "RenameArtist",
"filters.RenameArtist.type": "io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter",
"filters.RenameArtist.field": "artist",
"filters.RenameArtist.target": "artists",
"offset.strategy":"name",
"topic":"tracks-filepulse-xml-02",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Ensuite, consommons les messages produits dans le sujet de sortie tracks-filepulse-xml-02
:
docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-xml-02 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"title": {
"string": "Theme From Jurassic Park"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
},
"artists": {
"string": "John Williams, London Symphony Orchestra"
}
}
Diviser un champs en tableau
Enfin, je vais vous montrer comment vous pouvez combiner les filtres pour construire une chaîne de traitement complète.
Pour ce dernier exemple, nous allons diviser le champ artists
en tableau à l’aide du filtre SplitFilter
.
Créons un dernier connecteur avec la configuration suivante:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-03/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"force.array.on.fields": "track",
"xpath.expression": "//playlist/track",
"filters": "RenameArtists, SplitArtists",
"filters.RenameArtists.type": "io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter",
"filters.RenameArtists.field": "artist",
"filters.RenameArtists.target": "artists",
"filters.SplitArtists.type": "io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter",
"filters.SplitArtists.split": "artists",
"filters.SplitArtists.separator": ",",
"offset.strategy":"name",
"topic":"tracks-filepulse-xml-03",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Enfin, vérifions que nous obtenons bien le résultat attendu en exécutant :
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-xml-03 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"title": {
"string": "Theme From Jurassic Park"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
},
"artists": {
"array": [
{
"string": "John Williams"
},
{
"string": " London Symphony Orchestra"
}
]
}
}
Et voilà!
Conclusion
Nous avons vu dans cet article qu’il est relativement facile de charger un fichier XML dans Kafka sans écrire une seule ligne de code en utilisant le framework Kafka Connect. De plus, le connecteur Connect File Pulse est une solution complète qui permet de manipuler facilement vos données avant de les envoyer dans Apache Kafka.
N’hésitez pas à partager cet article. Si ce projet vous plaît, vous pouvez également nous soutenir en ajoutant une ⭐ au repositoy GitHub pour nous soutenir.