In this tutorial, I’ll share examples of how to process XML data at various points in a Kafka Connect pipeline, using a new plugin from IBM Event Streams.
You can assemble a Kafka Connect pipeline in a huge number of ways, so I’m not going to attempt an exhaustive list here. Instead, I’ve come up with eight examples that are illustrative of the sort of use cases you can satisfy.
I’ll summarise and link to my different examples here, so you can jump straight to the one that sounds the closest to your use case:
- "I need Connectors to put data on my Kafka topics in XML format"
- "I have XML data in external systems that I need to bring into Kafka in a Kafka-friendly format"
- "I have Kafka messages in a Kafka-friendly format that I need to send to external systems as XML"
Background
I’ll start with a recap of the components that make up a Kafka Connect pipeline. This will make it easier for me to refer to different pieces as I go through the examples. If you already know how Connect works, you can skip this bit.
Source pipeline
This is what a Connect source pipeline does:
- The Connector connects to some external system, retrieves some data, and passes it on. Ideally, this is in a structured form, such as Connect’s native structured form (Struct) or a Java structure (such as a Map).
- Optionally, a series of Transformation plugins perform operations, such as filters and transforms, on the Connect objects. Transformations that operate on Connect Structs are de-coupled from Connectors because a Connect Struct is a generic structure, agnostic of any individual Connector.
- A Converter converts the Connect objects to a serialized format for producing to Kafka topics. For example, this could be Avro, JSON, Protobuf, etc.
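To make this concrete, here is a minimal sketch of what those three pieces look like in a source connector’s configuration. The class names and alias here are illustrative placeholders, not a real setup:
connector.class=com.example.SomeSourceConnector
transforms=mytransform
transforms.mytransform.type=com.example.SomeTransformation
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true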
Sink pipeline
This is what a Connect sink pipeline does:
- The bytes of a Kafka message are given to a Converter to be deserialized into Connect Structs.
- Optionally, a series of Transformation plugins perform operations, such as filters and transforms, on the Connect objects. Transformations that operate on Connect Structs are de-coupled from Converters because a Connect Struct is a generic structure, agnostic of any individual Converter.
- A Connector submits the Connect Structs to some external system.
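The same pieces appear in a sink connector’s configuration, with the Converter running first and the Connector last. Again, the class names are illustrative placeholders:
topics=my.topic
value.converter=org.apache.kafka.connect.json.JsonConverter
transforms=mytransform
transforms.mytransform.type=com.example.SomeTransformation
connector.class=com.example.SomeSinkConnector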
Examples
The following examples illustrate the different places to process XML data in a Kafka Connect pipeline. I’ll link to more detailed configuration, and include the XML-specific bits in the post here.
"I need Connectors to put data on my Kafka topics in XML format"
Example: Produce events from your source connectors in XML format
I’ll start with DatagenSourceConnector – a datagen connector I wrote for the Event Processing tutorials. I’ve been using it with a JsonConverter so far:
- DatagenSourceConnector: generates events about a fictional clothes retailer
- JsonConverter: serializes the clothing retail events into JSON strings
My full config for how I’ve been running it is here, but the interesting bit looks like:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Using a JsonConverter like this produces events such as:
{
"id": "68579620-fccc-4acb-bfcc-3686919346aa",
"customer": "Taren Reichert",
"customerid": "ef8db9c1-c685-4600-be30-bbd1fa2f7efd",
"description": "XS Retro Boyfriend Jeans",
"price": 16.41,
"quantity": 6,
"region": "APAC",
"ordertime": "2023-10-29 13:20:17.389"
}
Replacing the JsonConverter with the XmlConverter lets me produce these events in XML instead:
- DatagenSourceConnector: generates events about a fictional clothes retailer
- XmlConverter: serializes the clothing retailer events into XML strings
The full config for doing this is here, but the relevant bit is:
value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.schemas.enable=false
Using an XmlConverter, with the same connector, produces events like:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<order>
<id>68579620-fccc-4acb-bfcc-3686919346aa</id>
<customer>Taren Reichert</customer>
<customerid>ef8db9c1-c685-4600-be30-bbd1fa2f7efd</customerid>
<description>XS Retro Boyfriend Jeans</description>
<price>16.41</price>
<quantity>6</quantity>
<region>APAC</region>
<ordertime>2023-10-29 13:20:17.389</ordertime>
</order>
Here is another example of using the XmlConverter in a source pipeline. StockPriceSourceConnector emits stock price updates. When I’ve used it with a JsonConverter in the past, it produced events like this:
{
"open" : 1.23,
"high" : 1.23,
"low" : 1.22,
"close" : 1.23,
"timestamp" : 1655757900,
"datetime" : "2022-06-20 20:45:00"
}
Replacing the JsonConverter with the XmlConverter lets me produce these stock price events in XML instead:
- StockPriceSourceConnector: emits stock price update events
- XmlConverter: serializes the stock price events into XML strings
The interesting bit of the config for this is:
value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.schemas.enable=false
That produces events like:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<stockdata>
<open>160.65</open>
<high>160.75</high>
<low>160.65</low>
<close>160.75</close>
<volume>10</volume>
<timestamp>1701478140</timestamp>
<datetime>2023-12-01 19:49:00</datetime>
</stockdata>
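A quick note on the wrapper element: the XmlConverter derives the root element name from the Connect record’s schema, which is why the events above are wrapped in order and stockdata elements. If you need to control this explicitly, my understanding is that the plugin’s root.element.name option (the same option that appears with the MQ record builder and the transformation later in this post) can also be set on the converter, along these lines:
value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.root.element.name=stockdata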
Produce events from your source connectors in XML, and generate an XSD schema to go with them
When Connect records include information about the structure of the events, a Converter can also be configured to embed that structure in its output as a schema. The type of schema will vary depending on the type of Converter.
Returning to the DatagenSourceConnector, and again using the JsonConverter as a comparison:
- DatagenSourceConnector: generates events about a fictional clothes retailer
- JsonConverter: serializes the clothing retail events into JSON strings, with an embedded JSON Schema
The relevant part of the config for this looks like:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
Setting schemas.enable to true with a JsonConverter produces events like:
{
"schema": {
"type": "struct",
"fields": [
{ "type": "string", "optional": false, "field": "id" },
{ "type": "string", "optional": false, "field": "customer" },
{ "type": "string", "optional": false, "field": "customerid" },
{ "type": "string", "optional": false, "field": "description" },
{ "type": "double", "optional": false, "field": "price" },
{ "type": "int32", "optional": false, "field": "quantity" },
{ "type": "string", "optional": false, "field": "region" },
{ "type": "string", "optional": false, "field": "ordertime" }
],
"optional": false,
"name": "order"
},
"payload": {
"id": "55031d01-1bea-4f3b-9b29-12a968d8230e",
"customer": "Gabriel Stracke",
"customerid": "7a75009c-5781-4074-9bf0-20dee537dc9d",
"description": "XXL Stonewashed Capri Jeans",
"price": 28.71,
"quantity": 3,
"region": "SA",
"ordertime": "2023-10-29 13:13:20.675"
}
}
Replacing the JsonConverter with the XmlConverter lets me produce these events in XML instead:
- DatagenSourceConnector: generates events about a fictional clothes retailer
- XmlConverter: serializes the clothing retailer events into XML strings, with an embedded XSD schema
The XML-specific bit of the config for this looks like:
value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.schemas.enable=true
Setting schemas.enable to true with an XmlConverter produces events like:
<?xml version="1.0" encoding="UTF-8"?>
<order xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="#connectSchema">
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" id="connectSchema">
<xs:element name="order">
<xs:complexType>
<xs:sequence>
<xs:any maxOccurs="1" minOccurs="0" namespace="http://www.w3.org/2001/XMLSchema" processContents="skip"/>
<xs:element name="id" type="xs:string" />
<xs:element name="customer" type="xs:string" />
<xs:element name="customerid" type="xs:string" />
<xs:element name="description" type="xs:string" />
<xs:element name="price" type="xs:double" />
<xs:element name="quantity" type="xs:integer" />
<xs:element name="region" type="xs:string" />
<xs:element name="ordertime" type="xs:string" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
<id>55031d01-1bea-4f3b-9b29-12a968d8230e</id>
<customer>Gabriel Stracke</customer>
<customerid>7a75009c-5781-4074-9bf0-20dee537dc9d</customerid>
<description>XXL Stonewashed Capri Jeans</description>
<price>28.71</price>
<quantity>3</quantity>
<region>SA</region>
<ordertime>2023-10-29 13:13:20.675</ordertime>
</order>
Returning now to the StockPriceSourceConnector, the XmlConverter can also produce an embedded XSD schema for the stock price events:
- StockPriceSourceConnector: emits stock price update events
- XmlConverter: serializes the stock price events into XML strings, with an embedded XSD schema
The config for running this looks like:
value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
value.converter.schemas.enable=true
Setting schemas.enable to true with an XmlConverter produces stock price events like:
<?xml version="1.0" encoding="UTF-8"?>
<stockdata xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="#connectSchema">
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" id="connectSchema">
<xs:element name="root">
<xs:complexType>
<xs:sequence>
<xs:any maxOccurs="1" minOccurs="0" namespace="http://www.w3.org/2001/XMLSchema" processContents="skip"/>
<xs:element name="open" type="xs:double" />
<xs:element name="high" type="xs:double" />
<xs:element name="low" type="xs:double" />
<xs:element name="close" type="xs:double" />
<xs:element name="volume" type="xs:integer" />
<xs:element name="timestamp" type="xs:string" />
<xs:element name="datetime" type="xs:string" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
<open>142.3</open>
<high>142.3</high>
<low>142.3</low>
<close>142.3</close>
<volume>2</volume>
<timestamp>1698453420</timestamp>
<datetime>2023-10-27 19:37:00</datetime>
</stockdata>
"I have XML data in external systems that I need to bring into Kafka in a Kafka-friendly† format"
† – By "Kafka-friendly", I mean something like Avro or JSON – although Kafka doesn’t care about what the bytes on a topic represent, there are more Kafka tools and applications that know how to process these formats than XML.
Messages on MQ queues and topics are a good example of this, as these are frequently in XML.
Transfer XML messages from MQ queues into Kafka topics as JSON messages
I can set up a Kafka Connect pipeline to bring XML MQ messages onto my Kafka topics as JSON strings.
- MQ queue with XML messages
- MQSourceConnector paired with XmlMQRecordBuilder: parses the XML and outputs it as Connect Structs
- JsonConverter: serializes the MQ messages into JSON strings
The config for this is more complex, but the important bits are:
mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
mq.record.builder.root.element.name=doc
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
With this config, I can PUT an MQ message like this:
<doc>
<something>
<message>Hello World</message>
<counts>
<first>100</first>
<second>200</second>
</counts>
</something>
</doc>
The Connect pipeline will produce this to the Kafka topic as a JSON message like this:
{
"something": {
"message": "Hello World",
"counts": {
"first": 100,
"second": 200
}
}
}
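For context, the XML-specific lines above sit alongside the usual MQ connection details for the MQSourceConnector. A sketch of a fuller config, with placeholder connection details standing in for your own MQ setup, might look like:
connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
mq.queue.manager=MYQMGR
mq.connection.name.list=mq-hostname(1414)
mq.channel.name=DEV.APP.SVRCONN
mq.queue=MY.XML.QUEUE
topic=xml.messages
mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
mq.record.builder.root.element.name=doc
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false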
Transfer XML messages from MQ queues into Kafka topics as Avro messages
Alternatively, I can configure a Kafka Connect pipeline to bring XML MQ messages onto my Kafka topics as Avro-encoded messages.
- MQ queue with XML messages
- MQSourceConnector paired with XmlMQRecordBuilder: parses the XML and outputs it as Connect Structs
- AvroConverter: serializes the MQ messages into binary-encoded Avro
There are multiple AvroConverters to choose from, so I’ll show a couple of examples.
If I’m using io.apicurio.registry.utils.converter.AvroConverter, configured to use an Apicurio schema registry, the config looks like:
mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
mq.record.builder.root.element.name=simple
mq.record.builder.xsd.schema.path=/opt/kafka/external-configuration/xml-schemas/simple.xsd
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.schemas.enable=true
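The converter also needs to be told where the schema registry is. With the Apicurio converter, that is a registry URL property along these lines (the exact property names vary between Apicurio serdes versions, so treat this as a sketch):
value.converter.apicurio.registry.url=http://my-registry:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true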
If I PUT a message like this on my MQ queue:
<?xml version="1.0" encoding="UTF-8"?>
<simple>
<message>Hello World</message>
<count>123</count>
</simple>
The Connect pipeline produces this to the Kafka topic as a binary-encoded Avro message, and the AvroConverter registers an Avro schema, based on the XSD schema, in the schema registry.
Alternatively, if I’m using io.confluent.connect.avro.AvroConverter (still configured to use an Apicurio schema registry), the config looks like:
mq.record.builder=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
mq.record.builder.root.element.name=ordermessage
mq.record.builder.xsd.schema.path=/opt/kafka/external-configuration/xml-schemas/mq-messages.xsd
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
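The Confluent converter needs a registry URL as well. Since I’m still using an Apicurio registry, this would point at Apicurio’s Confluent-compatible API endpoint, something like:
value.converter.schema.registry.url=http://my-registry:8080/apis/ccompat/v6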
If I PUT a message like this on my MQ queue:
<?xml version="1.0" encoding="UTF-8" ?>
<ordermessage>
<customer>
<name>Helen Velazquez</name>
<phone type="landline" number="0911 910 5491"/>
<email>mus.donec.dignissim@yahoo.ca</email>
<address>3249 Hendrerit Av.</address>
<postalZip>F2 1IX</postalZip>
<region>Dunbartonshire</region>
</customer>
<product>
<brand>Acme Inc</brand>
<item>Awesome-ivator</item>
<quantity>1</quantity>
</product>
<product>
<brand>Globex</brand>
<item>Widget</item>
<quantity>2</quantity>
</product>
<order>
<date>2023-11-05 22:11:00</date>
</order>
</ordermessage>
The Connect pipeline produces this to the Kafka topic as a binary-encoded Avro message, and again the AvroConverter registers an Avro schema, based on the XSD schema, in the schema registry.
Capture XML events from a web service and bring them into Kafka topics as JSON messages
Not all source connectors produce structured records. Some connectors will only produce a string. In such cases, it is necessary to use an XmlTransformation to create structured Connect records from the string.
My example for this is a source connector that makes HTTP calls to an XML web service and passes on the response as a string.
- CamelWeatherSourceConnector: polls a web service for a weather report event, which is returned as an XML string
- XmlTransformation: parses the XML string and outputs it as a structured object
- Additional transformations can be applied, now that the pipeline has structured objects
- JsonConverter: serializes the weather event objects into JSON strings
The config for this needs to include details for the weather web service, but the XML-specific elements are:
transforms.xmlconvert.type=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlTransformation
transforms.xmlconvert.converter.type=value
transforms.xmlconvert.root.element.name=current
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
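One thing to note: for the transforms.xmlconvert.* properties above to take effect, the xmlconvert alias also needs registering in the connector’s transforms list, alongside any other transformations you chain before or after it:
transforms=xmlconvert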
This pipeline means that, even though the source connector returns an unstructured string such as:
<?xml version="1.0" encoding="UTF-8"?>
<current>
<visibility value="10000"></visibility>
<precipitation mode="no"></precipitation>
<city id="2652491" name="Compton">
<coord lon="-1.3989" lat="51.0267"></coord>
<country>GB</country>
<timezone>0</timezone>
<sun
rise="2023-10-29T06:51:47"
set="2023-10-29T16:46:41"></sun>
</city>
<temperature
value="285.6"
min="283.83"
max="286.84"
unit="kelvin"></temperature>
<weather
number="802"
value="scattered clouds"
icon="03d"></weather>
<humidity value="89" unit="%"></humidity>
<pressure value="987" unit="hPa"></pressure>
<feels_like
value="285.22"
unit="kelvin"></feels_like>
<wind>
<speed value="4.12" unit="m/s" name="Gentle Breeze"></speed>
<gusts></gusts>
<direction value="210" code="SSW" name="South-southwest"></direction>
</wind>
<clouds value="40" name="scattered clouds"></clouds>
<lastupdate value="2023-10-29T14:39:13"></lastupdate>
</current>
The messages produced to my Kafka topic look like:
{
"precipitation": { "mode": "no" },
"visibility": { "value": 10000 },
"city": {
"country": "GB",
"coord": { "lon": -1.3989, "lat": 51.0267 },
"timezone": 0,
"name": "Compton",
"id": 2652491,
"sun": {
"set": "2023-10-29T16:46:41",
"rise": "2023-10-29T06:51:47"
}
},
"temperature": {
"unit": "kelvin",
"min": 283.83,
"max": 286.84,
"value": 285.6
},
"weather": {
"number": 802,
"icon": 3,
"value": "scattered clouds"
},
"humidity": {
"unit": "%",
"value": 89
},
"pressure": {
"unit": "hPa",
"value": 987
},
"clouds": {
"name": "scattered clouds",
"value": 40
},
"lastupdate": {
"value": "2023-10-29T14:37:13"
},
"feels_like": {
"unit": "kelvin",
"value": 285.22
},
"wind": {
"gusts": "",
"speed": {
"unit": "m/s",
"name": "Gentle Breeze",
"value": 4.12
},
"direction": {
"code": "SSW",
"name": "South-southwest",
"value": 210
}
}
}
"I have Kafka messages in a Kafka-friendly† format that I need to send to external systems as XML"
† – Again, by "Kafka-friendly", I’m talking about formats like Avro or JSON – although Kafka doesn’t care about what the bytes on a topic represent, there are more Kafka tools and applications that know how to process these formats than XML.
As before, MQ is a good example of a system that is likely to expect XML data.
Transfer JSON messages from your Kafka topics to MQ queues as XML documents
I can set up a Kafka Connect pipeline to send JSON messages on my Kafka topics to an MQ queue as XML messages.
- Kafka topic with JSON messages
- JsonConverter: deserializes the JSON strings into Java objects
- MQSinkConnector paired with XmlConverter: turns the objects into XML strings
- XML messages are PUT to the MQ queue
The relevant section of the config for this is:
mq.message.builder=com.ibm.eventstreams.connect.mqsink.builders.ConverterMessageBuilder
mq.message.builder.value.converter=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlConverter
mq.message.builder.value.converter.root.element.name=msg
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
With a sink pipeline configured in such a way, JSON messages on the Kafka topic like this:
{
"test": {
"numbers": [ 10, 20, 30 ],
"letters": [ "A", "B", "C" ],
"message": "Hello World"
},
"creative": false
}
will be PUT to MQ as XML messages like this:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<msg>
<test>
<numbers>10</numbers>
<numbers>20</numbers>
<numbers>30</numbers>
<message>Hello World</message>
<letters>A</letters>
<letters>B</letters>
<letters>C</letters>
</test>
<creative>false</creative>
</msg>
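As with the MQ source examples, the message builder config above sits alongside the usual MQ connection details for the MQSinkConnector, along these lines (the names are placeholders for your own MQ setup):
connector.class=com.ibm.eventstreams.connect.mqsink.MQSinkConnector
topics=orders.json
mq.queue.manager=MYQMGR
mq.connection.name.list=mq-hostname(1414)
mq.channel.name=DEV.APP.SVRCONN
mq.queue=MY.XML.QUEUE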
Submit JSON messages from your Kafka topics to an XML web service using HTTP posts
Finally, some sink connectors can only send strings to an external system. In such cases, an XmlTransformation is useful for converting structured Connect objects to an XML string as the last transformation in a sink pipeline.
To illustrate this, I have a Kafka topic containing JSON messages that I will submit using HTTP POSTs to an XML web service.
- Kafka topic with JSON messages
- JsonConverter: deserializes the JSON strings into Java objects
- Transformations modify the Connect objects to match the request payload expected by the web service API
- XmlTransformation: converts the modified Connect object into an XML string
- HttpSinkConnector: submits the XML string as the payload in an HTTP request
Most of the config for a sink pipeline like this is in defining the transformations to get the objects into the desired shape. In my example, I’m removing one of the properties, inserting a new one, and renaming one of them.
The config that handles converting the JSON messages into XML is:
transforms.xmlconvert.type=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlTransformation
transforms.xmlconvert.converter.type=value
transforms.xmlconvert.root.element.name=request
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
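For the reshaping itself, the three changes I described (removing a property, inserting a new one, and renaming another) could be expressed with Kafka’s built-in SMTs. This is a sketch based on the sample messages below, not my exact config: the aliases are mine, the xmlconvert transformation must run last, and on older Kafka versions the exclude option was named blacklist:
transforms=reshape,addorigin,xmlconvert
transforms.reshape.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.reshape.exclude=customer
transforms.reshape.renames=value:cost
transforms.addorigin.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addorigin.static.field=origin
transforms.addorigin.static.value=demo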
This pipeline will consume JSON messages from a Kafka topic like this:
{
"product": {
"id": "ABCD1234",
"type": "Something impressive"
},
"value": 12.99,
"customer": {
"id": "XXXXYYYY",
"name": "Joe Bloggs"
}
}
And use them to make HTTP POST requests like this:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<request>
<product>
<id>ABCD1234</id>
<type>Something impressive</type>
</product>
<cost>12.99</cost>
<origin>demo</origin>
</request>
Summary
To conclude, the eight examples outlined above illustrate some of the places within a Kafka Connect pipeline where you could use our new XML plugin. I’m sure there are other ways to use it as well, but hopefully this gives you some ideas for where to start.
Tags: apachekafka, ibmeventstreams, kafka