Streaming data into Kafka series: S01/E03 - Loading JSON file

Streaming data into Kafka series: S01/E03 - Loading JSON file
The Author's Avatar

Cet article est le troisième de la série “Streaming data into Kafka “. Dans les deux premiers, nous avons vu comment utiliser Kafka Connect pour charger des données depuis des fichiers CSV et XML dans Apache Kafka, sans avoir à écrire une seule ligne de code.

Pour ce faire, nous avons utilisé la solution Kafka Connect FilePulse, un connecteur Kafka Connect qui offre de nombreuses fonctionnalités intéressantes pour analyser et transformer les données.

Voyons maintenant comment intégrer des données JSON, un autre format de fichier très largement utilisé sur la plupart des projets (et beaucoup plus apprécié que le XML pour les applications web).

Kafka Connect File Pulse connector

Si vous avez déjà lu les articles précédents, passez directement à la section suivante (Ingestion de données).

Le connecteur Kafka Connect FilePulse facilite l’analyse, la transformation et l’envoi de données dans Kafka depuis des fichiers. Celui-ci permet de traiter nativement divers formats de fichiers tels que CSV, XML, JSON, AVRO, LOG4J, etc.

Pour une introduction du projet Kafka Connect FilePulse, je vous suggère de lire cet article :

Pour plus d’informations, vous pouvez aussi consulter la documentation ici.

Comment utiliser le connecteur

La façon la plus simple et la plus rapide de commencer avec Kafka Connect FilePulse est d’utiliser l’image Docker disponible sur Docker Hub.

$ docker pull streamthoughts/kafka-connect-file-pulse:latest

Vous pouvez télécharger le fichier docker-compose.yml disponible sur le dépôt GitHub du projet pour démarrer rapidement une plate-forme Confluent avec Kafka Connect et le connecteur FilePulse préinstallé.

$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/docker-compose.yml
$ docker-compose up -d

Une fois que tous les conteneurs Docker sont démarrés, vous pouvez vérifier que le connecteur est bien installé sur le worker Kafka Connect accessible sur http://localhost:8083.

$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse

"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"

Note : Vous pouvez également installer/télécharger le connecteur directement depuis le dépôt GitHub ou à partir de Confluent Hub.

Ingestion de données

Pour lire un fichier contenant un seul document JSON, nous utiliserons le BytesArrayInputReader. Ce Reader permet de créer un seul record par fichier source. Chaque record aura un unique champ de type byte[] nommé “message”. La valeur du champ correspond au contenu du fichier source (c’est-à-dire le document JSON).

Ensuite, pour parser le champ JSON, nous utiliserons le mécanisme Processing Filters fourni par le connecteur FilePulse et plus particulièrement le JSONFilter.

Nous pouvons créer le connecteur avec la configuration suivante :

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"tracks-filepulse-json-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "tasks.max": 1
    }'

Note: Kafka Connect FilePulse scanne périodiquement le répertoire d’entrée que nous définissons en utilisant la propriété fs.scan.directory.path. Ensuite, il recherchera les fichiers correspondant au modèle .*\\.json$. Chaque fichier est identifié et suivi de manière unique en fonction de propriété “offset.strategy”. Ici, la configuration précise qu’un fichier est identifié par son nom.

Créez un fichier JSON valide qui ressemble à ceci :

$ cat <<EOF > track.json
{ 
  "track": {
     "title":"Star Wars (Main Theme)",
     "artist":"John Williams, London Symphony Orchestra",
     "album":"Star Wars",
     "duration":"10:52"
  }
}
EOF

Copiez ensuite ce fichier dans le conteneur Docker qui exécute Kafka Connect avec les commandes suivantes :

// Create the target directory
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples

// Copy host file to docker-container
$ docker cp track.json connect://tmp/kafka-connect/examples/track-00.json

Enfin, consommez le Topic tracks-filepulse-json-00 et vérifiez que le connecteur a détecté et traité le fichier JSON :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-00 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "message": {
    "bytes": "{ \n  \"track\": {\n     \"title\":\"Star Wars (Main Theme)\",\n     \"artist\":\"John Williams, London Symphony Orchestra\",\n     \"album\":\"Star Wars\",\n     \"duration\":\"10:52\"\n  }\n}\n"
  },
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}

Note : Dans l’exemple ci-dessus, nous utilisons kafkacat pour consommer les messages. L’option o-1 est utilisée pour ne consommer que le dernier message disponible sur le topic.

Exclure un champ

Le JSONFilter ne supprime pas automatiquement le champ original contenant la chaîne JSON brute (c’est-à-dire le champ message). Si vous ne souhaitez pas conserver ce champ, vous pouvez le supprimer en utilisant le ExcludeFilter :

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"tracks-filepulse-json-01",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "tasks.max": 1
    }'

Copiez le fichier JSON dans le conteneur Docker comme précédemment :

$ docker cp track.json \
connect://tmp/kafka-connect/examples/track-01.json

Ensuite, consommez le Topic de sortie tracks-filepulse-json-01 en exécutant la commande suivante :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-01 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}

Et c’est tout ! Nous avons réussi à produire un message structuré propre, similaire à celui contenu dans notre fichier d’entrée.

Maintenant, essayons d’aller un peu plus loin.

Gestion des valeurs null

Parfois, vous devrez traiter des documents JSON dont certaines valeurs sont nulles. Par défaut, si nous prenons la configuration utilisée jusqu’à présent, les valeurs nulles seront ignorées lors de la sérialisation.

La principale raison est que le connecteur ne peut pas déduire le type d’un champ contenant une valeur “nulle”.

Cependant, il est possible de combiner le AppendFilter et une Simple Connect Expression Language (SCEL) pour définir le type de valeur nulle et fixer une valeur par défaut.

Note: Simple Connect Expression Language (SCEL) est un langage d’expression de base fourni par le connecteur Connect FilePulse pour accéder et manipuler les champs des records.

Créons un nouveau connecteur avec la configuration suivante:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-02/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"tracks-filepulse-json-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage, SetDefaultRank",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "filters.SetDefaultRank.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
        "filters.SetDefaultRank.field":"$value.track.rank",
        "filters.SetDefaultRank.value":"{{ converts(nlv($value.track.rank, 0), '\''INTEGER'\'') }}",
        "filters.SetDefaultRank.overwrite": "true",
        "tasks.max": 1
    }'

Créons un second fichier JSON avec le contenu suivant:

$ cat <<EOF > track-with-null.json
{ 
  "track": {
     "title":"Duel of the Fates",
     "artist":"John Williams, London Symphony Orchestra",
     "album":"Star Wars",
     "duration":"4:14",
     "rank": null
  }
}
EOF

Puis, copions-le dans le conteneur Docker comme précédemment.

$ docker cp track-with-null.json \
connect://tmp/kafka-connect/examples/track-02.json

Enuiste, consommons le topic de sortie tracks-filepulse-json-01:

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-02 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Duel of the Fates"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "4:14"
      },
      "rank": {
        "int": 0
      }
    }
  }
}

Enfin, vous devriez obtenir un message de sortie contenant le champ rank de type int et initialisé avec la valeur par défaut 0.

Créer un record par élément dans un JSON Array

Enfin, il est également courant de devoir traiter des fichiers JSON contenant un ensemble d’objets JSON sous la forme d’un array.

Pour produire un record par élément dans un tableau, il est possible de configurer la propriété explode.array du JSONFilter à true.

Publions donc une nouvelle configuration en exécutant la commande suivante:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"tracks-filepulse-json-03",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ParseJSON.explode.array":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "tasks.max": 1
    }'

Créons un fichier contenant deux objets JSON :

$ cat <<EOF > tracks.json
[
  {
    "track": {
      "title": "Star Wars (Main Theme)",
      "artist":  "John Williams, London Symphony Orchestra",
      "album": "Star Wars",
      "duration": "10:52"
    }
  },
  {
    "track": {
      "title":  "Duel of the Fates",
      "artist": "John Williams, London Symphony Orchestra",
      "album": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)",
      "duration": "4:14"
    }
  }
]
EOF

Puis, copions-le dans le conteneur Docker comme précédemment.

$ docker cp tracks.json \
connect://tmp/kafka-connect/examples/tracks-00.json

Ensuite, il ne nous reste plus qu’à consommer le topic de sortie tracks-filepulse-json-02 :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-03 \
        -C -J -q -o0 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}
{
  "track": {
    "Track": {
      "title": {
        "string": "Duel of the Fates"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
      },
      "duration": {
        "string": "4:14"
      }
    }
  }
}

Et voilà! Vous savez maintenant comment traiter les fichiers JSON avec Kafka Connect.

Conclusion

Nous avons vu dans cet article qu’il est relativement facile de charger un fichier JSON dans Kafka sans écrire une seule ligne de code en utilisant le framework Kafka Connect. De plus, le connecteur Connect File Pulse est une solution complète qui permet de manipuler facilement vos données avant de les envoyer dans Apache Kafka.

N’hésitez pas à partager cet article. Si ce projet vous plaît, vous pouvez également nous soutenir en ajoutant une ⭐ au repositoy GitHub pour nous soutenir.

The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.