Streaming data into Kafka series: S01/E03 - Loading JSON file

Streaming data into Kafka series: S01/E03 - Loading JSON file
The Author's Avatar

Cet article est le troisième de la série “Streaming data into Kafka “. Dans les deux premiers, nous avons vu comment utiliser Kafka Connect pour charger des données depuis des fichiers CSV et XML dans Apache Kafka, sans avoir à écrire une seule ligne de code.

Pour ce faire, nous avons utilisé la solution Kafka Connect FilePulse, un connecteur Kafka Connect qui offre de nombreuses fonctionnalités intéressantes pour analyser et transformer les données.

Voyons maintenant comment intégrer des données JSON, un autre format de fichier très largement utilisé sur la plupart des projets (et beaucoup plus apprécié que le XML pour les applications web).

Kafka Connect File Pulse connector

Si vous avez déjà lu les articles précédents, passez directement à la section suivante (Ingestion de données).

Le connecteur Kafka Connect FilePulse facilite l’analyse, la transformation et l’envoi de données dans Kafka depuis des fichiers. Celui-ci permet de traiter nativement divers formats de fichiers tels que CSV, XML, JSON, AVRO, LOG4J, etc.

Pour une introduction du projet Kafka Connect FilePulse, je vous suggère de lire cet article :

Pour plus d’informations, vous pouvez aussi consulter la documentation ici.

Comment utiliser le connecteur

La façon la plus simple et la plus rapide de commencer avec Kafka Connect FilePulse est d’utiliser l’image Docker disponible sur Docker Hub.

$ docker pull streamthoughts/kafka-connect-file-pulse:latest

Vous pouvez télécharger le fichier docker-compose.yml disponible sur le dépôt GitHub du projet pour démarrer rapidement une plate-forme Confluent avec Kafka Connect et le connecteur FilePulse préinstallé.

$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/docker-compose.yml
$ docker-compose up -d

Une fois que tous les conteneurs Docker sont démarrés, vous pouvez vérifier que le connecteur est bien installé sur le worker Kafka Connect accessible sur http://localhost:8083.

$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse

"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"

Note : Vous pouvez également installer/télécharger le connecteur directement depuis le dépôt GitHub ou à partir de Confluent Hub.

Ingestion de données

To read a file containing a single JSON document, we will use the BytesArrayInputReader. This reader allows us to create a single record per source file. Each record produced by this reader will have a single field of type byte[] named message. The byte[] value is the full content of the source file (i.e the JSON document).

Then, to parse this field, we will use the processing filter mechanism provided by the FilePulse connector and more particularly the JSONFilter.

So lets’s create the connector with this minimal configuration:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"tracks-filepulse-json-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "tasks.max": 1
    }'

Note: Kafka Connect FilePulse scanne périodiquement le répertoire d’entrée que nous définissons en utilisant la propriété fs.scan.directory.path. Ensuite, il recherchera les fichiers correspondant au modèle .*\\.json$. Chaque fichier est identifié et suivi de manière unique en fonction de propriété “offset.strategy”. Ici, la configuration précise qu’un fichier est identifié par son nom.

Create a valid JSON file that looks like this:

$ cat <<EOF > track.json
{ 
  "track": {
     "title":"Star Wars (Main Theme)",
     "artist":"John Williams, London Symphony Orchestra",
     "album":"Star Wars",
     "duration":"10:52"
  }
}
EOF

Then copy this file from your host to the Docker container which runs the connector. You can run the following commands:

// Create the target directory
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples

// Copy host file to docker-container
$ docker cp track.json connect://tmp/kafka-connect/examples/track-00.json

Finally, consume the topic named tracks-filepulse-json-00 and verify that the connector has detected and processed the JSON file:

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-00 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "message": {
    "bytes": "{ \n  \"track\": {\n     \"title\":\"Star Wars (Main Theme)\",\n     \"artist\":\"John Williams, London Symphony Orchestra\",\n     \"album\":\"Star Wars\",\n     \"duration\":\"10:52\"\n  }\n}\n"
  },
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}

Note : Dans l’exemple ci-dessus, nous utilisons kafkacat pour consommer les messages. L’option o-1 est utilisée pour ne consommer que le dernier message disponible sur le topic.

Exclure un champ

The JSONFilter does not automatically delete the original field containing the raw JSON string (i.e. the message). If you do not want to keep this field, you can remove it using the ExcludeFilter as follows:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"tracks-filepulse-json-01",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "tasks.max": 1
    }'

Copy the JSON file to the Docker container as previously:

$ docker cp track.json \
connect://tmp/kafka-connect/examples/track-01.json

Then consume the output topic tracks-filepulse-json-01 by running :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-01 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}

That’s it! We have successfully produced a clean structured message similar to the one contained in our input file.

Now, let’s go a step further.

Gestion des valeurs null

Sometimes you may have to process JSON documents with null values. By default, if we take the configuration used so far, the null values will be ignored during the serialization.

The main reason for this is that the connector cannot infer the type of a field containing a null value.

However, we can combine the AppendFilter and the Simple Connect Expression Language (SCEL) to both define the type of null value and set a default value.

Note: Simple Connect Expression Language (SCEL) is a basic expression language provided by the Connect FilePulse connector to access and manipulate records fields.

Créons un nouveau connecteur avec la configuration suivante:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-02/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"tracks-filepulse-json-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage, SetDefaultRank",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "filters.SetDefaultRank.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
        "filters.SetDefaultRank.field":"$value.track.rank",
        "filters.SetDefaultRank.value":"{{ converts(nlv($value.track.rank, 0), '\''INTEGER'\'') }}",
        "filters.SetDefaultRank.overwrite": "true",
        "tasks.max": 1
    }'

Créons un second fichier JSON avec le contenu suivant:

$ cat <<EOF > track-with-null.json
{ 
  "track": {
     "title":"Duel of the Fates",
     "artist":"John Williams, London Symphony Orchestra",
     "album":"Star Wars",
     "duration":"4:14",
     "rank": null
  }
}
EOF

Puis, copions-le dans le conteneur Docker comme précédemment.

$ docker cp track-with-null.json \
connect://tmp/kafka-connect/examples/track-02.json

Enuiste, consommons le topic de sortie tracks-filepulse-json-01:

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-02 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Duel of the Fates"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "4:14"
      },
      "rank": {
        "int": 0
      }
    }
  }
}

Enfin, vous devriez obtenir un message de sortie contenant le champ rank de type int et initialisé avec la valeur par défaut 0.

Créer un record par élément dans un JSON Array

Enfin, il est également courant de devoir traiter des fichiers JSON contenant un ensemble d’objets JSON sous la forme d’un array.

Pour produire un record par élément dans un tableau, il est possible de configurer la propriété explode.array du JSONFilter à true.

Publions donc une nouvelle configuration en exécutant la commande suivante:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"tracks-filepulse-json-03",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ParseJSON.explode.array":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "tasks.max": 1
    }'

Créons un fichier contenant deux objets JSON :

$ cat <<EOF > tracks.json
[
  {
    "track": {
      "title": "Star Wars (Main Theme)",
      "artist":  "John Williams, London Symphony Orchestra",
      "album": "Star Wars",
      "duration": "10:52"
    }
  },
  {
    "track": {
      "title":  "Duel of the Fates",
      "artist": "John Williams, London Symphony Orchestra",
      "album": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)",
      "duration": "4:14"
    }
  }
]
EOF

Puis, copions-le dans le conteneur Docker comme précédemment.

$ docker cp tracks.json \
connect://tmp/kafka-connect/examples/tracks-00.json

Ensuite, il ne nous reste plus qu’à consommer le topic de sortie tracks-filepulse-json-02 :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-03 \
        -C -J -q -o0 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}
{
  "track": {
    "Track": {
      "title": {
        "string": "Duel of the Fates"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
      },
      "duration": {
        "string": "4:14"
      }
    }
  }
}

Et voilà! Vous savez maintenant comment traiter les fichiers JSON avec Kafka Connect.

Conclusion

Nous avons vu dans cet article qu’il est relativement facile de charger un fichier JSON dans Kafka sans écrire une seule ligne de code en utilisant le framework Kafka Connect. De plus, le connecteur Connect File Pulse est une solution complète qui permet de manipuler facilement vos données avant de les envoyer dans Apache Kafka.

N’hésitez pas à partager cet article. Si ce projet vous plaît, vous pouvez également nous soutenir en ajoutant une ⭐ au repositoy GitHub pour nous soutenir.

The Author's Avatar
écrit par

Florian est Co-fondateur et CEO de StreamThoughts. Passionné par les systèmes distribués, il se spécialise depuis les technologies d’event-streaming comme Apache Kafka. Aujourd’hui, il accompagne les entreprises dans leur transition vers les architectures orientées streaming d’événements. Florian a été nommé Confluent Community Catalyst pour les années 2019/2021. Il est contributeur sur le projet Apache Kafka Streams et fait partie des organisateurs du Paris Apache Kafka Meetup.