{"id":5035,"date":"2023-12-11T21:55:04","date_gmt":"2023-12-11T21:55:04","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5035"},"modified":"2023-12-11T21:55:06","modified_gmt":"2023-12-11T21:55:06","slug":"processing-xml-with-kafka-connect","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5035","title":{"rendered":"Processing XML with Kafka Connect"},"content":{"rendered":"<p><strong>In this tutorial, I&#8217;ll share examples of how to process XML data at various points in a Kafka Connect pipeline, using a <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\">new plugin from IBM Event Streams<\/a>.<\/strong><\/p>\n<p>You can assemble a Kafka Connect pipeline in a huge number of ways, so I&#8217;m not going to attempt an exhaustive list here. Instead, I&#8217;ve come up with eight examples that are illustrative of the sort of use cases you can satisfy.<\/p>\n<p>I&#8217;ll summarise and link to my different examples here, so you can jump straight to the one that sounds the closest to your use case:<\/p>\n<ul>\n<li><a style=\"font-size: 1.1em; font-weight: bold; \" href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#i-need-connectors-to-put-data-on-my-kafka-topics-in-xml-format\">&quot;I need Connectors to put data on my Kafka topics in XML format&quot;<\/a>\n<ul>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#example-produce-events-from-your-source-connectors-in-xml-format\">Produce events from your source connectors in XML format<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#produce-events-from-your-source-connectors-in-xml-and-generate-an-xsd-schema-to-go-with-them\">Produce events from your source connectors in XML, and generate an XSD schema to go with them<\/a><\/li>\n<\/ul>\n<\/li>\n<li style=\"padding-top: 1em;\"><a style=\"font-size: 1.1em; font-weight: bold; \" 
href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#i-have-xml-data-in-external-systems-that-i-need-to-bring-into-kafka-in-a-kafka-friendly-format\">&quot;I have XML data in external systems that I need to bring into Kafka in a Kafka-friendly format&quot;<\/a>\n<ul>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#transfer-xml-messages-from-mq-queues-into-kafka-topics-as-json-messages\">Transfer XML messages from MQ queues into Kafka topics as JSON messages<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#transfer-xml-messages-from-mq-queues-into-kafka-topics-as-avro-messages\">Transfer XML messages from MQ queues into Kafka topics as Avro messages<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#capture-xml-events-from-a-web-service-and-bring-them-into-kafka-topics-as-json-messages\">Capture XML events from a web service and bring them into Kafka topics as JSON messages<\/a><\/li>\n<\/ul>\n<\/li>\n<li style=\"padding-top: 1em;\"><a style=\"font-size: 1.1em; font-weight: bold; \" href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#i-have-kafka-messages-in-a-kafka-friendly-format-that-i-need-to-send-to-external-systems-as-xml\">&quot;I have Kafka messages in a Kafka-friendly format that I need to send to external systems as XML&quot;<\/a>\n<ul>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#transfer-json-messages-from-your-kafka-topics-to-mq-queues-as-xml\">Transfer JSON messages from your Kafka topics to MQ queues as XML documents<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5035#submit-json-messages-from-your-kafka-topics-to-an-xml-web-service-using-http-posts\">Submit JSON messages from your Kafka topics to an XML web service using HTTP posts<\/a><\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<p><!--more--><\/p>\n<h2 id=\"background\">Background<\/h2>\n<p>I&#8217;ll start with a recap of the components that make up a Kafka Connect pipeline. 
This will make it easier for me to refer to different pieces as I go through the examples. If you already know how Connect works, you can skip this bit.<\/p>\n<h3 id=\"source-pipeline\">Source pipeline<\/h3>\n<p>This is what a Connect source pipeline does:<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/source-pipeline.png?raw=true\" alt=\"diagram of a Kafka Connect source pipeline\"\/><\/p>\n<ol>\n<li>The <strong><a href=\"https:\/\/kafka.apache.org\/36\/javadoc\/org\/apache\/kafka\/connect\/source\/SourceConnector.html\"><code class=\"codeterm\">Connector<\/code><\/a><\/strong> connects to some external system, retrieves some data, and passes it on. <br \/><em>Ideally, this is in a structured form: such as Connect&#8217;s native structured form (<a href=\"https:\/\/kafka.apache.org\/36\/javadoc\/org\/apache\/kafka\/connect\/data\/Struct.html\"><code class=\"codeterm\">Struct<\/code><\/a>), or a Java structure (such as a <a href=\"https:\/\/docs.oracle.com\/en\/java\/javase\/11\/docs\/api\/java.base\/java\/util\/Map.html\">Map<\/a>).<\/em><\/li>\n<li>Optionally, a series of <strong><a href=\"https:\/\/kafka.apache.org\/36\/javadoc\/org\/apache\/kafka\/connect\/transforms\/Transformation.html\"><code class=\"codeterm\">Transformation<\/code><\/a><\/strong> plugins perform operations, such as filters and transforms, on the Connect objects.<br \/><em>Transformations that operate on Connect structs are de-coupled from Connectors because a Connect Struct is a generic structure, agnostic of any individual Connector.<\/em><\/li>\n<li>A <strong><a href=\"https:\/\/kafka.apache.org\/36\/javadoc\/org\/apache\/kafka\/connect\/storage\/Converter.html\"><code class=\"codeterm\">Converter<\/code><\/a><\/strong> converts the Connect objects to a serialized format for producing to Kafka topics.<br \/><em>For example, this could be Avro, JSON, Protobuf, 
etc.<\/em><\/li>\n<\/ol>\n<h3 id=\"sink-pipeline\">Sink pipeline<\/h3>\n<p>This is what a Connect sink pipeline does:<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/sink-pipeline.png?raw=true\" alt=\"diagram of a Kafka Connect sink pipeline\"\/><\/p>\n<ol>\n<li>The bytes of a Kafka message are given to a <strong><a href=\"https:\/\/kafka.apache.org\/36\/javadoc\/org\/apache\/kafka\/connect\/storage\/Converter.html\"><code class=\"codeterm\">Converter<\/code><\/a><\/strong> to be deserialized into Connect Structs.<\/li>\n<li>Optionally, a series of <strong><a href=\"https:\/\/kafka.apache.org\/36\/javadoc\/org\/apache\/kafka\/connect\/transforms\/Transformation.html\"><code class=\"codeterm\">Transformation<\/code><\/a><\/strong> plugins perform operations, such as filters and transforms, on the Connect objects.<br \/><em>Transformations that operate on Connect structs are de-coupled from Converters because a Connect Struct is a generic structure, agnostic of any individual Converter.<\/em><\/li>\n<li>A <strong><a href=\"https:\/\/kafka.apache.org\/36\/javadoc\/org\/apache\/kafka\/connect\/sink\/SinkConnector.html\"><code class=\"codeterm\">Connector<\/code><\/a><\/strong> submits the Connect structs to some external system.<\/li>\n<\/ol>\n<h2 id=\"examples\">Examples<\/h2>\n<p>The following examples illustrate the different places to process XML data in a Kafka Connect pipeline. 
I&#8217;ll link to more detailed configuration, and include the XML-specific bits in the post here.<\/p>\n<h3 class=\"usecasesection\" id=\"i-need-connectors-to-put-data-on-my-kafka-topics-in-xml-format\">&quot;I need Connectors to put data on my Kafka topics in XML format&quot;<\/h3>\n<h4 class=\"examplesection\" id=\"example-produce-events-from-your-source-connectors-in-xml-format\">Example: Produce events from your source connectors in XML format<\/h4>\n<p>I&#8217;ll start with <a href=\"https:\/\/github.com\/IBM\/kafka-connect-loosehangerjeans-source\"><code class=\"codeterm\">DatagenSourceConnector<\/code><\/a> &#8211; a datagen connector I wrote for the <a href=\"https:\/\/ibm.github.io\/event-automation\/tutorials\/\">Event Processing tutorials<\/a>.<\/p>\n<p>I&#8217;ve been using it with a <code class=\"codeterm\">JsonConverter<\/code> so far:<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/converter-json.png?raw=true\" alt=\"diagram of the datagen connector with a JSON converter\"\/><\/p>\n<ol>\n<li><code class=\"codeterm\">DatagenSourceConnector<\/code> <br \/>Generates events about a fictional clothes retailer<\/li>\n<li><code class=\"codeterm\">JsonConverter<\/code> <br \/>Serializes the clothing retailer events into JSON strings<\/li>\n<\/ol>\n<p>My full config for how I&#8217;ve been running it is <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/datagen-json.yaml\">here<\/a>, but the interesting bit looks like:<\/p>\n<pre class=\"listing\"><code>value.converter=org.apache.kafka.connect.json.JsonConverter\nvalue.converter.schemas.enable=false\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/datagen-json.yaml\">datagen-json.yaml<\/a><\/p>\n<p>Using a <code 
class=\"codeterm\">JsonConverter<\/code> like this produces events such as:<\/p>\n<pre class=\"listing output\"><code>{\n    &quot;id&quot;: &quot;68579620-fccc-4acb-bfcc-3686919346aa&quot;,\n    &quot;customer&quot;: &quot;Taren Reichert&quot;,\n    &quot;customerid&quot;: &quot;ef8db9c1-c685-4600-be30-bbd1fa2f7efd&quot;,\n    &quot;description&quot;: &quot;XS Retro Boyfriend Jeans&quot;,\n    &quot;price&quot;: 16.41,\n    &quot;quantity&quot;: 6,\n    &quot;region&quot;: &quot;APAC&quot;,\n    &quot;ordertime&quot;: &quot;2023-10-29 13:20:17.389&quot;\n}\n<\/code><\/pre>\n<p>Replacing <code class=\"codeterm\">JsonConverter<\/code> with <code class=\"codeterm\">XmlConverter<\/code> lets me produce these events in XML instead.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/converter-datagen.png?raw=true\" alt=\"diagram of the datagen connector with an XML converter\"\/><\/p>\n<ol>\n<li><code class=\"codeterm\">DatagenSourceConnector<\/code> <br \/>Generates events about a fictional clothes retailer<\/li>\n<li><code class=\"codeterm\">XmlConverter<\/code> <br \/>Serializes the clothing retailer events into XML strings<\/li>\n<\/ol>\n<p>The full <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/datagen-xml.yaml\">config for doing this is here<\/a> but the relevant bit is:<\/p>\n<pre class=\"listing\"><code>value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter\nvalue.converter.schemas.enable=false\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/datagen-xml.yaml\">datagen-xml.yaml<\/a><\/p>\n<p>Using an <code class=\"codeterm\">XmlConverter<\/code>, with the same connector, produces events like:<\/p>\n<pre class=\"listing output\"><code>&lt;?xml version=&quot;1.0&quot; 
encoding=&quot;UTF-8&quot; standalone=&quot;no&quot;?&gt;\n&lt;order&gt;\n    &lt;id&gt;68579620-fccc-4acb-bfcc-3686919346aa&lt;\/id&gt;\n    &lt;customer&gt;Taren Reichert&lt;\/customer&gt;\n    &lt;customerid&gt;ef8db9c1-c685-4600-be30-bbd1fa2f7efd&lt;\/customerid&gt;\n    &lt;description&gt;XS Retro Boyfriend Jeans&lt;\/description&gt;\n    &lt;price&gt;16.41&lt;\/price&gt;\n    &lt;quantity&gt;6&lt;\/quantity&gt;\n    &lt;region&gt;APAC&lt;\/region&gt;\n    &lt;ordertime&gt;2023-10-29 13:20:17.389&lt;\/ordertime&gt;\n&lt;\/order&gt;\n<\/code><\/pre>\n<p>Here is another example of using <code class=\"codeterm\">XmlConverter<\/code> in a source pipeline.<\/p>\n<p><a href=\"https:\/\/github.com\/dalelane\/kafka-connect-stockprice-source\"><code class=\"codeterm\">StockPriceSourceConnector<\/code><\/a> emits stock price updates. <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=4463\">When I&#8217;ve used it<\/a> with a <code class=\"codeterm\">JsonConverter<\/code> in the past, it produced events like this:<\/p>\n<pre class=\"listing output\"><code>{\n    &quot;open&quot;      : 1.23,\n    &quot;high&quot;      : 1.23,\n    &quot;low&quot;       : 1.22,\n    &quot;close&quot;     : 1.23,\n    &quot;timestamp&quot; : 1655757900,\n    &quot;datetime&quot;  : &quot;2022-06-20 20:45:00&quot;\n}\n<\/code><\/pre>\n<p>Replacing <code class=\"codeterm\">JsonConverter<\/code> with <code class=\"codeterm\">XmlConverter<\/code> lets me produce these stock price events in XML instead.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/converter-stockprices.png?raw=true\" alt=\"diagram of the stock prices connector with an XML converter\"\/><\/p>\n<ol>\n<li><code class=\"codeterm\">StockPriceSourceConnector<\/code> <br \/>Emits stock price update events<\/li>\n<li><code class=\"codeterm\">XmlConverter<\/code> <br \/>Serializes the stock price events into XML 
strings<\/li>\n<\/ol>\n<p>The interesting bit of <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/stockprices-xml.yaml\">the config for this<\/a> is:<\/p>\n<pre class=\"listing\"><code>value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter\nvalue.converter.schemas.enable=false\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/stockprices-xml.yaml\">stockprices-xml.yaml<\/a><\/p>\n<p>That produces events like:<\/p>\n<pre class=\"listing output\"><code class=\"language-xml\">&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot; standalone=&quot;no&quot;?&gt;\n&lt;stockdata&gt;\n    &lt;open&gt;160.65&lt;\/open&gt;\n    &lt;high&gt;160.75&lt;\/high&gt;\n    &lt;low&gt;160.65&lt;\/low&gt;\n    &lt;close&gt;160.75&lt;\/close&gt;\n    &lt;volume&gt;10&lt;\/volume&gt;\n    &lt;timestamp&gt;1701478140&lt;\/timestamp&gt;\n    &lt;datetime&gt;2023-12-01 19:49:00&lt;\/datetime&gt;\n&lt;\/stockdata&gt;\n<\/code><\/pre>\n<h4 class=\"examplesection\" id=\"produce-events-from-your-source-connectors-in-xml-and-generate-an-xsd-schema-to-go-with-them\">Produce events from your source connectors in XML, and generate an XSD schema to go with them<\/h4>\n<p>When Connect records include information about the structure of the events, a Converter can also be configured to output these as schemas. 
The type of schema will vary depending on the type of Converter.<\/p>\n<p>Returning to <a href=\"https:\/\/github.com\/IBM\/kafka-connect-loosehangerjeans-source\"><code class=\"codeterm\">DatagenSourceConnector<\/code><\/a>, and again using <code class=\"codeterm\">JsonConverter<\/code> as a comparison:<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/converter-json.png?raw=true\" alt=\"diagram of the datagen connector with a JSON converter\"\/><\/p>\n<ol>\n<li><code class=\"codeterm\">DatagenSourceConnector<\/code> <br \/>Generates events about a fictional clothes retailer<\/li>\n<li><code class=\"codeterm\">JsonConverter<\/code> <br \/>Serializes the clothing retailer events into JSON strings, with an embedded <a href=\"https:\/\/json-schema.org\">JSON Schema<\/a><\/li>\n<\/ol>\n<p>The relevant part of <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/datagen-json.yaml\">the config for this<\/a> looks like:<\/p>\n<pre class=\"listing\"><code>value.converter=org.apache.kafka.connect.json.JsonConverter\nvalue.converter.schemas.enable=true\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/datagen-json.yaml\">datagen-json.yaml<\/a><\/p>\n<p>Setting <code class=\"codeterm\">schemas.enable<\/code> to true with a <code class=\"codeterm\">JsonConverter<\/code> produces events like:<\/p>\n<pre class=\"listing output\"><code>{\n    &quot;schema&quot;: {\n        &quot;type&quot;: &quot;struct&quot;,\n        &quot;fields&quot;: [\n            { &quot;type&quot;: &quot;string&quot;, &quot;optional&quot;: false, &quot;field&quot;: &quot;id&quot; },\n            { &quot;type&quot;: &quot;string&quot;, &quot;optional&quot;: false, &quot;field&quot;: &quot;customer&quot; },\n            { &quot;type&quot;: 
&quot;string&quot;, &quot;optional&quot;: false, &quot;field&quot;: &quot;customerid&quot; },\n            { &quot;type&quot;: &quot;string&quot;, &quot;optional&quot;: false, &quot;field&quot;: &quot;description&quot; },\n            { &quot;type&quot;: &quot;double&quot;, &quot;optional&quot;: false, &quot;field&quot;: &quot;price&quot; },\n            { &quot;type&quot;: &quot;int32&quot;,  &quot;optional&quot;: false, &quot;field&quot;: &quot;quantity&quot; },\n            { &quot;type&quot;: &quot;string&quot;, &quot;optional&quot;: false, &quot;field&quot;: &quot;region&quot; },\n            { &quot;type&quot;: &quot;string&quot;, &quot;optional&quot;: false, &quot;field&quot;: &quot;ordertime&quot; }\n        ],\n        &quot;optional&quot;: false,\n        &quot;name&quot;: &quot;order&quot;\n    },\n    &quot;payload&quot;: {\n        &quot;id&quot;: &quot;55031d01-1bea-4f3b-9b29-12a968d8230e&quot;,\n        &quot;customer&quot;: &quot;Gabriel Stracke&quot;,\n        &quot;customerid&quot;: &quot;7a75009c-5781-4074-9bf0-20dee537dc9d&quot;,\n        &quot;description&quot;: &quot;XXL Stonewashed Capri Jeans&quot;,\n        &quot;price&quot;: 28.71,\n        &quot;quantity&quot;: 3,\n        &quot;region&quot;: &quot;SA&quot;,\n        &quot;ordertime&quot;: &quot;2023-10-29 13:13:20.675&quot;\n    }\n}\n<\/code><\/pre>\n<p>Replacing <code class=\"codeterm\">JsonConverter<\/code> with <code class=\"codeterm\">XmlConverter<\/code> lets me produce these events in XML instead.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/converter-datagen.png?raw=true\" alt=\"diagram of the datagen connector with an XML converter\"\/><\/p>\n<ol>\n<li><code class=\"codeterm\">DatagenSourceConnector<\/code> <br \/>Generates events about a fictional clothes retailer<\/li>\n<li><code class=\"codeterm\">XmlConverter<\/code> <br \/>Serializes the clothing retailer events 
into XML strings with an embedded XSD schema.<\/li>\n<\/ol>\n<p>The XML-specific bit of <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/datagen-xml.yaml\">the config for this<\/a> looks like:<\/p>\n<pre class=\"listing\"><code>value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter\nvalue.converter.schemas.enable=true\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/datagen-xml.yaml\">datagen-xml.yaml<\/a><\/p>\n<p>Setting <code class=\"codeterm\">schemas.enable<\/code> to true with an <code class=\"codeterm\">XmlConverter<\/code> produces events like:<\/p>\n<pre class=\"listing output\"><code>&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;\n&lt;order xmlns:xsi=&quot;http:\/\/www.w3.org\/2001\/XMLSchema-instance&quot; xsi:noNamespaceSchemaLocation=&quot;#connectSchema&quot;&gt;\n    &lt;xs:schema xmlns:xs=&quot;http:\/\/www.w3.org\/2001\/XMLSchema&quot; id=&quot;connectSchema&quot;&gt;\n        &lt;xs:element name=&quot;order&quot;&gt;\n            &lt;xs:complexType&gt;\n                &lt;xs:sequence&gt;\n                  &lt;xs:any maxOccurs=&quot;1&quot; minOccurs=&quot;0&quot; namespace=&quot;http:\/\/www.w3.org\/2001\/XMLSchema&quot; processContents=&quot;skip&quot;\/&gt;\n                  &lt;xs:element name=&quot;id&quot; type=&quot;xs:string&quot; \/&gt;\n                  &lt;xs:element name=&quot;customer&quot; type=&quot;xs:string&quot; \/&gt;\n                  &lt;xs:element name=&quot;customerid&quot; type=&quot;xs:string&quot; \/&gt;\n                  &lt;xs:element name=&quot;description&quot; type=&quot;xs:string&quot; \/&gt;\n                  &lt;xs:element name=&quot;price&quot; type=&quot;xs:double&quot; \/&gt;\n                  &lt;xs:element name=&quot;quantity&quot; type=&quot;xs:integer&quot; \/&gt;\n                  &lt;xs:element 
name=&quot;region&quot; type=&quot;xs:string&quot; \/&gt;\n                  &lt;xs:element name=&quot;ordertime&quot; type=&quot;xs:string&quot; \/&gt;\n                &lt;\/xs:sequence&gt;\n            &lt;\/xs:complexType&gt;\n        &lt;\/xs:element&gt;\n    &lt;\/xs:schema&gt;\n\n    &lt;id&gt;55031d01-1bea-4f3b-9b29-12a968d8230e&lt;\/id&gt;\n    &lt;customer&gt;Gabriel Stracke&lt;\/customer&gt;\n    &lt;customerid&gt;7a75009c-5781-4074-9bf0-20dee537dc9d&lt;\/customerid&gt;\n    &lt;description&gt;XXL Stonewashed Capri Jeans&lt;\/description&gt;\n    &lt;price&gt;28.71&lt;\/price&gt;\n    &lt;quantity&gt;3&lt;\/quantity&gt;\n    &lt;region&gt;SA&lt;\/region&gt;\n    &lt;ordertime&gt;2023-10-29 13:13:20.675&lt;\/ordertime&gt;\n&lt;\/order&gt;\n<\/code><\/pre>\n<p>Returning now to the <a href=\"https:\/\/github.com\/dalelane\/kafka-connect-stockprice-source\"><code class=\"codeterm\">StockPriceSourceConnector<\/code><\/a>, the <code class=\"codeterm\">XmlConverter<\/code> can also produce an embedded XSD schema for the stock price events.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/converter-stockprices.png?raw=true\" alt=\"diagram of the stock prices connector with an XML converter\"\/><\/p>\n<ol>\n<li><code class=\"codeterm\">StockPriceSourceConnector<\/code><br \/>Emits stock price update events<\/li>\n<li><code class=\"codeterm\">XmlConverter<\/code> <br \/>Serializes the stock price events into XML strings with an embedded XSD schema.<\/li>\n<\/ol>\n<p>The <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/stockprices-xml.yaml\">config for running this<\/a> looks like:<\/p>\n<pre class=\"listing\"><code>value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter\nvalue.converter.schemas.enable=true\n<\/code><\/pre>\n<p class=\"configlink\"><a 
href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/stockprices-xml.yaml\">stockprices-xml.yaml<\/a><\/p>\n<p>Setting <code class=\"codeterm\">schemas.enable<\/code> to true with an <code class=\"codeterm\">XmlConverter<\/code> produces stock price events like:<\/p>\n<pre class=\"listing output\"><code>&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;\n&lt;stockdata xmlns:xsi=&quot;http:\/\/www.w3.org\/2001\/XMLSchema-instance&quot; xsi:noNamespaceSchemaLocation=&quot;#connectSchema&quot;&gt;\n    &lt;xs:schema xmlns:xs=&quot;http:\/\/www.w3.org\/2001\/XMLSchema&quot; id=&quot;connectSchema&quot;&gt;\n        &lt;xs:element name=&quot;root&quot;&gt;\n            &lt;xs:complexType&gt;\n                &lt;xs:sequence&gt;\n                  &lt;xs:any maxOccurs=&quot;1&quot; minOccurs=&quot;0&quot; namespace=&quot;http:\/\/www.w3.org\/2001\/XMLSchema&quot; processContents=&quot;skip&quot;\/&gt;\n                  &lt;xs:element name=&quot;open&quot; type=&quot;xs:double&quot; \/&gt;\n                  &lt;xs:element name=&quot;high&quot; type=&quot;xs:double&quot; \/&gt;\n                  &lt;xs:element name=&quot;low&quot; type=&quot;xs:double&quot; \/&gt;\n                  &lt;xs:element name=&quot;close&quot; type=&quot;xs:double&quot; \/&gt;\n                  &lt;xs:element name=&quot;volume&quot; type=&quot;xs:integer&quot; \/&gt;\n                  &lt;xs:element name=&quot;timestamp&quot; type=&quot;xs:string&quot; \/&gt;\n                  &lt;xs:element name=&quot;datetime&quot; type=&quot;xs:string&quot; \/&gt;\n                &lt;\/xs:sequence&gt;\n            &lt;\/xs:complexType&gt;\n        &lt;\/xs:element&gt;\n    &lt;\/xs:schema&gt;\n\n    &lt;open&gt;142.3&lt;\/open&gt;\n    &lt;high&gt;142.3&lt;\/high&gt;\n    &lt;low&gt;142.3&lt;\/low&gt;\n    &lt;close&gt;142.3&lt;\/close&gt;\n    &lt;volume&gt;2&lt;\/volume&gt;\n    &lt;timestamp&gt;1698453420&lt;\/timestamp&gt;\n    
&lt;datetime&gt;2023-10-27 19:37:00&lt;\/datetime&gt;\n&lt;\/stockdata&gt;\n<\/code><\/pre>\n<h3 class=\"usecasesection\" id=\"i-have-xml-data-in-external-systems-that-i-need-to-bring-into-kafka-in-a-kafka-friendly-format\">&quot;I have XML data in external systems that I need to bring into Kafka in a Kafka-friendly<sup>&#8224;<\/sup> format&quot;<\/h3>\n<p>&#8224; &#8211; <em>By &quot;Kafka-friendly&quot;, I mean something like Avro or JSON &#8211; although Kafka doesn&#8217;t care about what the bytes on a topic represent, there are more Kafka tools and applications that know how to process these formats than XML.<\/em><\/p>\n<p>Messages on MQ queues and topics are a good example of this, as these are frequently in XML.<\/p>\n<h4 class=\"examplesection\" id=\"transfer-xml-messages-from-mq-queues-into-kafka-topics-as-json-messages\">Transfer XML messages from MQ queues into Kafka topics as JSON messages<\/h4>\n<p>I can set up a Kafka Connect pipeline to bring XML MQ messages onto my Kafka topics as JSON strings.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/recordbuilder-source-json.png?raw=true\" alt=\"diagram of the MQ Source connector paired with a JSON converter\"\/><\/p>\n<ol>\n<li>MQ queue with XML messages<\/li>\n<li><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-mq-source\"><code class=\"codeterm\">MQSourceConnector<\/code><\/a> paired with <code class=\"codeterm\">XmlMQRecordBuilder<\/code> <br \/>Parses the XML and outputs it as Connect structs<\/li>\n<li><code class=\"codeterm\">JsonConverter<\/code> <br \/>Serializes the MQ messages into JSON strings<\/li>\n<\/ol>\n<p>The <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/mq-xml-to-json.yaml\">config for this<\/a> is more complex, but the important bits are:<\/p>\n<pre 
class=\"listing\"><code>mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder\nmq.record.builder.root.element.name=doc\nvalue.converter=org.apache.kafka.connect.json.JsonConverter\nvalue.converter.schemas.enable=false\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/mq-xml-to-json.yaml\">mq-xml-to-json.yaml<\/a><\/p>\n<p>With this config, I can PUT an MQ message like this:<\/p>\n<pre class=\"listing output\"><code>&lt;doc&gt;\n    &lt;something&gt;\n        &lt;message&gt;Hello World&lt;\/message&gt;\n        &lt;counts&gt;\n            &lt;first&gt;100&lt;\/first&gt;\n            &lt;second&gt;200&lt;\/second&gt;\n        &lt;\/counts&gt;\n    &lt;\/something&gt;\n&lt;\/doc&gt;\n<\/code><\/pre>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/mq-1.png?raw=true\" alt=\"screenshot of the MQ web interface\"\/><\/p>\n<p>The Connect pipeline will produce this to the Kafka topic as a JSON message like this:<\/p>\n<pre class=\"listing output\"><code>{\n    &quot;something&quot;: {\n        &quot;message&quot;: &quot;Hello World&quot;,\n        &quot;counts&quot;: {\n            &quot;first&quot;: 100,\n            &quot;second&quot;: 200\n        }\n    }\n}\n<\/code><\/pre>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/eventstreams-1.png?raw=true\" alt=\"screenshot of the Event Streams web interface\"\/><\/p>\n<h4 class=\"examplesection\" id=\"transfer-xml-messages-from-mq-queues-into-kafka-topics-as-avro-messages\">Transfer XML messages from MQ queues into Kafka topics as Avro messages<\/h4>\n<p>Alternatively, I can configure a Kafka Connect pipeline to bring XML MQ messages onto my Kafka topics as Avro-encoded 
messages.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/recordbuilder-source-avro.png?raw=true\" alt=\"diagram of the MQ Source connector paired with an Avro converter\"\/><\/p>\n<ol>\n<li>MQ queue with XML messages<\/li>\n<li><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-mq-source\"><code class=\"codeterm\">MQSourceConnector<\/code><\/a> paired with <code class=\"codeterm\">XmlMQRecordBuilder<\/code><br \/>Parses the XML and outputs it as Connect structs<\/li>\n<li><code class=\"codeterm\">AvroConverter<\/code> <br \/>Serializes the MQ messages into binary-encoded Avro<\/li>\n<\/ol>\n<p>There are multiple AvroConverters to choose from, so I&#8217;ll show a couple of examples.<\/p>\n<p>If I&#8217;m using <br \/><strong><code class=\"codeterm\">io.apicurio.registry.utils.converter.AvroConverter<\/code><\/strong>, <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/mq-xml-to-avro-apicurio.yaml\">configured to use an Apicurio schema registry, the config<\/a> looks like:<\/p>\n<pre class=\"listing\"><code>mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder\nmq.record.builder.root.element.name=simple\nmq.record.builder.xsd.schema.path=\/opt\/kafka\/external-configuration\/xml-schemas\/simple.xsd\nvalue.converter=io.apicurio.registry.utils.converter.AvroConverter\nvalue.converter.schemas.enable=true\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/mq-xml-to-avro-apicurio.yaml\">mq-xml-to-avro-apicurio.yaml<\/a><\/p>\n<p>If I PUT a message like this on my MQ queue:<\/p>\n<pre class=\"listing output\"><code>&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;\n&lt;simple&gt;\n    &lt;message&gt;Hello World&lt;\/message&gt;\n    
&lt;count&gt;123&lt;\/count&gt;\n&lt;\/simple&gt;\n<\/code><\/pre>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/mq-2.png?raw=true\" alt=\"screenshot of the MQ web interface\"\/><\/p>\n<p>The Connect pipeline will produce this to the Kafka topic as an Avro message like this:<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/eventstreams-2.png?raw=true\" alt=\"screenshot of the Event Streams web interface\"\/><\/p>\n<p>And the AvroConverter will register an Avro schema, derived from the XSD schema, in the schema registry:<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/eventstreams-2-schema.png?raw=true\" alt=\"screenshot of the schema registry\"\/><\/p>\n<p>Alternatively, if I&#8217;m using <br \/><strong><code class=\"codeterm\">io.confluent.connect.avro.AvroConverter<\/code><\/strong> (still configured to use an Apicurio schema registry), <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/mq-xml-to-avro-confluent.yaml\">the config<\/a> looks like:<\/p>\n<pre class=\"listing\"><code>mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder\nmq.record.builder.root.element.name=ordermessage\nmq.record.builder.xsd.schema.path=\/opt\/kafka\/external-configuration\/xml-schemas\/mq-messages.xsd\nvalue.converter=io.confluent.connect.avro.AvroConverter\nvalue.converter.schemas.enable=true\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/mq-xml-to-avro-confluent.yaml\">mq-xml-to-avro-confluent.yaml<\/a><\/p>\n<p>If I PUT a message like this on my MQ queue:<\/p>\n<pre 
class=\"listing output\"><code>&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot; ?&gt;\n&lt;ordermessage&gt;\n    &lt;customer&gt;\n        &lt;name&gt;Helen Velazquez&lt;\/name&gt;\n        &lt;phone type=&quot;landline&quot; number=&quot;0911 910 5491&quot;\/&gt;\n        &lt;email&gt;mus.donec.dignissim@yahoo.ca&lt;\/email&gt;\n        &lt;address&gt;3249 Hendrerit Av.&lt;\/address&gt;\n        &lt;postalZip&gt;F2 1IX&lt;\/postalZip&gt;\n        &lt;region&gt;Dunbartonshire&lt;\/region&gt;\n    &lt;\/customer&gt;\n    &lt;product&gt;\n        &lt;brand&gt;Acme Inc&lt;\/brand&gt;\n        &lt;item&gt;Awesome-ivator&lt;\/item&gt;\n        &lt;quantity&gt;1&lt;\/quantity&gt;\n    &lt;\/product&gt;\n    &lt;product&gt;\n        &lt;brand&gt;Globex&lt;\/brand&gt;\n        &lt;item&gt;Widget&lt;\/item&gt;\n        &lt;quantity&gt;2&lt;\/quantity&gt;\n    &lt;\/product&gt;\n    &lt;order&gt;\n        &lt;date&gt;2023-11-05 22:11:00&lt;\/date&gt;\n    &lt;\/order&gt;\n&lt;\/ordermessage&gt;\n<\/code><\/pre>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/mq-3.png?raw=true\" alt=\"screenshot of the MQ web interface\"\/><\/p>\n<p>The Connect pipeline produces this to the Kafka topic as an Avro message:<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/eventstreams-3.png?raw=true\" alt=\"screenshot of the Event Streams web interface\"\/><\/p>\n<p>And the AvroConverter will register an Avro schema based on the XSD schema in the schema registry:<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/eventstreams-3-schema.png?raw=true\" alt=\"screenshot of the schema registry\"\/><\/p>\n<h4 class=\"examplesection\" 
id=\"capture-xml-events-from-a-web-service-and-bring-them-into-kafka-topics-as-json-messages\">Capture XML events from a web service and bring them into Kafka topics as JSON messages<\/h4>\n<p>Not all source connectors produce structured records. Some connectors will only produce a string. In such cases, it is necessary to use an <code class=\"codeterm\">XmlTransformation<\/code> to create structured Connect records from the string.<\/p>\n<p>My example for this is a source connector that makes HTTP calls to an XML web service and returns each response as a string.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/transformer-source.png?raw=true\" alt=\"diagram of a connect pipeline with an XML transformation\"\/><\/p>\n<ol>\n<li><code class=\"codeterm\">CamelWeatherSourceConnector<\/code> <br \/>Polls a web service for a weather report event which is returned as an XML string<\/li>\n<li><code class=\"codeterm\">XmlTransformation<\/code> <br \/>Parses the XML string and outputs it as a structured object<\/li>\n<li>Additional transformations can be applied, now that the pipeline has structured objects<\/li>\n<li><code class=\"codeterm\">JsonConverter<\/code><br \/>Serializes the weather event objects into JSON strings<\/li>\n<\/ol>\n<p>The <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/weather-xml-to-json.yaml\">config for this<\/a> needs to include details for the weather web service, but the XML-specific elements are:<\/p>\n<pre class=\"listing\"><code>transforms.xmlconvert.type=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlTransformation\ntransforms.xmlconvert.converter.type=value\ntransforms.xmlconvert.root.element.name=current\nvalue.converter=org.apache.kafka.connect.json.JsonConverter\nvalue.converter.schemas.enable=false\n<\/code><\/pre>\n<p class=\"configlink\"><a 
href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/weather-xml-to-json.yaml\">weather-xml-to-json.yaml<\/a><\/p>\n<p>This pipeline means that, even though the source connector returns an unstructured string such as:<\/p>\n<pre class=\"listing output\"><code>&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;\n&lt;current&gt;\n    &lt;visibility value=&quot;10000&quot;&gt;&lt;\/visibility&gt;\n    &lt;precipitation mode=&quot;no&quot;&gt;&lt;\/precipitation&gt;\n    &lt;city id=&quot;2652491&quot; name=&quot;Compton&quot;&gt;\n        &lt;coord lon=&quot;-1.3989&quot; lat=&quot;51.0267&quot;&gt;&lt;\/coord&gt;\n        &lt;country&gt;GB&lt;\/country&gt;\n        &lt;timezone&gt;0&lt;\/timezone&gt;\n        &lt;sun\n           rise=&quot;2023-10-29T06:51:47&quot;\n           set=&quot;2023-10-29T16:46:41&quot;&gt;&lt;\/sun&gt;\n    &lt;\/city&gt;\n    &lt;temperature\n        value=&quot;285.6&quot;\n        min=&quot;283.83&quot;\n        max=&quot;286.84&quot;\n        unit=&quot;kelvin&quot;&gt;&lt;\/temperature&gt;\n    &lt;weather\n        number=&quot;802&quot;\n        value=&quot;scattered clouds&quot;\n        icon=&quot;03d&quot;&gt;&lt;\/weather&gt;\n    &lt;humidity value=&quot;89&quot; unit=&quot;%&quot;&gt;&lt;\/humidity&gt;\n    &lt;pressure value=&quot;987&quot; unit=&quot;hPa&quot;&gt;&lt;\/pressure&gt;\n    &lt;feels_like\n        value=&quot;285.22&quot;\n        unit=&quot;kelvin&quot;&gt;&lt;\/feels_like&gt;\n    &lt;wind&gt;\n        &lt;speed value=&quot;4.12&quot; unit=&quot;m\/s&quot; name=&quot;Gentle Breeze&quot;&gt;&lt;\/speed&gt;\n        &lt;gusts&gt;&lt;\/gusts&gt;\n        &lt;direction value=&quot;210&quot; code=&quot;SSW&quot; name=&quot;South-southwest&quot;&gt;&lt;\/direction&gt;\n    &lt;\/wind&gt;\n    &lt;clouds value=&quot;40&quot; name=&quot;scattered clouds&quot;&gt;&lt;\/clouds&gt;\n&lt;lastupdate 
value=&quot;2023-10-29T14:39:13&quot;&gt;&lt;\/lastupdate&gt;\n&lt;\/current&gt;\n<\/code><\/pre>\n<p>The messages produced to my Kafka topic look like:<\/p>\n<pre class=\"listing output\"><code>{\n    &quot;precipitation&quot;: { &quot;mode&quot;: &quot;no&quot; },\n    &quot;visibility&quot;: { &quot;value&quot;: 10000 },\n    &quot;city&quot;: {\n        &quot;country&quot;: &quot;GB&quot;,\n        &quot;coord&quot;: { &quot;lon&quot;: -1.3989, &quot;lat&quot;: 51.0267 },\n        &quot;timezone&quot;: 0,\n        &quot;name&quot;: &quot;Compton&quot;,\n        &quot;id&quot;: 2652491,\n        &quot;sun&quot;: {\n            &quot;set&quot;: &quot;2023-10-29T16:46:41&quot;,\n            &quot;rise&quot;: &quot;2023-10-29T06:51:47&quot;\n        }\n    },\n    &quot;temperature&quot;: {\n        &quot;unit&quot;: &quot;kelvin&quot;,\n        &quot;min&quot;: 283.83,\n        &quot;max&quot;: 286.84,\n        &quot;value&quot;: 285.6\n    },\n    &quot;weather&quot;: {\n        &quot;number&quot;: 802,\n        &quot;icon&quot;: 3,\n        &quot;value&quot;: &quot;scattered clouds&quot;\n    },\n    &quot;humidity&quot;: {\n        &quot;unit&quot;: &quot;%&quot;,\n        &quot;value&quot;: 89\n    },\n    &quot;pressure&quot;: {\n        &quot;unit&quot;: &quot;hPa&quot;,\n        &quot;value&quot;: 987\n    },\n    &quot;clouds&quot;: {\n        &quot;name&quot;: &quot;scattered clouds&quot;,\n        &quot;value&quot;: 40\n    },\n    &quot;lastupdate&quot;: {\n        &quot;value&quot;: &quot;2023-10-29T14:39:13&quot;\n    },\n    &quot;feels_like&quot;: {\n        &quot;unit&quot;: &quot;kelvin&quot;,\n        &quot;value&quot;: 285.22\n    },\n    &quot;wind&quot;: {\n        &quot;gusts&quot;: &quot;&quot;,\n        &quot;speed&quot;: {\n            &quot;unit&quot;: &quot;m\/s&quot;,\n            &quot;name&quot;: &quot;Gentle Breeze&quot;,\n            &quot;value&quot;: 4.12\n        },\n        &quot;direction&quot;: {\n            &quot;code&quot;: 
&quot;SSW&quot;,\n            &quot;name&quot;: &quot;South-southwest&quot;,\n            &quot;value&quot;: 210\n        }\n    }\n}\n<\/code><\/pre>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/screenshots\/eventstreams-weather.png?raw=true\" alt=\"screenshot of the Event Streams web interface\"\/><\/p>\n<h3 class=\"usecasesection\" id=\"i-have-kafka-messages-in-a-kafka-friendly-format-that-i-need-to-send-to-external-systems-as-xml\">&quot;I have Kafka messages in a Kafka-friendly<sup>&#8224;<\/sup> format that I need to send to external systems as XML&quot;<\/h3>\n<p>&#8224; &#8211; <em>Again, by &quot;Kafka-friendly&quot;, I&#8217;m talking about formats like Avro or JSON &#8211; although Kafka doesn&#8217;t care about what the bytes on a topic represent, there are more Kafka tools and applications that know how to process these formats than XML.<\/em><\/p>\n<p>As before, MQ is a good example of a system that is likely to expect XML data.<\/p>\n<h4 class=\"examplesection\" id=\"transfer-json-messages-from-your-kafka-topics-to-mq-queues-as-xml\">Transfer JSON messages from your Kafka topics to MQ queues as XML documents<\/h4>\n<p>I can set up a Kafka Connect pipeline to send JSON messages on my Kafka topics to an MQ queue as XML messages.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/recordbuilder-sink.png?raw=true\" alt=\"diagram of a JSON to XML sink pipeline\"\/><\/p>\n<ol>\n<li>Kafka topic with JSON messages<\/li>\n<li><code class=\"codeterm\">JsonConverter<\/code> <br \/>Deserializes the JSON strings into Java objects<\/li>\n<li><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-mq-sink\"><code class=\"codeterm\">MQSinkConnector<\/code><\/a> paired with <code class=\"codeterm\">XmlConverter<\/code><br \/>Turns the objects into XML 
strings<\/li>\n<li>XML messages are PUT to the MQ queue<\/li>\n<\/ol>\n<p>The relevant section of <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/mq-json-to-xml.yaml\">the config for this<\/a> is:<\/p>\n<pre class=\"listing\"><code>mq.message.builder=com.ibm.eventstreams.connect.mqsink.builders.ConverterMessageBuilder\nmq.message.builder.value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter\nmq.message.builder.value.converter.root.element.name=msg\nvalue.converter=org.apache.kafka.connect.json.JsonConverter\nvalue.converter.schemas.enable=false\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/mq-json-to-xml.yaml\">mq-json-to-xml.yaml<\/a><\/p>\n<p>With a sink pipeline configured in such a way, JSON messages on the Kafka topic like this:<\/p>\n<pre class=\"listing output\"><code>{\n    &quot;test&quot;: {\n        &quot;numbers&quot;: [ 10, 20, 30 ],\n        &quot;letters&quot;: [ &quot;A&quot;, &quot;B&quot;, &quot;C&quot; ],\n        &quot;message&quot;: &quot;Hello World&quot;\n    },\n    &quot;creative&quot;: false\n}\n<\/code><\/pre>\n<p>will be PUT to MQ as XML messages like this:<\/p>\n<pre class=\"listing output\"><code>&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot; standalone=&quot;no&quot;?&gt;\n&lt;msg&gt;\n    &lt;test&gt;\n        &lt;numbers&gt;10&lt;\/numbers&gt;\n        &lt;numbers&gt;20&lt;\/numbers&gt;\n        &lt;numbers&gt;30&lt;\/numbers&gt;\n        &lt;message&gt;Hello World&lt;\/message&gt;\n        &lt;letters&gt;A&lt;\/letters&gt;\n        &lt;letters&gt;B&lt;\/letters&gt;\n        &lt;letters&gt;C&lt;\/letters&gt;\n    &lt;\/test&gt;\n    &lt;creative&gt;false&lt;\/creative&gt;\n&lt;\/msg&gt;\n<\/code><\/pre>\n<h4 class=\"examplesection\" id=\"submit-json-messages-from-your-kafka-topics-to-an-xml-web-service-using-http-posts\">Submit 
JSON messages from your Kafka topics to an XML web service using HTTP posts<\/h4>\n<p>Finally, some sink connectors can only send strings to an external system. In such cases, an <code class=\"codeterm\">XmlTransformation<\/code> is useful for converting structured Connect objects to an XML string as the last transformation in a sink pipeline.<\/p>\n<p>To illustrate this, I have a Kafka topic containing JSON messages that I will submit using HTTP POSTs to an XML web service.<\/p>\n<p><img decoding=\"async\" class=\"diagram\" src=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/diagrams\/transformer-sink.png?raw=true\" alt=\"diagram of an HTTP sink pipeline\"\/><\/p>\n<ol>\n<li>Kafka topic with JSON messages<\/li>\n<li><code class=\"codeterm\">JsonConverter<\/code> <br \/>Deserializes the JSON strings into Java objects<\/li>\n<li>Transformations reshape the Connect objects to match the request payload expected by the web service API<\/li>\n<li><code class=\"codeterm\">XmlTransformation<\/code><br \/>Converts the modified Connect object into an XML string<\/li>\n<li><code class=\"codeterm\">HttpSinkConnector<\/code><br \/>Submits the XML string as the payload in an HTTP request<\/li>\n<\/ol>\n<p>Most of <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/http-sink.yaml\">the config for a sink pipeline like this<\/a> defines the transformations that get the objects into the desired shape. 
In <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/http-sink.yaml\">my example<\/a>, I&#8217;m removing one property (<code class=\"codeterm\">customer<\/code>), inserting a new one (<code class=\"codeterm\">origin<\/code>), and renaming another (<code class=\"codeterm\">value<\/code> becomes <code class=\"codeterm\">cost<\/code>).<\/p>\n<p>The config that handles converting the JSON messages into XML is:<\/p>\n<pre class=\"listing\"><code>transforms.xmlconvert.type=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlTransformation\ntransforms.xmlconvert.converter.type=value\ntransforms.xmlconvert.root.element.name=request\nvalue.converter=org.apache.kafka.connect.json.JsonConverter\nvalue.converter.schemas.enable=false\n<\/code><\/pre>\n<p class=\"configlink\"><a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\/blob\/0.1.0\/examples\/eventstreams\/http-sink.yaml\">http-sink.yaml<\/a><\/p>\n<p>This pipeline will consume JSON messages from a Kafka topic like this:<\/p>\n<pre class=\"listing output\"><code>{\n    &quot;product&quot;: {\n        &quot;id&quot;: &quot;ABCD1234&quot;,\n        &quot;type&quot;: &quot;Something impressive&quot;\n    },\n    &quot;value&quot;: 12.99,\n    &quot;customer&quot;: {\n        &quot;id&quot;: &quot;XXXXYYYY&quot;,\n        &quot;name&quot;: &quot;Joe Bloggs&quot;\n    }\n}\n<\/code><\/pre>\n<p>And use them to make HTTP POST requests like this:<\/p>\n<pre class=\"listing output\"><code>&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot; standalone=&quot;no&quot;?&gt;\n&lt;request&gt;\n    &lt;product&gt;\n        &lt;id&gt;ABCD1234&lt;\/id&gt;\n        &lt;type&gt;Something impressive&lt;\/type&gt;\n    &lt;\/product&gt;\n    &lt;cost&gt;12.99&lt;\/cost&gt;\n    &lt;origin&gt;demo&lt;\/origin&gt;\n&lt;\/request&gt;\n<\/code><\/pre>\n<h2 id=\"summary\">Summary<\/h2>\n<p>To conclude, the eight examples outlined above show a few of the places within a Kafka Connect pipeline where you could use <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\">our new XML 
plugin<\/a>. I&#8217;m sure there are other ways to use it as well, but hopefully this gives you some ideas for where to start.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this tutorial, I&#8217;ll share examples of how to process XML data at various points in a Kafka Connect pipeline, using a new plugin from IBM Event Streams. You can assemble a Kafka Connect pipeline in a huge number of ways, so I&#8217;m not going to attempt an exhaustive list here. Instead, I&#8217;ve come up [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,583,584],"class_list":["post-5035","post","type-post","status-publish","format-standard","hentry","category-code","tag-apachekafka","tag-ibmeventstreams","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5035","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5035"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5035\/revisions"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5035"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5035"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5035"}],"curies":[{"name":"wp","href":"
https:\/\/api.w.org\/{rel}","templated":true}]}}