{"id":4518,"date":"2021-10-25T21:41:31","date_gmt":"2021-10-25T21:41:31","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=4518"},"modified":"2024-05-13T22:45:01","modified_gmt":"2024-05-13T22:45:01","slug":"processing-apache-avro-serialized-kafka-messages-with-ibm-app-connect-enterprise","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=4518","title":{"rendered":"Processing Apache Avro-serialized Kafka messages with IBM App Connect Enterprise"},"content":{"rendered":"<p><strong><em>UPDATE: Please see the <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5228\">updated version of this post<\/a><\/em><\/strong><\/p>\n<p><strong><a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=app-connect-enterprise-software\">IBM App Connect Enterprise<\/a> (ACE) is a broker for developing and hosting high-throughput, high-scale integrations between a large number of applications and systems, including Apache Kafka.<\/strong><\/p>\n<p><strong>In this post, I&#8217;ll describe how to use App Connect Enterprise to process Kafka messages that were serialized to a stream of bytes using Apache Avro schemas.<\/strong><\/p>\n<p><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/docs\/msgflow-screenshot.png\"\/><\/p>\n<h3>Background<\/h3>\n<p>Best practice when using <a href=\"https:\/\/kafka.apache.org\">Apache Kafka<\/a> is to define <a href=\"https:\/\/avro.apache.org\">Apache Avro<\/a> schemas with a definition of the structure of your Kafka messages.<\/p>\n<p>(For more detail about this, see my last post on <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=4506\">From bytes to objects: describing Kafka events<\/a>, or the <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=3781\">intro to Avro<\/a> that I wrote a couple of years ago.)<\/p>\n<p>In this post, I&#8217;m assuming that you have embraced Avro, and you have Kafka topics with messages that were 
serialized using Avro schemas.<\/p>\n<p>Perhaps you used a Java producer with an <a href=\"https:\/\/www.apicur.io\/registry\/docs\/apicurio-registry\/1.3.3.Final\/getting-started\/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry\">Avro SerDe<\/a> that handled the serialization automatically for you.<\/p>\n<p>Or your messages are coming from a Kafka Connect source connector, with an <a href=\"https:\/\/www.apicur.io\/registry\/docs\/apicurio-registry\/1.3.3.Final\/getting-started\/assembly-using-kafka-client-serdes.html#registry-serdes-config-producer-registry\">Avro converter<\/a> that is handling the serialization for you.<\/p>\n<p>Or you are doing the serialization yourself, such as if you&#8217;re <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=3892\">producing Avro-serialized messages from a Python app<\/a>.<\/p>\n<p>Now you want to use <a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=app-connect-enterprise-software\">IBM App Connect Enterprise<\/a> to develop and host integrations for processing those Kafka messages. 
But you need App Connect to know how to:<\/p>\n<ul>\n<li>retrieve the Avro schemas it needs<\/li>\n<li>use the schemas to turn the binary stream of bytes on your Kafka topics into structured objects that are easy for ACE to manipulate and process<\/li>\n<\/ul>\n<p><!--more--><\/p>\n<h3>Using a Java compute node<\/h3>\n<p>The simplest way to do this is to use a <a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=nodes-javacompute-node\">JavaCompute node<\/a> that serves the same purpose as a SerDe client in a traditional Java Kafka application.<\/p>\n<p>I&#8217;ve written a sample to demonstrate how this could be done, which is <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\">available on GitHub<\/a>.<\/p>\n<p>Instructions for using the sample are in the <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/README.md\">README.md<\/a> in that repository, but I&#8217;ll add some additional background and context here.<\/p>\n<p><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/docs\/annotated-msgflow-screenshot.png\"><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/docs\/annotated-msgflow-screenshot.png\"\/><\/a><\/p>\n<h4>KafkaConsumer node<\/h4>\n<p>Configure this node to start consuming messages from your Kafka topics. 
It will emit <a href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=parsers-blob-parser-domain\">BLOB binary messages<\/a>, each containing the raw stream of bytes from a Kafka message.<\/p>\n<p>For example, it could emit binary data such as this (which was created by a <a href=\"https:\/\/debezium.io\/documentation\/reference\/connectors\/postgresql.html\">PostgreSQL Kafka Connect connector from Debezium<\/a> using an Avro converter).<\/p>\n<p><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/docs\/es-screenshot-binary.png\"><img decoding=\"async\" style=\"border: thin black solid\" alt=\"screenshot\" src=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/raw\/master\/docs\/es-screenshot-binary.png\"\/><\/a><br \/>\n<small>Screenshot from <a href=\"https:\/\/ibm.github.io\/event-streams\/\">IBM Event Streams<\/a>, but the principle holds for any Avro-serialized messages on any Kafka cluster<\/small><\/p>\n<h4>Avro deserialize JavaCompute node<\/h4>\n<p>This node will:<\/p>\n<ul>\n<li><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/094f414ea9388e9e04c61ea81d108751f78a771b\/AvroDeserialize.java#L298-L324\">identify the ID for the schema<\/a> that was used to serialize that specific message<\/li>\n<li>use the ID to <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/094f414ea9388e9e04c61ea81d108751f78a771b\/AvroDeserialize.java#L340-L373\">download the Avro schema<\/a> from a schema registry<\/li>\n<li>use the downloaded Avro schema to <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/094f414ea9388e9e04c61ea81d108751f78a771b\/AvroDeserialize.java#L226-L233\">deserialize the binary message<\/a> data<\/li>\n<li><a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/094f414ea9388e9e04c61ea81d108751f78a771b\/AvroDeserialize.java#L235-L246\">emit the deserialized message data<\/a> as a structured <a 
href=\"https:\/\/www.ibm.com\/docs\/en\/app-connect\/12.0?topic=parsers-json-parser-domain\">JSON object<\/a><\/li>\n<\/ul>\n<p>Similar to how most SerDe clients work, performance is improved by caching and reusing Avro schemas where appropriate.<\/p>\n<p>If the node cannot deserialize the message data, it <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/094f414ea9388e9e04c61ea81d108751f78a771b\/AvroDeserialize.java#L252-L258\">emits the raw Kafka message as-is to the <em>alt<\/em> output terminal<\/a>, so they can be reviewed or otherwise processed. Possible reasons why this might happen include:<\/p>\n<ul>\n<li>The schema that was originally used to serialize the message data has since been deleted from the Schema Registry and is no longer available<\/li>\n<li>The schema registry is not currently available<\/li>\n<li>Schema registry credentials provided to the node were rejected<\/li>\n<\/ul>\n<h3>More details<\/h3>\n<p>I&#8217;ve put a lot more comments in the <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/AvroDeserialize.java\">sample code<\/a> that explains what the deserialize node needs to do, and how you can adapt it for your own requirements.<\/p>\n<p>I&#8217;ve put some of the most obvious aspects (like whether to <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/094f414ea9388e9e04c61ea81d108751f78a771b\/AvroDeserialize.java#L439-L446\">use a binary or json decoder<\/a>, based on how the original messages were encoded) in a <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/master\/sample-policy.policyxml\">configuration policy<\/a>.<\/p>\n<p>Other customizations, (such as supporting <a href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/094f414ea9388e9e04c61ea81d108751f78a771b\/AvroDeserialize.java#L109-L124\">different schema ID handlers<\/a>, or working with <a 
href=\"https:\/\/github.com\/dalelane\/ibm-ace-avrodeserialize\/blob\/094f414ea9388e9e04c61ea81d108751f78a771b\/AvroDeserialize.java#L340-L398\">alternate schema registry implementations<\/a>) will need some minor code changes &#8211; but I&#8217;ve put comments and placeholders in the sample to show where to start!<\/p>\n","protected":false},"excerpt":{"rendered":"<p>UPDATE: Please see the updated version of this post IBM App Connect Enterprise (ACE) is a broker for developing and hosting high-throughput, high-scale integrations between a large number of applications and systems, including Apache Kafka. In this post, I&#8217;ll describe how to use App Connect Enterprise to process Kafka messages that were serialized to a [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":4526,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[595,593,594,584],"class_list":["post-4518","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apacheavro","tag-apachekafka","tag-avro","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/4518","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=4518"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/4518\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/4526"}],"wp:attachment":[{"href":"https:
\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=4518"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=4518"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=4518"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}