Processing XML with Kafka Connect

In this tutorial, I’ll share examples of how to process XML data at various points in a Kafka Connect pipeline, using a new plugin from IBM Event Streams.

You can assemble a Kafka Connect pipeline in a huge number of ways, so I’m not going to attempt an exhaustive list here. Instead, I’ve come up with eight examples that are illustrative of the sort of use cases you can satisfy.

I’ll summarise my different examples here, so you can jump straight to the one that sounds the closest to your use case:

  "I need Connectors to put data on my Kafka topics in XML format"
    - Produce events from your source connectors in XML format
    - Produce events from your source connectors in XML, and generate an XSD schema to go with them
  "I have XML data in external systems that I need to bring into Kafka in a Kafka-friendly format"
    - Transfer XML messages from MQ queues into Kafka topics as JSON messages
    - Transfer XML messages from MQ queues into Kafka topics as Avro messages
    - Capture XML events from a web service and bring them into Kafka topics as JSON messages
  "I have Kafka messages in a Kafka-friendly format that I need to send to external systems as XML"
    - Transfer JSON messages from your Kafka topics to MQ queues as XML documents
    - Submit JSON messages from your Kafka topics to an XML web service using HTTP POSTs

Background

I’ll start with a recap of the components that make up a Kafka Connect pipeline. This will make it easier for me to refer to different pieces as I go through the examples. If you already know how Connect works, you can skip this bit.

Source pipeline

This is what a Connect source pipeline does:

diagram of a Kafka Connect source pipeline

  1. The Connector connects to some external system, retrieves some data, and passes it on.
    Ideally, this is in a structured form, such as Connect’s native structured type (Struct) or a Java structure (such as a Map). There’s a short sketch of building a Connect Struct after this list.
  2. Optionally, a series of Transformation plugins perform operations, such as filters and transforms, on the Connect objects.
    Transformations that operate on Connect structs are de-coupled from Connectors because a Connect Struct is a generic structure, agnostic of any individual Connector.
  3. A Converter converts the Connect objects to a serialized format for producing to Kafka topics.
    For example, this could be Avro, JSON, Protobuf, etc.
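To make step 1 concrete, here is a minimal sketch of the kind of structured record a source connector might emit, using the standard Kafka Connect data API (the schema and values here are only illustrative, borrowed from the datagen events shown later):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructSketch {
    public static void main(String[] args) {
        // a Connect Schema describes the shape of the record,
        // independently of any serialization format
        Schema orderSchema = SchemaBuilder.struct()
                .name("order")
                .field("id", Schema.STRING_SCHEMA)
                .field("description", Schema.STRING_SCHEMA)
                .field("price", Schema.FLOAT64_SCHEMA)
                .field("quantity", Schema.INT32_SCHEMA)
                .build();

        // a Struct is the generic, connector-agnostic value that
        // Transformations and Converters operate on
        Struct order = new Struct(orderSchema)
                .put("id", "68579620-fccc-4acb-bfcc-3686919346aa")
                .put("description", "XS Retro Boyfriend Jeans")
                .put("price", 16.41)
                .put("quantity", 6);

        System.out.println(order);
    }
}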

Sink pipeline

This is what a Connect sink pipeline does:

diagram of a Kafka Connect sink pipeline

  1. The bytes of a Kafka message are given to a Converter to be deserialized into Connect Structs. (A simplified view of the Converter interface is sketched after this list.)
  2. Optionally, a series of Transformation plugins perform operations, such as filters and transforms, on the Connect objects.
    Transformations that operate on Connect structs are de-coupled from Converters because a Connect Struct is a generic structure, agnostic of any individual Converter.
  3. A Connector submits the Connect structs to some external system.
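Converters are pluggable in both directions because they all implement the same contract. Here is a simplified view of the org.apache.kafka.connect.storage.Converter interface (the real interface also has a few default methods, e.g. for converting headers), which is why the same plugin can serialize in a source pipeline and deserialize in a sink pipeline:

import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

// simplified view of org.apache.kafka.connect.storage.Converter
public interface Converter {

    // converter configuration (e.g. schemas.enable) is passed in here
    void configure(Map<String, ?> configs, boolean isKey);

    // source pipelines: Connect data -> bytes for the Kafka topic
    byte[] fromConnectData(String topic, Schema schema, Object value);

    // sink pipelines: bytes from the Kafka topic -> Connect data
    SchemaAndValue toConnectData(String topic, byte[] value);
}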

Examples

The following examples illustrate the different places to process XML data in a Kafka Connect pipeline. I’ll link to more detailed configuration, and include the XML-specific bits in the post here.

"I need Connectors to put data on my Kafka topics in XML format"

Produce events from your source connectors in XML format

I’ll start with DatagenSourceConnector – a datagen connector I wrote for the Event Processing tutorials.

I’ve been using it with a JsonConverter so far:

diagram of the datagen connector with a JSON converter

  1. DatagenSourceConnector
    Generates events about a fictional clothes retailer
  2. JsonConverter
    Serializes the clothing retail events into JSON strings

My full config for how I’ve been running it is here, but the interesting bit looks like:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Using a JsonConverter like this produces events such as:

{
    "id": "68579620-fccc-4acb-bfcc-3686919346aa",
    "customer": "Taren Reichert",
    "customerid": "ef8db9c1-c685-4600-be30-bbd1fa2f7efd",
    "description": "XS Retro Boyfriend Jeans",
    "price": 16.41,
    "quantity": 6,
    "region": "APAC",
    "ordertime": "2023-10-29 13:20:17.389"
}

Replacing the JsonConverter with the XmlConverter lets me produce these events in XML instead.

diagram of the datagen connector with an XML converter

  1. DatagenSourceConnector
    Generates events about a fictional clothes retailer
  2. XmlConverter
    Serializes the clothing retailer events into XML strings

The full config for doing this is here, but the relevant bit is:

value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.schemas.enable=false

Using an XmlConverter, with the same connector, produces events like:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<order>
    <id>68579620-fccc-4acb-bfcc-3686919346aa</id>
    <customer>Taren Reichert</customer>
    <customerid>ef8db9c1-c685-4600-be30-bbd1fa2f7efd</customerid>
    <description>XS Retro Boyfriend Jeans</description>
    <price>16.41</price>
    <quantity>6</quantity>
    <region>APAC</region>
    <ordertime>2023-10-29 13:20:17.389</ordertime>
</order>

Here is another example of using the XmlConverter in a source pipeline.

StockPriceSourceConnector emits stock price updates. When I’ve used it with a JsonConverter in the past, it produced events like this:

{
    "open"      : 1.23,
    "high"      : 1.23,
    "low"       : 1.22,
    "close"     : 1.23,
    "timestamp" : 1655757900,
    "datetime"  : "2022-06-20 20:45:00"
}

Replacing the JsonConverter with the XmlConverter lets me produce these stock price events in XML instead.

diagram of the stock prices connector with an XML converter

  1. StockPriceSourceConnector
    Emits stock price update events
  2. XmlConverter
    Serializes the stock price events into XML strings

The interesting bit of the config for this is:

value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.schemas.enable=false

That produces events like:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<stockdata>
    <open>160.65</open>
    <high>160.75</high>
    <low>160.65</low>
    <close>160.75</close>
    <volume>10</volume>
    <timestamp>1701478140</timestamp>
    <datetime>2023-12-01 19:49:00</datetime>
</stockdata>

Produce events from your source connectors in XML, and generate an XSD schema to go with them

When Connect records include information about the structure of the events, a Converter can also be configured to output these as schemas. The type of schema will vary depending on the type of Converter.
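To see what this looks like in isolation, here is a small, self-contained sketch that drives a JsonConverter directly and prints the same Struct with schemas.enable off and then on (the schema and values are placeholders I’ve made up for illustration):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class SchemasEnableSketch {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.struct().name("order")
                .field("id", Schema.STRING_SCHEMA)
                .field("price", Schema.FLOAT64_SCHEMA)
                .build();
        Struct value = new Struct(schema)
                .put("id", "55031d01-1bea-4f3b-9b29-12a968d8230e")
                .put("price", 28.71);

        JsonConverter converter = new JsonConverter();

        // schemas.enable=false: the serialized message is just the payload
        converter.configure(Map.of("schemas.enable", "false"), false);
        System.out.println(new String(
                converter.fromConnectData("orders", schema, value),
                StandardCharsets.UTF_8));

        // schemas.enable=true: the payload is wrapped in an envelope
        // alongside a description of its schema
        converter.configure(Map.of("schemas.enable", "true"), false);
        System.out.println(new String(
                converter.fromConnectData("orders", schema, value),
                StandardCharsets.UTF_8));
    }
}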

Returning to DatagenSourceConnector, and again using the JsonConverter as a comparison:

diagram of the datagen connector with a JSON converter

  1. DatagenSourceConnector
    Generates events about a fictional clothes retailer
  2. JsonConverter
    Serializes the clothing retailer events into JSON strings, with an embedded JSON Schema

The relevant part of the config for this looks like:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Setting schemas.enable to true with a JsonConverter produces events like:

{
    "schema": {
        "type": "struct",
        "fields": [
            { "type": "string", "optional": false, "field": "id" },
            { "type": "string", "optional": false, "field": "customer" },
            { "type": "string", "optional": false, "field": "customerid" },
            { "type": "string", "optional": false, "field": "description" },
            { "type": "double", "optional": false, "field": "price" },
            { "type": "int32",  "optional": false, "field": "quantity" },
            { "type": "string", "optional": false, "field": "region" },
            { "type": "string", "optional": false, "field": "ordertime" }
        ],
        "optional": false,
        "name": "order"
    },
    "payload": {
        "id": "55031d01-1bea-4f3b-9b29-12a968d8230e",
        "customer": "Gabriel Stracke",
        "customerid": "7a75009c-5781-4074-9bf0-20dee537dc9d",
        "description": "XXL Stonewashed Capri Jeans",
        "price": 28.71,
        "quantity": 3,
        "region": "SA",
        "ordertime": "2023-10-29 13:13:20.675"
    }
}

Replacing the JsonConverter with the XmlConverter lets me produce these events in XML instead.

diagram of the datagen connector with an XML converter

  1. DatagenSourceConnector
    Generates events about a fictional clothes retailer
  2. XmlConverter
    Serializes the clothing retailer events into XML strings with an embedded XSD schema.

The XML-specific bit of the config for this looks like:

value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.schemas.enable=true

Setting schemas.enable to true with an XmlConverter produces events like:

<?xml version="1.0" encoding="UTF-8"?>
<order xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="#connectSchema">
    <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" id="connectSchema">
        <xs:element name="order">
            <xs:complexType>
                <xs:sequence>
                  <xs:any maxOccurs="1" minOccurs="0" namespace="http://www.w3.org/2001/XMLSchema" processContents="skip"/>
                  <xs:element name="id" type="xs:string" />
                  <xs:element name="customer" type="xs:string" />
                  <xs:element name="customerid" type="xs:string" />
                  <xs:element name="description" type="xs:string" />
                  <xs:element name="price" type="xs:double" />
                  <xs:element name="quantity" type="xs:integer" />
                  <xs:element name="region" type="xs:string" />
                  <xs:element name="ordertime" type="xs:string" />
                </xs:sequence>
            </xs:complexType>
        </xs:element>
    </xs:schema>

    <id>55031d01-1bea-4f3b-9b29-12a968d8230e</id>
    <customer>Gabriel Stracke</customer>
    <customerid>7a75009c-5781-4074-9bf0-20dee537dc9d</customerid>
    <description>XXL Stonewashed Capri Jeans</description>
    <price>28.71</price>
    <quantity>3</quantity>
    <region>SA</region>
    <ordertime>2023-10-29 13:13:20.675</ordertime>
</order>
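A useful property of this output is that every message carries the schema needed to validate it. As a sketch of how a consuming application could do that with the standard JDK XML APIs (this is my own illustration, assuming the xs:schema element is always embedded as shown above):

import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.validation.SchemaFactory;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

public class EmbeddedXsdValidator {

    // throws an exception if the message does not match its embedded schema
    public static void validate(java.io.InputStream message) throws Exception {
        // parse with namespace awareness so the xs:schema element can be found
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        dbf.setNamespaceAware(true);
        Document doc = dbf.newDocumentBuilder().parse(message);

        // locate the inline XSD (the connectSchema element shown above)
        Element schemaElement = (Element) doc
                .getElementsByTagNameNS(XMLConstants.W3C_XML_SCHEMA_NS_URI, "schema")
                .item(0);

        // compile it, and validate the whole document against it; the xs:any
        // particle in the generated XSD is what allows the embedded schema
        // element itself to appear inside the payload
        SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
        sf.newSchema(new DOMSource(schemaElement))
                .newValidator()
                .validate(new DOMSource(doc));
    }
}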

Returning now to the StockPriceSourceConnector, the XmlConverter can produce an embedded XSD schema for the stock price events as well.

diagram of the stock prices connector with an XML converter

  1. StockPriceSourceConnector
    Emits stock price update events
  2. XmlConverter
    Serializes the stock price events into XML strings with an embedded XSD schema.

The config for running this looks like:

value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.schemas.enable=true

Setting schemas.enable to true with an XmlConverter produces stock price events like:

<?xml version="1.0" encoding="UTF-8"?>
<stockdata xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="#connectSchema">
    <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" id="connectSchema">
        <xs:element name="root">
            <xs:complexType>
                <xs:sequence>
                  <xs:any maxOccurs="1" minOccurs="0" namespace="http://www.w3.org/2001/XMLSchema" processContents="skip"/>
                  <xs:element name="open" type="xs:double" />
                  <xs:element name="high" type="xs:double" />
                  <xs:element name="low" type="xs:double" />
                  <xs:element name="close" type="xs:double" />
                  <xs:element name="volume" type="xs:integer" />
                  <xs:element name="timestamp" type="xs:string" />
                  <xs:element name="datetime" type="xs:string" />
                </xs:sequence>
            </xs:complexType>
        </xs:element>
    </xs:schema>

    <open>142.3</open>
    <high>142.3</high>
    <low>142.3</low>
    <close>142.3</close>
    <volume>2</volume>
    <timestamp>1698453420</timestamp>
    <datetime>2023-10-27 19:37:00</datetime>
</stockdata>

"I have XML data in external systems that I need to bring into Kafka in a Kafka-friendly format"

† – By "Kafka-friendly", I mean something like Avro or JSON – although Kafka doesn’t care about what the bytes on a topic represent, there are more Kafka tools and applications that know how to process these formats than XML.

Messages on MQ queues and topics are a good example of this, as these are frequently in XML.

Transfer XML messages from MQ queues into Kafka topics as JSON messages

I can set up a Kafka Connect pipeline to bring XML MQ messages onto my Kafka topics as JSON strings.

diagram of the MQ Source connector paired with a JSON converter

  1. MQ queue with XML messages
  2. MQSourceConnector paired with XmlMQRecordBuilder
    Parses the XML and outputs it as Connect structs
  3. JsonConverter
    Serializes the MQ messages into JSON strings

The config for this is more complex, but the important bits are:

mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
mq.record.builder.root.element.name=doc
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

With this config, I can PUT an MQ message like this:

<doc>
    <something>
        <message>Hello World</message>
        <counts>
            <first>100</first>
            <second>200</second>
        </counts>
    </something>
</doc>

screenshot of the MQ web interface

The Connect pipeline will produce this to the Kafka topic as a JSON message like this (note that the numeric values from the XML have been recognized and output as JSON numbers):

{
    "something": {
        "message": "Hello World",
        "counts": {
            "first": 100,
            "second": 200
        }
    }
}

screenshot of the Event Streams web interface

Transfer XML messages from MQ queues into Kafka topics as Avro messages

Alternatively, I can configure a Kafka Connect pipeline to bring XML MQ messages onto my Kafka topics as Avro-encoded messages.

diagram of the MQ Source connector paired with an Avro converter

  1. MQ queue with XML messages
  2. MQSourceConnector paired with XmlMQRecordBuilder
    Parses the XML and outputs it as Connect structs
  3. AvroConverter
    Serializes the MQ messages into binary-encoded Avro

There are multiple AvroConverters to choose from, so I’ll show a couple of examples.

If I’m using io.apicurio.registry.utils.converter.AvroConverter, configured to use an Apicurio schema registry, the config looks like this (the full config also points the converter at the registry, through properties such as value.converter.apicurio.registry.url):

mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
mq.record.builder.root.element.name=simple
mq.record.builder.xsd.schema.path=/opt/kafka/external-configuration/xml-schemas/simple.xsd
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.schemas.enable=true

If I PUT a message like this on my MQ queue:

<?xml version="1.0" encoding="UTF-8"?>
<simple>
    <message>Hello World</message>
    <count>123</count>
</simple>

screenshot of the MQ web interface

The Connect pipeline will produce this to the Kafka topic as an Avro message like this:

screenshot of the Event Streams web interface

And the AvroConverter will derive an Avro schema from the XSD schema and register it in the schema registry:

screenshot of the schema registry

Alternatively, if I’m using io.confluent.connect.avro.AvroConverter (still configured to use an Apicurio schema registry, which this converter reaches through its Confluent-compatible API, via properties such as value.converter.schema.registry.url), the config looks like:

mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
mq.record.builder.root.element.name=ordermessage
mq.record.builder.xsd.schema.path=/opt/kafka/external-configuration/xml-schemas/mq-messages.xsd
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true

If I PUT a message like this on my MQ queue:

<?xml version="1.0" encoding="UTF-8" ?>
<ordermessage>
    <customer>
        <name>Helen Velazquez</name>
        <phone type="landline" number="0911 910 5491"/>
        <email>mus.donec.dignissim@yahoo.ca</email>
        <address>3249 Hendrerit Av.</address>
        <postalZip>F2 1IX</postalZip>
        <region>Dunbartonshire</region>
    </customer>
    <product>
        <brand>Acme Inc</brand>
        <item>Awesome-ivator</item>
        <quantity>1</quantity>
    </product>
    <product>
        <brand>Globex</brand>
        <item>Widget</item>
        <quantity>2</quantity>
    </product>
    <order>
        <date>2023-11-05 22:11:00</date>
    </order>
</ordermessage>

screenshot of the MQ web interface

The Connect pipeline produces this to the Kafka topic as an Avro message:

screenshot of the Event Streams web interface

And the AvroConverter will derive an Avro schema from the XSD schema and register it in the schema registry:

screenshot of the schema registry

Capture XML events from a web service and bring them into Kafka topics as JSON messages

Not all source connectors produce structured records. Some connectors will only produce a string. In such cases, it is necessary to use an XmlTransformation to create structured Connect records from the string.
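The XmlTransformation used below is a standard Kafka Connect Single Message Transformation (SMT). To show the general shape that such a plugin takes, here is a skeleton of an SMT that structures string record values (my own illustrative sketch, not the actual XmlTransformation source):

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.transforms.Transformation;

// skeleton of an SMT that turns a String record value into a structured
// value, while keeping everything else about the record intact
public abstract class StringParsingTransformation<R extends ConnectRecord<R>>
        implements Transformation<R> {

    // the actual parsing (e.g. XML string -> Connect Struct) goes here
    protected abstract SchemaAndValue parse(String value);

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof String)) {
            return record; // pass through anything that isn't a string
        }
        SchemaAndValue parsed = parse((String) record.value());
        return record.newRecord(
                record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                parsed.schema(), parsed.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}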

My example for this is a source connector that makes HTTP calls to an XML web service and passes on the response as an XML string.

diagram of a connect pipeline with an XML transformation

  1. CamelWeatherSourceConnector
    Polls a web service for a weather report event which is returned as an XML string
  2. XmlTransformation
    Parses the XML string and outputs it as a structured object
  3. Additional transformations can be applied, now that the pipeline has structured objects
  4. JsonConverter
    Serializes the weather event objects into JSON strings

The config for this needs to include details for the weather web service, but the XML-specific elements are:

transforms.xmlconvert.type=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlTransformation
transforms.xmlconvert.converter.type=value
transforms.xmlconvert.root.element.name=current
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

This pipeline means that, even though the source connector returns an unstructured string such as:

<?xml version="1.0" encoding="UTF-8"?>
<current>
    <visibility value="10000"></visibility>
    <precipitation mode="no"></precipitation>
    <city id="2652491" name="Compton">
        <coord lon="-1.3989" lat="51.0267"></coord>
        <country>GB</country>
        <timezone>0</timezone>
        <sun
           rise="2023-10-29T06:51:47"
           set="2023-10-29T16:46:41"></sun>
    </city>
    <temperature
        value="285.6"
        min="283.83"
        max="286.84"
        unit="kelvin"></temperature>
    <weather
        number="802"
        value="scattered clouds"
        icon="03d"></weather>
    <humidity value="89" unit="%"></humidity>
    <pressure value="987" unit="hPa"></pressure>
    <feels_like
        value="285.22"
        unit="kelvin"></feels_like>
    <wind>
        <speed value="4.12" unit="m/s" name="Gentle Breeze"></speed>
        <gusts></gusts>
        <direction value="210" code="SSW" name="South-southwest"></direction>
    </wind>
    <clouds value="40" name="scattered clouds"></clouds>
    <lastupdate value="2023-10-29T14:39:13"></lastupdate>
</current>

The messages produced to my Kafka topic look like this (note how the XML attributes have become regular JSON properties):

{
    "precipitation": { "mode": "no" },
    "visibility": { "value": 10000 },
    "city": {
        "country": "GB",
        "coord": { "lon": -1.3989, "lat": 51.0267 },
        "timezone": 0,
        "name": "Compton",
        "id": 2652491,
        "sun": {
            "set": "2023-10-29T16:46:41",
            "rise": "2023-10-29T06:51:47"
        }
    },
    "temperature": {
        "unit": "kelvin",
        "min": 283.83,
        "max": 286.84,
        "value": 285.6
    },
    "weather": {
        "number": 802,
        "icon": 3,
        "value": "scattered clouds"
    },
    "humidity": {
        "unit": "%",
        "value": 89
    },
    "pressure": {
        "unit": "hPa",
        "value": 987
    },
    "clouds": {
        "name": "scattered clouds",
        "value": 40
    },
    "lastupdate": {
        "value": "2023-10-29T14:37:13"
    },
    "feels_like": {
        "unit": "kelvin",
        "value": 285.22
    },
    "wind": {
        "gusts": "",
        "speed": {
            "unit": "m/s",
            "name": "Gentle Breeze",
            "value": 4.12
        },
        "direction": {
            "code": "SSW",
            "name": "South-southwest",
            "value": 210
        }
    }
}

screenshot of the Event Streams web interface

"I have Kafka messages in a Kafka-friendly format that I need to send to external systems as XML"

† – Again, by "Kafka-friendly", I’m talking about formats like Avro or JSON – although Kafka doesn’t care what the bytes on a topic represent, many more Kafka tools and applications know how to process these formats than know how to process XML.

As before, MQ is a good example of a system that is likely to expect XML data.

Transfer JSON messages from your Kafka topics to MQ queues as XML documents

I can set up a Kafka Connect pipeline to send JSON messages on my Kafka topics to an MQ queue as XML messages.

diagram of a JSON to XML sink pipeline

  1. Kafka topic with JSON messages
  2. JsonConverter
    Deserializes the JSON strings into Java objects
  3. MQSinkConnector paired with XmlConverter
    Turns the objects into XML strings
  4. XML messages are PUT to the MQ queue

The relevant section of the config for this is:

mq.message.builder=com.ibm.eventstreams.connect.mqsink.builders.ConverterMessageBuilder
mq.message.builder.value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
mq.message.builder.value.converter.root.element.name=msg
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

With a sink pipeline configured this way, JSON messages on the Kafka topic like this:

{
    "test": {
        "numbers": [ 10, 20, 30 ],
        "letters": [ "A", "B", "C" ],
        "message": "Hello World"
    },
    "creative": false
}

will be PUT to MQ as XML messages like this (note how the JSON arrays become repeated XML elements):

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<msg>
    <test>
        <numbers>10</numbers>
        <numbers>20</numbers>
        <numbers>30</numbers>
        <message>Hello World</message>
        <letters>A</letters>
        <letters>B</letters>
        <letters>C</letters>
    </test>
    <creative>false</creative>
</msg>

Submit JSON messages from your Kafka topics to an XML web service using HTTP POSTs

Finally, some sink connectors can only send strings to an external system. In such cases, an XmlTransformation is useful for converting structured Connect objects to an XML string as the last transformation in a sink pipeline.

To illustrate this, I have a Kafka topic containing JSON messages that I will submit to an XML web service using HTTP POST requests.

diagram of an HTTP sink pipeline

  1. Kafka topic with JSON messages
  2. JsonConverter
    Deserializes the JSON strings into Java objects
  3. Transformations reshape the Connect objects to match the request payload expected by the web service API
  4. XmlTransformation
    Converts the modified Connect object into an XML string
  5. HttpSinkConnector
    Submits the XML string as the payload in an HTTP request

Most of the config for a sink pipeline like this is in defining the transformations to get the objects into the desired shape. In my example, I’m removing one of the properties, inserting a new one, and renaming another (the sort of reshaping that Kafka’s built-in ReplaceField and InsertField transformations can do).

The config that handles converting the JSON messages into XML is:

transforms.xmlconvert.type=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlTransformation
transforms.xmlconvert.converter.type=value
transforms.xmlconvert.root.element.name=request
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

This pipeline will consume JSON messages from a Kafka topic like this:

{
    "product": {
        "id": "ABCD1234",
        "type": "Something impressive"
    },
    "value": 12.99,
    "customer": {
        "id": "XXXXYYYY",
        "name": "Joe Bloggs"
    }
}

And use them to make HTTP POST requests like this:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<request>
    <product>
        <id>ABCD1234</id>
        <type>Something impressive</type>
    </product>
    <cost>12.99</cost>
    <origin>demo</origin>
</request>

Summary

To conclude, the eight examples outlined above illustrate some of the places within a Kafka Connect pipeline where you could use our new XML plugin. I’m sure there are other ways to use it as well, but hopefully this gives you some ideas for where to start.
