{"id":5857,"date":"2026-02-18T18:28:10","date_gmt":"2026-02-18T18:28:10","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5857"},"modified":"2026-03-14T21:02:17","modified_gmt":"2026-03-14T21:02:17","slug":"processing-json-with-kafka-connect","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5857","title":{"rendered":"Processing JSON with Kafka Connect"},"content":{"rendered":"<p><strong>In this post, I&#8217;ll share examples of how to process JSON data in a Kafka Connect pipeline, and explain the schema format that Kafka uses to describe JSON events.<\/strong>\u00a0<\/p>\n<h3>Using sink connectors<\/h3>\n<p>Kafka Connect sink connectors let you send the events on your Kafka topics\u00a0to external systems.\u00a0I&#8217;ve <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5052\">talked about this before<\/a>, but to recap, the structure looks a bit like this:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-02-18-json\/01-connect.png\" style=\"width: 100%; max-width: 650px; border: thin #ccc solid;\"\/><\/p>\n<p>Imagine that you have this JSON event on a Kafka topic.\u00a0<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 4px;\">{\n    \"id\": 12345678,\n    \"message\": \"Hello World\",\n    \"isDemo\": true\n}<\/pre>\n<p>How should you configure Kafka Connect to send that somewhere?\u00a0<\/p>\n<p>It depends&#8230;<\/p>\n<p><!--more--><\/p>\n<h3>Unstructured sink connectors<\/h3>\n<p>If you&#8217;re sending the event to a sink that doesn&#8217;t require structure, you could simply use <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">StringConverter<\/code>. 
The JSON event won&#8217;t be parsed or interpreted; it will just be sent as a string.\u00a0<\/p>\n<p>The simplest example of this is the demo file connector <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">FileStreamSinkConnector<\/code>.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-02-18-json\/02-file-sink.png?raw=true\" style=\"width: 100%; max-width: 650px; border: thin #ccc solid;\"\/><\/p>\n<p>With config like this, the connector can receive each new event from the Kafka topic and\u00a0append it to a text file as a new (string) line.\u00a0<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 4px;\">value.converter=org.apache.kafka.connect.storage.StringConverter\nvalue.converter.schemas.enable=false\n\nconnector.class=FileStreamSink\nfile=mydemofile.jsonl<\/pre>\n<h3>Structured sink connectors<\/h3>\n<p>What if you&#8217;re sending the event to a sink that does require structured data?<\/p>\n<p>An example of this would be a database connector that inserts events into a PostgreSQL database.\u00a0<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-02-18-json\/03-db-sink.png?raw=true\" style=\"width: 100%; max-width: 650px; border: thin #ccc solid;\"\/><\/p>\n<p>If you&#8217;re sending the event to a sink that requires structure, you can use <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">JsonConverter<\/code>.\u00a0<\/p>\n<p>There are other things that schemas can help with. For example, even if you&#8217;re using Kafka Connect with a sink that accepts unstructured data, if you want to use any transforms to process the events first, you&#8217;ll likely need to parse the raw JSON. 
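For instance, a transform that pulls a single field out of each event can only work on a parsed, structured value. A hypothetical config sketch using the built-in ExtractField SMT (the transform alias getmsg is my own naming):

```
transforms=getmsg
transforms.getmsg.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.getmsg.field=message
```

With StringConverter, the value reaches this transform as one opaque string, so there is no "message" field for it to extract.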
Single Message Transforms are less useful when they only have raw string payloads to work with.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-02-18-json\/04-smt.png?raw=true\" style=\"width: 100%; max-width: 650px; border: thin #ccc solid;\"\/><\/p>\n<h4>Introducing schemas for JSON events<\/h4>\n<p><strong>But<\/strong> to help with either of these, you need a way to tell JsonConverter about the structure of the data.\u00a0<\/p>\n<p>You might think that you could infer the structure from the event data, but that would just be guessing. If the structure is significant to the sink that receives the data, guesses are not enough &#8211; you won&#8217;t know about any missing optional values that have defaults, you won&#8217;t be able to distinguish between ambiguous numerical types, and so on. A schema is needed to fill in these gaps.\u00a0<\/p>\n<p>Kafka Connect uses its own JSON schema format, which is <strong>different<\/strong> from <a href=\"https:\/\/json-schema.org\/\">the IETF standard<\/a> we normally mean when we say &#8220;JSON schema&#8221;.\u00a0<\/p>\n<h3>Demo<\/h3>\n<p>I&#8217;ll start with a simple example, and then step back and fill in some details.<\/p>\n<p>For this demo, I&#8217;ll use <a href=\"https:\/\/github.com\/Aiven-Open\/jdbc-connector-for-apache-kafka\">a database sink connector<\/a> as an example of the sort of connector where data structure is essential.\u00a0<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-02-18-json\/03-db-sink.png?raw=true\" style=\"width: 100%; max-width: 650px; border: thin #ccc solid;\"\/><\/p>\n<p>Imagine that you have this JSON event on a Kafka topic that the connector should insert into a PostgreSQL database.\u00a0<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 350px;\">{\n    \"id\": \"00000000-0000-0000-0000-000000000000\",\n\n    
\"example_integer_1\": 0,\n    \"example_integer_2\": 0,\n    \"example_integer_3\": 0,\n    \"example_integer_4\": 0,\n\n    \"example_decimal_1\": 0.0,\n    \"example_decimal_2\": 0.0,\n\n    \"example_boolflag\": false,\n\n    \"example_bytes\": \"\",\n\n    \"example_array_ints\": [ ],\n\n    \"example_temporal_1\": 0,\n    \"example_temporal_2\": 0,\n    \"example_temporal_3\": 0,\n\n    \"example_optional_1\": \"\",\n    \"example_optional_2\": \"\"\n}<\/pre>\n<p>A schema can describe each of the properties:<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 350px;\">{\n    \"type\": \"struct\",\n    \"fields\": [\n        {\n            \"field\": \"id\",\n            \"type\": \"string\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_integer_1\",\n            \"type\": \"int8\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_integer_2\",\n            \"type\": \"int16\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_integer_3\",\n            \"type\": \"int32\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_integer_4\",\n            \"type\": \"int64\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_decimal_1\",\n            \"type\": \"float\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_decimal_2\",\n            \"type\": \"double\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_boolflag\",\n            \"type\": \"boolean\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_bytes\",\n            \"type\": \"bytes\",\n            \"optional\": false\n        },\n        {\n            \"field\": 
\"example_array_ints\",\n            \"type\": \"array\",\n            \"items\": {\n                \"type\": \"int32\",\n                \"optional\": false\n            }\n        },\n        {\n            \"field\": \"example_temporal_1\",\n            \"type\": \"int32\",\n            \"name\": \"org.apache.kafka.connect.data.Date\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_temporal_2\",\n            \"type\": \"int32\",\n            \"name\": \"org.apache.kafka.connect.data.Time\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_temporal_3\",\n            \"type\": \"int64\",\n            \"name\": \"org.apache.kafka.connect.data.Timestamp\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"example_optional_1\",\n            \"type\": \"string\",\n            \"optional\": true\n        },\n        {\n            \"field\": \"example_optional_2\",\n            \"type\": \"string\",\n            \"optional\": true,\n            \"default\": \"default_value_if_not_provided\"\n        }\n    ]\n}<\/pre>\n<p>You can set up your sink connector like this, using the <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">JsonConverter<\/code> and the location of the schema.json file.<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 150px;\">value.converter=org.apache.kafka.connect.json.JsonConverter\nvalue.converter.schemas.enable=true\nvalue.converter.schema.content=${dirProvider:\/Users\/dalelane\/demos\/kafka-connect-json:schema.json}\n\nconnector.class=io.aiven.connect.jdbc.JdbcSinkConnector\n\n#\n# database connector config - not specific to the JSON converter\nconnection.url=jdbc:postgresql:\/\/localhost:5432\/dalelane\nconnection.user=dalelane\n# connection.password - my local dev database doesn't need 
a password, but your database probably does\ntable.name.format=public.demotable\ninsert.mode=upsert\nauto.create=true\npk.mode=record_value\npk.fields=id<\/pre>\n<p>Start up Connect, and you&#8217;ll see your PostgreSQL database has a new table called demotable.<\/p>\n<p>The JsonConverter has given the sink connector enough detail about the events on the topic for it to create a\u00a0table with the appropriate columns.<\/p>\n<p>Notice how the different column types are used, in a way that would be impossible to do from the event JSON alone.\u00a0<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 350px;\">dalelane=# \\d demotable\n                                         Table \"public.demotable\"\n       Column       |            Type             | Collation | Nullable |                Default\n--------------------+-----------------------------+-----------+----------+---------------------------------------\n id                 | text                        |           | not null |\n example_integer_1  | smallint                    |           | not null |\n example_integer_2  | smallint                    |           | not null |\n example_integer_3  | integer                     |           | not null |\n example_integer_4  | bigint                      |           | not null |\n example_decimal_1  | real                        |           | not null |\n example_decimal_2  | double precision            |           | not null |\n example_boolflag   | boolean                     |           | not null |\n example_bytes      | bytea                       |           | not null |\n example_array_ints | integer[]                   |           | not null |\n example_temporal_1 | date                        |           | not null |\n example_temporal_2 | time without time zone      |           | not null |\n example_temporal_3 | timestamp without time zone |           | 
not null |\n example_optional_1 | text                        |           |          |\n example_optional_2 | text                        |           |          | 'default_value_if_not_provided'::text\nIndexes:\n    \"demotable_pkey\" PRIMARY KEY, btree (id)\n<\/pre>\n<p>And a row representing the JSON event from the Kafka topic is there:<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 350px;\">dalelane=# select id, example_integer_1, example_integer_2, example_integer_3, example_integer_4 from demotable;\n                  id                  | example_integer_1 | example_integer_2 | example_integer_3 | example_integer_4\n--------------------------------------+-------------------+-------------------+-------------------+--------------------\n 00000000-0000-0000-0000-000000000000 |                 0 |                 0 |                 0 |                  0\n(1 row)\n\ndalelane=# select id, example_decimal_1, example_decimal_2 from demotable;\n                  id                  | example_decimal_1 | example_decimal_2\n--------------------------------------+-------------------+-------------------\n 00000000-0000-0000-0000-000000000000 |                 0 |                 0\n(1 row)\n\ndalelane=# select id, example_boolflag, example_bytes, example_array_ints from demotable;\n                  id                  | example_boolflag |    example_bytes     |  example_array_ints\n--------------------------------------+------------------+----------------------+-----------------------\n 00000000-0000-0000-0000-000000000000 | f                | \\x                   | {}\n(1 row)\n\ndalelane=# select id, example_temporal_1, example_temporal_2, example_temporal_3 from demotable;\n                  id                  | example_temporal_1 | example_temporal_2 | 
example_temporal_3\n--------------------------------------+--------------------+--------------------+---------------------\n 00000000-0000-0000-0000-000000000000 | 1970-01-01         | 00:00:00           | 1970-01-01 00:00:00\n(1 row)\n\ndalelane=# select id, example_optional_1, example_optional_2 from demotable;\n                  id                  | example_optional_1 |      example_optional_2\n--------------------------------------+--------------------+-------------------------------\n 00000000-0000-0000-0000-000000000000 |                    | default_value_if_not_provided\n(1 row)\n<\/pre>\n<p>To show another example, imagine you produce this second JSON event to the Kafka topic:<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 350px;\">{\n    \"example_decimal_1\": 3.1415927,\n    \"example_decimal_2\": 3.141592653589793,\n\n    \"example_boolflag\": true,\n\n    \"example_array_ints\": [ 2, 3, 5, 7, 11, 13, 17, 19 ],\n\n    \"example_integer_4\": 473892472947371246,\n    \"example_integer_3\": 1194727881,\n    \"example_integer_2\": 20472,\n    \"example_integer_1\": 12,\n\n    \"example_bytes\": \"ZGFsZS1sYW5l\",\n\n    \"example_temporal_1\": 3776,\n    \"example_temporal_3\": 1777885200000,\n    \"example_temporal_2\": 43200000,\n\n    \"id\": \"9D51DCBE-E2F1-47DA-B341-DAC8369C8451\"\n}<\/pre>\n<p>The connector will insert this new event data into the DB table:<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 350px;\">dalelane=# select id, example_integer_1, example_integer_2, example_integer_3, example_integer_4 from demotable;\n                  id                  | example_integer_1 | example_integer_2 | example_integer_3 | 
example_integer_4\n--------------------------------------+-------------------+-------------------+-------------------+--------------------\n 00000000-0000-0000-0000-000000000000 |                 0 |                 0 |                 0 |                  0\n 9D51DCBE-E2F1-47DA-B341-DAC8369C8451 |                12 |             20472 |        1194727881 | 473892472947371246\n(2 rows)\n\ndalelane=# select id, example_decimal_1, example_decimal_2 from demotable;\n                  id                  | example_decimal_1 | example_decimal_2\n--------------------------------------+-------------------+-------------------\n 00000000-0000-0000-0000-000000000000 |                 0 |                 0\n 9D51DCBE-E2F1-47DA-B341-DAC8369C8451 |         3.1415927 | 3.141592653589793\n(2 rows)\n\ndalelane=# select id, example_boolflag, example_bytes, example_array_ints from demotable;\n                  id                  | example_boolflag |    example_bytes     |  example_array_ints\n--------------------------------------+------------------+----------------------+-----------------------\n 00000000-0000-0000-0000-000000000000 | f                | \\x                   | {}\n 9D51DCBE-E2F1-47DA-B341-DAC8369C8451 | t                | \\x64616c652d6c616e65 | {2,3,5,7,11,13,17,19}\n(2 rows)\n\ndalelane=# select id, example_temporal_1, example_temporal_2, example_temporal_3 from demotable;\n                  id                  | example_temporal_1 | example_temporal_2 | example_temporal_3\n--------------------------------------+--------------------+--------------------+---------------------\n 00000000-0000-0000-0000-000000000000 | 1970-01-01         | 00:00:00           | 1970-01-01 00:00:00\n 9D51DCBE-E2F1-47DA-B341-DAC8369C8451 | 1980-05-04         | 12:00:00           | 2026-05-04 09:00:00\n(2 rows)\n\ndalelane=# select id, example_optional_1, example_optional_2 from demotable;\n                  id                  | example_optional_1 |      
example_optional_2\n--------------------------------------+--------------------+-------------------------------\n 00000000-0000-0000-0000-000000000000 |                    | default_value_if_not_provided\n 9D51DCBE-E2F1-47DA-B341-DAC8369C8451 |                    | default_value_if_not_provided\n(2 rows)\n<\/pre>\n<p>Notice some of the benefits the schema brought, even beyond getting this to work at all.<\/p>\n<p>For example, the database engine added the correct default value for the example_optional_2 property that was missing from the Kafka event &#8211; as defined in the schema.\u00a0<\/p>\n<p>And notice that even though the ordering of the properties in the JSON payload was different in this second event, the correct values were inserted into the correct columns &#8211; because this is a structured event payload, not a single string.<\/p>\n<h3>Writing a schema for JsonConverter<\/h3>\n<p>The example schema above is probably enough to give you an idea, as I intentionally included a lot of different types. 
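One part of the demo worth decoding is the temporal fields: the raw integers in the second event became the dates and times shown in the table. A rough Python sketch of the conversions implied by the three temporal logical types (assuming the values are interpreted as UTC; the function names are mine, not part of any library):

```python
from datetime import date, datetime, time, timedelta, timezone

EPOCH = date(1970, 1, 1)

def connect_date(days: int) -> date:
    """org.apache.kafka.connect.data.Date: days since the Unix epoch"""
    return EPOCH + timedelta(days=days)

def connect_time(millis: int) -> time:
    """org.apache.kafka.connect.data.Time: milliseconds since midnight"""
    return (datetime(1970, 1, 1) + timedelta(milliseconds=millis)).time()

def connect_timestamp(millis: int) -> datetime:
    """org.apache.kafka.connect.data.Timestamp: milliseconds since the Unix epoch"""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)

print(connect_date(3776))                # 1980-05-04
print(connect_time(43200000))            # 12:00:00
print(connect_timestamp(1777885200000))  # 2026-05-04 09:00:00+00:00
```

These match the rows the connector inserted above.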
I&#8217;ll summarise the basics below.<\/p>\n<p>Kafka Connect supports these primitive types:<\/p>\n<ul>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">int8<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">int16<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">int32<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">int64<\/code> &#8211; signed integers of various sizes<\/li>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">float<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">double<\/code> &#8211; floating-point numbers<\/li>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">boolean<\/code> &#8211; true \/ false<\/li>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">string<\/code> &#8211; text data<\/li>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">bytes<\/code> &#8211; binary data<\/li>\n<\/ul>\n<p>(<em>I&#8217;ve never found canonical documentation for this, so I <a href=\"https:\/\/github.com\/apache\/kafka\/blob\/trunk\/connect\/json\/src\/main\/java\/org\/apache\/kafka\/connect\/json\/JsonSchema.java#L39-L59\">refer to the code<\/a> when I need to remind myself of these type names.<\/em>)<\/p>\n<p>There are also logical types (which are primitive types with defined semantic meaning):<\/p>\n<ul>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">Decimal<\/code> &#8211; arbitrary precision decimals (<a 
href=\"https:\/\/github.com\/apache\/kafka\/blob\/5f59eac11be1d8201795695f41c8e18983be1952\/connect\/api\/src\/main\/java\/org\/apache\/kafka\/connect\/data\/Decimal.java#L26-L30\">details<\/a>)<\/li>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">Date<\/code> &#8211; days since Unix epoch (<a href=\"https:\/\/github.com\/apache\/kafka\/blob\/5f59eac11be1d8201795695f41c8e18983be1952\/connect\/api\/src\/main\/java\/org\/apache\/kafka\/connect\/data\/Date.java#L26-L29\">details<\/a>)<\/li>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">Time<\/code> &#8211; milliseconds since midnight (<a href=\"https:\/\/github.com\/apache\/kafka\/blob\/5f59eac11be1d8201795695f41c8e18983be1952\/connect\/api\/src\/main\/java\/org\/apache\/kafka\/connect\/data\/Time.java#L26-L29\">details<\/a>)<\/li>\n<li><code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">Timestamp<\/code> &#8211; milliseconds since Unix epoch (<a href=\"https:\/\/github.com\/apache\/kafka\/blob\/5f59eac11be1d8201795695f41c8e18983be1952\/connect\/api\/src\/main\/java\/org\/apache\/kafka\/connect\/data\/Timestamp.java#L23-L24\">details<\/a>)<\/li>\n<\/ul>\n<p>You can see examples of how to use them in context of a schema in my example schema above.<\/p>\n<p>(<em>Again, I&#8217;ve never found canonical documentation for this, but the class comments in the implementations have a good description of how to use each type. 
I&#8217;ve included links to the code where each comment lives in my list above.<\/em>)<\/p>\n<p>A complex object is represented as a <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">struct<\/code>, which is a collection of named fields, with support for optional\/required fields and default values.<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 350px;\">{\n    \"type\": \"struct\",\n    \"fields\": [\n        {\n            \"field\": \"id\",\n            \"type\": \"int32\",\n            \"optional\": false\n        },\n        {\n            \"field\": \"name\",\n            \"type\": \"string\",\n            \"optional\": true,\n            \"default\": \"Adam\"\n        }\n    ],\n    \"optional\": false\n}<\/pre>\n<p>Arrays look like this &#8211; which can be arrays of primitives, or arrays of structs:<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 350px;\">{\n    \"type\": \"struct\",\n    \"fields\": [\n        {\n            \"field\": \"simple\",\n            \"type\": \"array\",\n            \"items\": {\n                \"type\": \"string\"\n            }\n        },\n        {\n            \"field\": \"complex\",\n            \"type\": \"array\",\n            \"items\": {\n                \"type\": \"struct\",\n                \"fields\": [\n                    {\n                        \"field\": \"id\",\n                        \"type\": \"int32\",\n                        \"optional\": false\n                    },\n                    {\n                        \"field\": \"name\",\n                        \"type\": \"string\",\n                        \"optional\": true\n                    }\n                ]\n            }\n        }\n    ]\n}<\/pre>\n<p>That would describe events such 
as:<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 200px;\">{\n    \"simple\": [\n        \"apple\",\n        \"banana\"\n    ],\n    \"complex\": [\n        {\n            \"id\": 123\n        },\n        {\n            \"id\": 456,\n            \"name\": \"Bob\"\n        }\n    ]\n}<\/pre>\n<p>Maps in Connect perhaps don&#8217;t translate as obviously to JSON data, as Connect schemas allow you to use arbitrary complex types for both the key and value.<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 200px;\">{\n    \"type\": \"struct\",\n    \"fields\": [\n        {\n            \"field\": \"my_map\",\n            \"type\": \"map\",\n            \"keys\": {\n                \"type\": \"string\",\n                \"optional\": false\n            },\n            \"values\": {\n                \"type\": \"int32\",\n                \"optional\": false\n            }\n        }\n    ]\n}<\/pre>\n<p>That would describe events such as:<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 200px;\">{\n    \"my_map\": {\n        \"something\": 123,\n        \"something_else\": 456\n    }\n}<\/pre>\n<p>(<em>I&#8217;ve never found a sensible reason to use anything other than strings for map keys. YMMV.<\/em>)<\/p>\n<h3>Comparing JsonConverter schemas with JSON Schema<\/h3>\n<p>I mentioned above that Kafka&#8217;s schema format is different to <a href=\"https:\/\/json-schema.org\/\">JSON Schema<\/a> (the IETF specification). This often trips people up, so it&#8217;s perhaps worth a few comments on how they compare.<\/p>\n<p>If you&#8217;re used to JSON Schema, you might notice in my description above that there are a few things you can&#8217;t easily do. 
I won&#8217;t try and write an exhaustive comparison, but the most obvious <strong>things that you can&#8217;t express in Kafka that I would do in JSON Schema<\/strong> are:<\/p>\n<p><strong>Validation<\/strong><br \/>\nYou can&#8217;t specify patterns, minimum \/ maximum values, or length constraints, and there are no format specifications for strings such as email addresses, IP addresses, or URIs.<br \/>\n(<em>You could write your own logical types, similar to the built-in ones I link to above, if you wanted, but I think it&#8217;s debatable whether that&#8217;s the most effective way to enforce data validation if that is your goal.<\/em>)<\/p>\n<p><strong>Control over arbitrary additional fields<\/strong><br \/>\nThere isn&#8217;t an equivalent of JSON Schema&#8217;s <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">additionalProperties<\/code>, so you can&#8217;t specify whether additional fields not defined in the schema are allowed. Additional properties will not cause errors but are always ignored by JsonConverter.<\/p>\n<p><strong>Multiple types<\/strong><br \/>\nJSON Schema lets you union types (e.g. 
<code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">\"type\": [\"string\", \"number\"]<\/code> to say that a value can be either a string or a number) &#8211; JSON Converter doesn&#8217;t let you describe that in a schema.<\/p>\n<p><strong>Logic<\/strong><br \/>\nYou can&#8217;t do things like <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">allOf<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">anyOf<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">oneOf<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">not<\/code>, or any of the fun conditional keywords (<code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">if<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">then<\/code>, <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">else<\/code>, etc.) that JSON Schema has.<\/p>\n<p>Your opinion of these omissions probably depends on how often you use them in JSON Schema. I think the general point is that Kafka&#8217;s schema format is focused on enabling serialization \/ deserialization, rather than trying to enable data validation.<\/p>\n<p>If you&#8217;re going in the reverse direction, there are a few <strong>details you could have in a Kafka schema used with JSON Converter that would be lost if you create an equivalent JSON Schema<\/strong>.<\/p>\n<p>The main one is <strong>numeric precision<\/strong>. JSON Converter distinguishes between int8, int16, int32, int64, float and double. JSON Schema has <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">integer<\/code> and <code style=\"background-color: #FFFFC0; color: #770000; padding: 2px; font-weight: 600;\">number<\/code>. 
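As a sketch of one workaround (a hypothetical helper, not part of any library): if the JSON Schema carries explicit bounds, you could pick the narrowest Connect integer type that covers them.

```python
# Hypothetical helper: map a bounded JSON Schema integer range to the
# narrowest Kafka Connect integer type that can hold every value in it.
def connect_int_type(minimum: int, maximum: int) -> str:
    for name, bits in (("int8", 8), ("int16", 16), ("int32", 32), ("int64", 64)):
        lo, hi = -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
        if lo <= minimum and maximum <= hi:
            return name
    raise ValueError("range does not fit any Connect integer type")

print(connect_int_type(-128, 127))   # int8
print(connect_int_type(0, 255))      # int16 (255 overflows int8)
print(connect_int_type(0, 2**40))    # int64
```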
You can maybe mitigate this by using min\/max ranges and inferring an appropriate type from that, but you&#8217;ll likely lose some precision here.<\/p>\n<p>The way Kafka models a Map as something different from a regular struct (object) doesn&#8217;t have an obvious equivalent in JSON Schema, where everything is just an object.<\/p>\n<p>These differences again highlight the greater focus that Kafka schemas place on serialization \/ deserialization.<\/p>\n<h3>Versions of Kafka Connect that support this<\/h3>\n<p>Support for putting the schema in an external file was introduced in <a href=\"https:\/\/archive.apache.org\/dist\/kafka\/4.2.0\/RELEASE_NOTES.html\">Kafka Connect 4.2<\/a>, which we contributed in <a href=\"https:\/\/cwiki.apache.org\/confluence\/display\/KAFKA\/KIP-1054:+Support+external+schemas+in+JSONConverter\">KIP 1054<\/a>.<\/p>\n<p>Versions of Kafka Connect prior to 4.2 need the schema to be included within every event payload. For example:<\/p>\n<pre style=\"font-size: 1em; white-space: pre !important; overflow: scroll; background-color: #FFFFC0; color: #770000; padding: 4px; max-height: 200px;\">{\n    \"schema\": {\n        \"type\": \"struct\",\n        \"fields\": [\n            {\n                \"field\": \"id\",\n                \"type\": \"string\",\n                \"optional\": false\n            },\n            {\n                \"field\": \"name\",\n                \"type\": \"string\",\n                \"optional\": false\n            }\n        ]\n    },\n    \"payload\": {\n        \"id\": \"myid\",\n        \"name\": \"Bob\"\n    }\n}<\/pre>\n<p>That&#8217;s a little icky.<\/p>\n<p>There are hacky workarounds you can use to avoid having to do this, but it&#8217;s a huge improvement to have support for schema files now available out-of-the-box in JsonConverter.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this post, I&#8217;ll share examples of how to process JSON data in a Kafka Connect pipeline, and explain the schema 
format that Kafka uses to describe JSON events.\u00a0 Using sink connectors Kafka Connect sink connectors let you send the events on your Kafka topics\u00a0to external systems.\u00a0I&#8217;ve talked about this before, but to recap the [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":5858,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,584],"class_list":["post-5857","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5857","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5857"}],"version-history":[{"count":5,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5857\/revisions"}],"predecessor-version":[{"id":5864,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5857\/revisions\/5864"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/5858"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5857"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5857"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5857"}],"curies":[{"name":"wp","href":"h
ttps:\/\/api.w.org\/{rel}","templated":true}]}}