{"id":5228,"date":"2024-05-13T22:29:01","date_gmt":"2024-05-13T22:29:01","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5228"},"modified":"2024-05-13T22:29:02","modified_gmt":"2024-05-13T22:29:02","slug":"processing-apache-avro-serialized-messages-from-kafka-using-ibm-app-connect-enterprise","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5228","title":{"rendered":"Processing Apache Avro-serialized messages from Kafka using IBM App Connect Enterprise"},"content":{"rendered":"<p><strong><a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=app-connect-enterprise-software\">IBM App Connect Enterprise<\/a> (ACE) is a broker for developing and hosting high-throughput, high-scale integrations between a large number of applications and systems, including Apache Kafka.<\/strong><\/p>\n<p><strong>In this post, I&#8217;ll describe how to use App Connect Enterprise to process Kafka messages that were serialized using Apache Avro schemas.<\/strong><\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/docs\/msgflow-screenshot.png?raw=true&#038;updated\"\/><\/p>\n<p><em>This is an update to an <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=4518\">earlier version of this post<\/a>, reflecting updates to the sample code.<\/em><\/p>\n<h3>Background<\/h3>\n<p>Best practice when using <a href=\"https:\/\/kafka.apache.org\">Apache Kafka<\/a> is to define <a href=\"https:\/\/avro.apache.org\">Apache Avro<\/a> schemas with a definition of the structure of your Kafka messages, and to store those schemas in a central registry that client applications can access at runtime.<\/p>\n<p>If you want to use <a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=app-connect-enterprise-software\">IBM App Connect Enterprise<\/a> to develop and host integrations for processing those Kafka messages, you need App Connect to know how 
to:<\/p>\n<ul>\n<li>retrieve the Avro schemas it needs using schema registry REST APIs<\/li>\n<li>use the schemas to turn the binary stream of bytes on your Kafka topics into structured objects that ACE can manipulate and process<\/li>\n<\/ul>\n<p><!--more--><\/p>\n<h3>Using a Java compute node<\/h3>\n<p>The simplest way to do this is to use a <a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=nodes-javacompute-node\">Java Compute node<\/a> that serves the same purpose as a SerDe client in a traditional Java Kafka application.<\/p>\n<p>In this post, I&#8217;ll share an example implementation that demonstrates how this can be done.<\/p>\n<p>It is <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\">available on GitHub<\/a>. Instructions for how to use the sample are in the <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/README.md\">README.md<\/a> in that repository. I&#8217;ll use this post to give a high-level overview and share examples of how to configure it to match your Kafka applications.<\/p>\n<p><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/docs\/annotated-msgflow-screenshot.png\"><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/docs\/annotated-msgflow-screenshot.png?raw=true\"\/><\/a><\/p>\n<h4>KafkaConsumer node<\/h4>\n<p>Configure this node to start consuming messages from your Kafka topics. 
It will emit <a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=parsers-blob-parser-domain\">BLOB binary messages<\/a> each containing the raw stream of bytes from a Kafka message.<\/p>\n<p><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-int-global-topic.png?raw=true\"><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-int-global-topic.png?raw=true\"\/><\/a><br \/>\n<small>Screenshot from <a href=\"https:\/\/ibm.github.io\/event-streams\/\">IBM Event Streams<\/a> but the examples in this post could be used with Avro-serialized messages on any Kafka cluster<\/small><\/p>\n<h4>Avro Deserialize Java Compute node<\/h4>\n<p>This node will:<\/p>\n<ul>\n<li>identify the ID for the schema that was used to serialize that specific message (either <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/AvroDeserialize.java#L386-L390\">from the message headers<\/a> or the <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/AvroDeserialize.java#L440-L444\">start of the message payload<\/a>)<\/li>\n<li>use the ID to <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/AvroDeserialize.java#L473-L499\">download the Avro schema<\/a> from a schema registry<\/li>\n<li>use the downloaded Avro schema to <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/AvroDeserialize.java#L326-L332\">deserialize the binary message<\/a> data<\/li>\n<li><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/AvroDeserialize.java#L334-L341\">emit the deserialized message data<\/a> as a 
structured <a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=parsers-json-parser-domain\">JSON object<\/a><\/li>\n<\/ul>\n<p>Similar to how most SerDe clients work, performance is improved by <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/AvroDeserialize.java#L494-L498\">caching and reusing Avro schemas where possible<\/a>.<\/p>\n<p>If the Compute node cannot deserialize the message data, it <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/AvroDeserialize.java#L351-L357\">emits the raw Kafka message as-is to the <em>alt<\/em> output terminal<\/a>, so it can be sent to a dead-letter queue for review or alternative processing. Possible reasons why this might happen include:<\/p>\n<ul>\n<li>The schema that was originally used to serialize the message data has since been deleted from the Schema Registry and is no longer available<\/li>\n<li>The schema registry is not currently available<\/li>\n<li>Schema registry credentials provided to the node were rejected<\/li>\n<\/ul>\n<h3>Examples of using the Java Compute node<\/h3>\n<h4>Setting up<\/h4>\n<p>To illustrate the way this can be used, I started a variety of Kafka Connect Connectors. Each was configured slightly differently.<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/connectors.png?raw=true\"\/><br \/><small>Screenshot from my OpenShift Console, showing the Connectors I used for this post. The definitions are <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/demos\/kafka-setup\/connectors.yaml\">available on GitHub<\/a>, but I&#8217;ll copy the relevant config below.<\/small><\/p>\n<p>These all produce the same type of randomly generated events, each to a different topic. 
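<\/p>\n<p>All of the connectors used the Apicurio Avro converter for the message values. As a rough sketch of the converter settings they shared (the converter class name is from the Apicurio registry libraries, and the registry URL here is a placeholder for your own):<\/p>\n<pre class=\"acelisting\">value.converter: io.apicurio.registry.utils.converter.AvroConverter\nvalue.converter.apicurio.registry.url: http:\/\/my-registry:8080\/apis\/registry\/v2<\/pre>\n<p>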
This gave me a variety of topics, all using slightly different approaches to serializing the message data and different mechanisms for recording the schema that was used.<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/topics.png?raw=true\"\/><br \/><small>Screenshot from the IBM Event Streams UI showing the topics I used for this post. The definitions are <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/demos\/kafka-setup\/topics.yaml\">available on GitHub<\/a>, but nothing in the config is relevant here other than the topic name.<\/small><\/p>\n<p>All of the messages used the same type of schema, but a separate schema was registered for each topic, allowing me to illustrate how the Java Compute node can look up and retrieve different schemas when the schema identity is encoded in different ways.<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/all-schemas.png?raw=true\"\/><\/p>\n<h4>The demo data<\/h4>\n<p>All of the messages in the following examples have data in this sort of shape:<\/p>\n<pre class=\"acelisting\">{\n    \"id\": \"f08c2354-4a1d-479b-a1c8-a1166e948393\",\n    \"customer\": {\n        \"id\": \"18237e95-8e55-4893-9382-7821056e4e49\",\n        \"name\": \"Britteny Ledner\",\n        \"emails\": [\n            \"britteny.ledner@example.com\"\n        ]\n    },\n    \"products\": [\n        \"L Classic Low-rise Jeans\",\n        \"M Black Bootcut Jeans\"\n    ],\n    \"address\": {\n        \"shippingaddress\": {\n            \"number\": 69,\n            \"street\": \"Desmond Course\",\n            \"city\": \"Jasttown\",\n            \"zipcode\": \"92337\",\n            \"country\": {\n                \"code\": \"US\",\n                
\"name\": \"United States\"\n            },\n            \"phones\": null\n        },\n        \"billingaddress\": {\n            \"number\": 571,\n            \"street\": \"Schumm Loop\",\n            \"city\": \"Port Hansshire\",\n            \"zipcode\": \"99742-5419\",\n            \"country\": {\n                \"code\": \"US\",\n                \"name\": \"United States\"\n            },\n            \"phones\": null\n        }\n    },\n    \"ordertime\": \"2024-05-12T14:28:36.344Z\"\n}<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/demos\/example-message\/online-order.json\">demos\/example-message\/online-order.json<\/a><\/small><\/p>\n<p>Obviously, this is JSON and not Avro. The actual messages look more like this, but that&#8217;s harder to visualise!<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/example-message\/online-order-avro.png?raw=true\"\/><\/p>\n<h4>The demo processing<\/h4>\n<p>To illustrate the way that App Connect Enterprise can process this binary data once it is able to deserialize it, I created this flow:<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/demo-message-flow.png?raw=true\"\/><\/p>\n<p>The <strong>avro deserialize<\/strong> node is a Java Compute node using the <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/AvroDeserialize.java\"><code>AvroDeserialize.java<\/code><\/a> source in Github.<\/p>\n<p>The <strong>redact customer name<\/strong> node is a Compute node using this ESQL:<\/p>\n<pre class=\"acelisting\">SET OutputRoot.JSON.Data.customer.name = 'redacted';<\/pre>\n<p><small><a 
href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/redaction.esql#L7\">demos\/ace-setup\/redaction.esql#L7<\/a><\/small><\/p>\n<p>Showing that we can replace the customer name is enough to illustrate that the flow has been able to successfully parse the binary data and apply the Avro schema correctly.<\/p>\n<h4>The demo output<\/h4>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/demo-output.png\"\/><\/p>\n<p>My flow outputs the parsed and modified messages in the JSON output format to <code>\/tmp\/ace\/output<\/code>. I&#8217;ll include screenshots of this file to show the results of processing.<\/p>\n<h3>Configurations<\/h3>\n<p>For each of these examples, I&#8217;ll start with a different way to configure a Kafka producer, and then show how to configure the Avro Deserialize node to process the resulting messages.<\/p>\n<hr \/>\n<h4>Storing the content ID of the schema as a Long at the start of a binary-encoded message payload<\/h4>\n<p>The Converter was configured to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.use-id: contentId\nvalue.converter.apicurio.registry.headers.enabled: false\nvalue.converter.apicurio.registry.avro.encoding: BINARY<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L89-L96\">demos\/kafka-setup\/connectors.yaml#L89-L96<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-long-content-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node 
using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>apicurio<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.use.id&gt;<strong>contentId<\/strong>&lt;\/schema.registry.use.id&gt;\n&lt;schema.registry.id.length&gt;<strong>8<\/strong>&lt;\/schema.registry.id.length&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/schemaregistry-policies\/online-payload-long-content.policy.xml#L5-L7\">demos\/ace-setup\/schemaregistry-policies\/online-payload-long-content.policy.xml#L5-L7<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-long-content-output.png?raw=true\"\/><\/div>\n<hr \/>\n<h4>Storing the content ID of the schema as an Integer at the start of a binary-encoded message payload<\/h4>\n<p>The Converter was configured to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.as-confluent: true\nvalue.converter.apicurio.registry.use-id: contentId\nvalue.converter.apicurio.registry.headers.enabled: false\nvalue.converter.apicurio.registry.avro.encoding: BINARY<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L40-L48\">demos\/kafka-setup\/connectors.yaml#L40-L48<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" 
src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-int-content-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>apicurio<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.use.id&gt;<strong>contentId<\/strong>&lt;\/schema.registry.use.id&gt;\n&lt;schema.registry.id.length&gt;<strong>4<\/strong>&lt;\/schema.registry.id.length&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/demos\/ace-setup\/schemaregistry-policies\/online-payload-int-content.policy.xml#L5-L7\">demos\/ace-setup\/schemaregistry-policies\/online-payload-int-content.policy.xml#L5-L7<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-int-content-output.png?raw=true\"\/><\/div>\n<hr \/>\n<h4>Storing the global ID of the schema as a Long at the start of a binary-encoded message payload<\/h4>\n<p>The Converter was configured to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.use-id: globalId\nvalue.converter.apicurio.registry.headers.enabled: false\nvalue.converter.apicurio.registry.avro.encoding: BINARY<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L186-L193\">demos\/kafka-setup\/connectors.yaml#L186-L193<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" 
src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-long-global-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>apicurio<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.use.id&gt;<strong>globalId<\/strong>&lt;\/schema.registry.use.id&gt;\n&lt;schema.registry.id.length&gt;<strong>8<\/strong>&lt;\/schema.registry.id.length&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/schemaregistry-policies\/online-payload-long-global.policy.xml#L5-L7\">demos\/ace-setup\/schemaregistry-policies\/online-payload-long-global.policy.xml#L5-L7<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-long-global-output.png?raw=true\"\/><\/div>\n<hr \/>\n<h4>Storing the global ID of the schema as an Integer at the start of a binary-encoded message payload<\/h4>\n<p>The Converter was configured to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.as-confluent: true\nvalue.converter.apicurio.registry.use-id: globalId\nvalue.converter.apicurio.registry.headers.enabled: false\nvalue.converter.apicurio.registry.avro.encoding: BINARY<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L137-L145\">demos\/kafka-setup\/connectors.yaml#L137-L145<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img 
decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-int-global-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>apicurio<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.use.id&gt;<strong>globalId<\/strong>&lt;\/schema.registry.use.id&gt;\n&lt;schema.registry.id.length&gt;<strong>4<\/strong>&lt;\/schema.registry.id.length&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/schemaregistry-policies\/online-payload-int-global.policy.xml#L5-L7\">demos\/ace-setup\/schemaregistry-policies\/online-payload-int-global.policy.xml#L5-L7<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-payload-int-global-output.png?raw=true\"\/><\/div>\n<hr \/>\n<h4>Storing the content ID of the schema in the header of a binary-encoded message<\/h4>\n<p>The Converter was configured to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.use-id: contentId\nvalue.converter.apicurio.registry.headers.enabled: true\nvalue.converter.apicurio.registry.avro.encoding: BINARY<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L234-L241\">demos\/kafka-setup\/connectors.yaml#L234-L241<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img decoding=\"async\" 
style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-header-content-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>apicurio<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.use.id&gt;<strong>contentId<\/strong>&lt;\/schema.registry.use.id&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/schemaregistry-policies\/online-header-content.policy.xml#L5-L6\">demos\/ace-setup\/schemaregistry-policies\/online-header-content.policy.xml#L5-L6<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-header-content-output.png?raw=true\"\/><\/div>\n<hr \/>\n<h4>Storing the global ID of the schema in the header of a binary-encoded message<\/h4>\n<p>The Converter was configured to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.use-id: globalId\nvalue.converter.apicurio.registry.headers.enabled: true\nvalue.converter.apicurio.registry.avro.encoding: BINARY<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L282-L289\">demos\/kafka-setup\/connectors.yaml#L282-L289<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" 
src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-header-global-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>apicurio<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.use.id&gt;<strong>globalId<\/strong>&lt;\/schema.registry.use.id&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/schemaregistry-policies\/online-header-global.policy.xml#L5-L6\">demos\/ace-setup\/schemaregistry-policies\/online-header-global.policy.xml#L5-L6<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-header-global-output.png?raw=true\"\/><\/div>\n<hr \/>\n<h4>Storing the content ID of the schema in the header of a JSON-encoded message<\/h4>\n<p>The Converter was configured to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.use-id: contentId\nvalue.converter.apicurio.registry.headers.enabled: true\nvalue.converter.apicurio.registry.avro.encoding: JSON<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L331-L338\">demos\/kafka-setup\/connectors.yaml#L331-L338<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" 
src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-header-json-content-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>apicurio<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.use.id&gt;<strong>contentId<\/strong>&lt;\/schema.registry.use.id&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/schemaregistry-policies\/online-header-json-content.policy.xml#L5-L6\">demos\/ace-setup\/schemaregistry-policies\/online-header-json-content.policy.xml#L5-L6<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-header-json-content-output.png?raw=true\"\/><\/div>\n<hr \/>\n<h4>Storing the global ID of the schema in the header of a JSON-encoded message<\/h4>\n<p>The Converter was configured to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.use-id: globalId\nvalue.converter.apicurio.registry.headers.enabled: true\nvalue.converter.apicurio.registry.avro.encoding: JSON<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L380-L387\">demos\/kafka-setup\/connectors.yaml#L380-L387<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" 
src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-header-json-global-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>apicurio<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.use.id&gt;<strong>globalId<\/strong>&lt;\/schema.registry.use.id&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/schemaregistry-policies\/online-header-json-global.policy.xml#L5-L6\">demos\/ace-setup\/schemaregistry-policies\/online-header-json-global.policy.xml#L5-L6<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-header-json-global-output.png?raw=true\"\/><\/div>\n<hr \/>\n<h4>Storing the schema ID in a schema registry with a Confluent-compatible REST API<\/h4>\n<p>Confluent SerDe libraries store the schema ID as an Integer at the start of a message payload. 
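<\/p>\n<p>For reference, that wire format puts a one-byte marker in front of the schema ID, so the start of each message payload looks like this:<\/p>\n<pre class=\"acelisting\">byte  0     : magic byte (0x00)\nbytes 1-4   : schema ID, as a four-byte big-endian integer\nbytes 5...  : Avro binary-encoded message data<\/pre>\n<p>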
I recreated this behaviour by configuring a Converter to produce messages like this:<\/p>\n<pre class=\"acelisting\">value.converter.apicurio.registry.as-confluent: true\nvalue.converter.apicurio.registry.use-id: contentId\nvalue.converter.apicurio.registry.headers.enabled: false\nvalue.converter.apicurio.registry.avro.encoding: BINARY<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/kafka-setup\/connectors.yaml#L40-L48\">demos\/kafka-setup\/connectors.yaml#L40-L48<\/a><\/small><\/p>\n<p>This results in Kafka messages like this:<\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-confluent-topic.png?raw=true\"\/><\/p>\n<p>I configured the Avro Deserialize node using:<\/p>\n<pre class=\"acelisting\">&lt;schema.registry.api&gt;<strong>confluent<\/strong>&lt;\/schema.registry.api&gt;\n&lt;schema.registry.id.length&gt;<strong>4<\/strong>&lt;\/schema.registry.id.length&gt;<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/80017ea4b28b3ac3ec99769180b136ba3213cf4b\/demos\/ace-setup\/schemaregistry-policies\/online-confluent.policy.xml#L4-L6\">demos\/ace-setup\/schemaregistry-policies\/online-confluent.policy.xml#L4-L6<\/a><\/small><\/p>\n<p>This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:<\/p>\n<div style=\"max-height: 280px; overflow-y: scroll; background-color: black;\"><img decoding=\"async\" style=\"max-width: 450px;\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/demos\/screenshots\/online-confluent-output.png?raw=true\"\/><\/div>\n<hr \/>\n<p>If you&#8217;d like to recreate this demo, you can find <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/tree\/master\/demos\">everything I 
used to set this up in Github<\/a>, including how I <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/tree\/master\/demos\/kafka-setup\">set up Kafka<\/a> and <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/tree\/master\/demos\/ace-setup\">App Connect<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>IBM App Connect Enterprise (ACE) is a broker for developing and hosting high-throughput, high-scale integrations between a large number of applications and systems, including Apache Kafka. In this post, I&#8217;ll describe how to use App Connect Enterprise to process Kafka messages that were serialized using Apache Avro schemas. This is an update to an earlier [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":5235,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[595,593,604,594,583,584],"class_list":["post-5228","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apacheavro","tag-apachekafka","tag-appconnect","tag-avro","tag-ibmeventstreams","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5228","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5228"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5228\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/5235"}],"wp:att
achment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5228"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5228"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5228"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}