Streaming data into Kafka series: S01/E01 - Loading CSV file

Streaming data into Kafka series: S01/E01 - Loading CSV file
The Author's Avatar

L’ingestion de fichiers de données dans Apache Kafka est une tâche très courante dès lors que l’on implémente une plateforme de streaming. Parmi les différents formats de fichiers que nous pouvons trouver, le format CSV est probablement le plus populaire pour déplacer des données d’un système à un autre. Cela est dû à sa simplicité et au fait qu’il peut être utilisé pour exporter ou importer rapidement des données d’une base de données à une autre.

Un fichier CSV n’est rien d’autre qu’un fichier texte (avec une extension “.csv”). Chaque ligne du fichier représente un enregistrement de données et chaque enregistrement est constitué d’un ou plusieurs champs, séparés par une virgule (ou par un autre séparateur).

Voici un exemple de fichier:

40;War;02:38;1983;U2;Rock
Acrobat;Achtung Baby;04:30;1991;U2;Rock
Sunday Bloody Sunday;War;04:39;1983;U2;Rock
With or Without You;The Joshua Tree;04:55;1987; U2;Rock

De plus, un fichier CSV peut contenir une ligne d’en-tête pour indiquer le nom de chaque champ.

title;album;duration;release;artist;type

Dans cet article, nous verrons comment intégrer un tel fichier dans Apache Kafka. Bien sûr, nous ne réinventerons pas la roue pour cela, de nombreux outils existent déjà et sont disponibles en open-source.

Nous utiliserons donc le framework Kafka Connect, qui fait partie du projet Apache Kafka. Kafka Connect a été conçu pour transférer des données depuis et vers Kafka via l’utilisation de connecteurs (https://www.confluent.io/hub/)

Kafka Connect File Pulse connector

Le connecteur Kafka Connect FilePulse facilite l’analyse, la transformation et l’envoi de données dans Kafka depuis des fichiers. Celui-ci permet de traiter divers formats de fichiers tels que CSV, XML, JSON, AVRO, LOG4J, etc. Ici, nous nous concentrerons uniquement sur le CSV.

Pour une introduction du projet Kafka Connect FilePulse, je vous suggère de lire cet article :

Pour plus d’informations, vous pouvez aussi consulter la documentation ici.

Comment utiliser le connecteur

La façon la plus simple et la plus rapide de commencer avec Kafka Connect FilePulse est d’utiliser l’image Docker disponible sur Docker Hub.

$ docker pull streamthoughts/kafka-connect-file-pulse:latest

Vous pouvez télécharger le fichier docker-compose.yml disponible sur le dépôt GitHub du projet pour démarrer rapidement une plate-forme Confluent avec Kafka Connect et le connecteur FilePulse préinstallé.

$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/docker-compose.yml
$ docker-compose up -d

Une fois que tous les conteneurs Docker sont démarrés, vous pouvez vérifier que le connecteur est bien installé sur le worker Kafka Connect accessible sur http://localhost:8083.

$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse

"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"

Note : Vous pouvez également installer/télécharger le connecteur directement depuis le dépôt GitHub ou à partir de Confluent Hub.

Ingestion de données

Créons dès à présent un premier connecteur en exécutant la commande suivante:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'

Ensuite, nous pouvons rapidement vérifier que le connecteur est correctement démarré via la commande suivante:

$ curl -s localhost:8083/connectors/source-csv-filepulse-00/status|jq  '.connector.state'

"RUNNING"

Pour l’instant, notre connecteur ne traite aucune donnée. Copions un fichier CSV dans le répertoire d’entrée /tmp/kafka-connect/examples/:

// Download CSV file
$ export GITHUB_REPO_MASTER=https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/
$ curl -sSL $GITHUB_REPO_MASTER/examples/quickstart-musics-dataset.csv -o musics-dataset.csv

// Copy CSV file to docker-container
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
$ docker cp musics-dataset.csv connect://tmp/kafka-connect/examples/musics-dataset-00.csv

Puis, vérifions que des données ont été produites dans le topic de sortie musics-filepulse-csv-00 :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-00 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "message": {
    "string": "Zoo Station;Achtung Baby;04:36;1991;U2;Rock"
  },
  "headers": {
    "array": [
      {
        "string": "title;album;duration;release;artist;type"
      }
    ]
  }
}

Note : Dans l’exemple ci-dessus, nous utilisons kafkacat pour consommer les messages. L’option o-1 est utilisée pour ne consommer que le dernier message disponible sur le topic.

Comme nous pouvons le voir, le topic contient un message Avro pour chaque ligne du fichier CSV. La raison à cela est que nous avons configuré notre connecteur pour utiliser le RowFileInputReader (cf. task.reader.class).

Chaque record contient deux champs :

  • message: Ce champ est de type string et représente une seule ligne du fichier d’entrée.

  • headers: Ce champ est de type array et contient la première ligne d’en-tête du fichier d’entrée. Ce champ est automatiquement ajouté par le RowFileInputReader dès lors que vous configurez la propriété skip.headers=1.

Headers

Kafka Connect FilePulse ajoute automatiquement, à chaque record, des headers Kafka contenant des métadonnées sur le fichier source depuis lequel les données ont été extraites. C’est métadonées peuvent-être utiles pour le débogage mais aussi pour le data-lineage.

$ docker run --tty  --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-00 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .headers
[
  "connect.file.name",
  "musics-dataset-00.csv",
  "connect.file.path",
  "/tmp/kafka-connect/examples",
  "connect.file.hash",
  "1466679696",
  "connect.file.size",
  "6588",
  "connect.file.lastModified",
  "1597337097000",
  "connect.hostname",
  "57a2fb6213f9"
]

L’analyse des données

Kafka Connect FilePulse permet de définir des pipelines complexes pour analyser, transformer et enrichir les données grâce à l’utilisation de filtres de traitement

Pour parser chacune des lignes de notre fichier, nous pouvez utiliser le filtre DelimitedRowFilter. De plus, comme la première ligne du fichier CSV est une en-tête, il est possible configurer la propriété extractColumnName afin de nommer automatiqument les champs des record à partir du champ “headers”.

Créons un nouveau connecteur avec la configuration suivante:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-01/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-01",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
    }'

Enfin, consommons les messages du topic musics-filepulse-csv-01 :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
    -b localhost:9092 \
    -t musics-filepulse-csv-01 \
    -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "title": {
    "string": "Zoo Station"
  },
  "album": {
    "string": "Achtung Baby"
  },
  "duration": {
    "string": "04:36"
  },
  "release": {
    "string": "1991"
  },
  "artist": {
    "string": "U2"
  },
  "type": {
    "string": "Rock"
  }
}

Filtrage des données

Parfois, vous pouvez souhaiter ne conserver que les lignes d’un fichier qui comportent un champ avec une valeur spécifique. Pour cet article, imaginons que l’on ne souhaite garder que les chansons du groupe AC/DC.

Pour ce faire, nous pouvons étendre notre chaîne de filtrage pour utiliser le filtre DropFilter qui, comme son nom le suggère, permet de supprimer des records en fonction d’une condition donnée.

Créons un nouveau connecteur (afin que notre fichier puisse être retraité) :

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-02/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine,KeepACDC",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
        "filters.KeepACDC.if":"{{ equals($value.artist, '\''AC/DC'\'') }}",
        "filters.KeepACDC.invert":"true"
    }'

La propriété if est définie avec une expression SCEL (Simple Connect Expression Language) qui est un langage d’expression défini par Kafka Connect FilePulse pour accéder et manipuler les champs d’un record.

Enfin, vérifions que nous obtenons bien le résultat attendu en exécutant :

$ docker run --tty  --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-02 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault.artist

{
  "string": "AC/DC"
}

Changer le type d’un champ

Vous aurez probablement remarqué que nous n’avons défini, à aucun moment, le type des champs (c.à.d des colonnes CSV). Par défaut, le connecteur suppose que tous les champs sont de type chaîne de caractères (String). Peut-être serez-vous satisfait de cela, mais dans la plupart des cas, il est fort à parier que vous souhaiterez convertir un ou plusieurs champs. Par exemple, nous pourrions vouloir typer le champ release en Integer.

Pour cela, nous pouvons utiliser le filtre AppendFilter combiné avec une Simple Connect Expression.

Créons la configuration suivante :

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-03/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-03",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine,KeepACDC,ReleaseToInt",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
        "filters.KeepACDC.if":"{{ equals($value.artist, '\''AC/DC'\'') }}",
        "filters.KeepACDC.invert":"true",
        "filters.ReleaseToInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
        "filters.ReleaseToInt.field": "$value.release",
        "filters.ReleaseToInt.value": "{{ converts($value.release, '\''INTEGER'\'') }}",
        "filters.ReleaseToInt.overwrite": "true"
    }'

Vérifions, ensuite, que le champs a bien été converti en type int.

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-03 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault.release

{
  "release": {
    "int": 1980
  }
}

AppendFilter est un filtre très pratique qui permet de modifier rapidement un record. Par exemple, nous pourrions également l’utiliser pour définir la clé de chaque record comme étant le nom de la l’album en ajoutant la configuration suivante :

"filters.SetKey.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetKey.field": "$key",
"filters.SetKey.value": "{{ uppercase($value.album)}}"

Note : Pour cet article, nous avons utilisé ici le mécanisme de la chaîne de filtrage fourni par le connecteur Connect File Pulse. Mais il est également possible d’utiliser les Transformations de Message Unique (SMT) de Kafka Connect pour effectuer la même tâche en utilisant le org.apache.kafka.connect.transforms.Cast.

Conclusion

Nous avons vu dans cet article qu’il est relativement facile de charger un fichier CSV dans Kafka sans écrire une seule ligne de code en utilisant le framework Kafka Connect. De plus, le connecteur Connect File Pulse est une solution complète qui permet de manipuler facilement vos données avant de les envoyer dans Apache Kafka.

N’hésitez pas à partager cet article. Si ce projet vous plaît, vous pouvez également nous soutenir en ajoutant une ⭐ au repositoy GitHub pour nous soutenir.

The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.