{"id":5337,"date":"2024-10-28T20:03:23","date_gmt":"2024-10-28T20:03:23","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5337"},"modified":"2026-04-02T17:10:57","modified_gmt":"2026-04-02T17:10:57","slug":"creating-custom-record-builders-for-the-kafka-connect-mq-source-connector","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5337","title":{"rendered":"Creating custom record builders for the Kafka Connect MQ Source Connector"},"content":{"rendered":"<p><strong>In this post, I want to share an example of handling bespoke structured messages with the Kafka Connect MQ Source Connector.<\/strong><\/p>\n<p>The MQ Source Connector gets data from MQ messages and produces it as events on Kafka topics. The <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-mq-source\/blob\/main\/src\/main\/java\/com\/ibm\/eventstreams\/connect\/mqsource\/builders\/DefaultRecordBuilder.java\">default record builder<\/a> makes a copy of the data as-is. For example, this can mean taking a JMS TextMessage from MQ and producing a string to Kafka, or taking a JMS BytesMessage from MQ and producing a byte array to Kafka.<\/p>\n<p>In my last post, I showed <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5328\">an example of using the XML record builder<\/a> to read XML documents from MQ and turn them into structured Kafka Connect records. From there, I could pick the format for the data produced to Kafka (e.g. JSON or Avro) by choosing an appropriate value converter (e.g. 
<code style=\"color: #770000; font-weight: bold;\">org.apache.kafka.connect.json.JsonConverter<\/code> or <code style=\"color: #770000; font-weight: bold;\">io.apicurio.registry.utils.converter.AvroConverter<\/code>).<\/p>\n<p>But what if your MQ messages have a custom structure, and you still want Kafka Connect to be able to parse them and output them to Kafka in any format of your choice?<\/p>\n<p>In that case, you need a record builder that can correctly parse your MQ messages. In this post, I&#8217;ll explain what that means, show you how to create one, and share a sample you can use to get started.<\/p>\n<p><!--more--><\/p>\n<h2>Standard formats<\/h2>\n<p>First, I&#8217;ll recap the earlier, more common examples, but this time explain what is happening.<\/p>\n<h3>Text messages<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/record-builder.png\" style=\"width: 100%; max-width: 450px;\"\/><\/p>\n<ol>\n<li>JMS TextMessage on the MQ queue<\/li>\n<li>Use: <code style=\"font-size: 0.9em; color: #770000; font-weight: bold;\">com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder<\/code><br \/>\ninput: String<br \/>\noutput: String<\/li>\n<li>Use: <code style=\"font-size: 0.9em; color: #770000; font-weight: bold;\">org.apache.kafka.connect.storage.StringConverter<\/code><br \/>\ninput: String<br \/>\noutput: String<\/li>\n<li>String messages<\/li>\n<\/ol>\n<h3>Bytes messages<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/record-builder.png\" style=\"width: 100%; max-width: 450px;\"\/><\/p>\n<ol>\n<li>JMS BytesMessage on the MQ queue<\/li>\n<li>Use: <code style=\"font-size: 0.9em; color: #770000; font-weight: bold;\">com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder<\/code><br \/>\ninput: byte[]<br \/>\noutput: byte[]<\/li>\n<li>Use: <code style=\"font-size: 0.9em; color: #770000; font-weight: 
bold;\">org.apache.kafka.connect.converters.ByteArrayConverter<\/code><br \/>\ninput: byte[]<br \/>\noutput: byte[]<\/li>\n<li>byte[] messages<\/li>\n<\/ol>\n<p>Now, I&#8217;ll recap the <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5328\">XML examples in my last post<\/a> in the same way.<\/p>\n<h3>XML messages in MQ, to JSON in Kafka<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/record-builder.png\" style=\"width: 100%; max-width: 450px;\"\/><\/p>\n<ol>\n<li>JMS BytesMessage or TextMessage on the MQ queue<\/li>\n<li>Use: <code style=\"font-size: 0.9em; color: #770000; font-weight: bold;\">com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder<\/code><br \/>\ninput: String<br \/>\noutput: Kafka Connect Struct<\/li>\n<li>Use: <code style=\"font-size: 0.9em; color: #770000; font-weight: bold;\">org.apache.kafka.connect.json.JsonConverter<\/code><br \/>\ninput: Kafka Connect Struct<br \/>\noutput: JSON<\/li>\n<li>JSON messages<\/li>\n<\/ol>\n<h3>XML messages in MQ, to Avro in Kafka<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/record-builder.png\" style=\"width: 100%; max-width: 450px;\"\/><\/p>\n<ol>\n<li>JMS BytesMessage or TextMessage on the MQ queue<\/li>\n<li>Use: <code style=\"font-size: 0.9em; color: #770000; font-weight: bold;\">com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder<\/code><br \/>\ninput: String<br \/>\noutput: Kafka Connect Struct<\/li>\n<li>Use: <code style=\"font-size: 0.9em; color: #770000; font-weight: bold;\">io.apicurio.registry.utils.converter.AvroConverter<\/code><br \/>\ninput: Kafka Connect Struct<br \/>\noutput: Avro<\/li>\n<li>Avro messages<\/li>\n<\/ol>\n<h2>Custom formats<\/h2>\n<p>But what if you have MQ messages in a format not currently supported by an existing record builder?<\/p>\n<p>This could be a custom binary format (e.g. 
first three bytes represent something, the next six bytes represent something else, etc.) or just a String that is delimited in some predefined way.<\/p>\n<p>In these cases, you could simply copy the data as-is to Kafka, as in the TextMessage or BytesMessage examples above. But ignoring the structure means you miss out on much of the value that a Kafka Connect pipeline can bring (such as transformations that operate on individual fields), and it leaves every Kafka consumer to parse an unusual data format itself.<\/p>\n<p>A better approach is to implement your own custom record builder, by extending <code style=\"color: #770000; font-weight: bold;\">BaseRecordBuilder<\/code> and overriding its <code style=\"color: #770000; font-weight: bold;\">getValue<\/code> method:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">public class YourCustomRecordBuilder extends BaseRecordBuilder {\n\n    @Override\n    public SchemaAndValue getValue(JMSContext jmsContext,\n                                   String kafkaTopicName,\n                                   boolean isMQMessageJms,\n                                   Message mqMessage) throws JMSException {\n        \/\/ RETURN A KAFKA CONNECT STRUCTURED RECORD WITH A SCHEMA\n    }\n}<\/pre>\n<p>For example, imagine you have MQ messages with transactions made up of semicolon-delimited data:<\/p>\n<pre style=\"border: thin black solid; color: black; background-color: #f0f0f0; padding: 1em; overflow-y: scroll; overflow-x: scroll; white-space: pre\">Merchant Name;Transaction Value;Transaction Timestamp<\/pre>\n<p>Your MQ messages could contain values such as:<\/p>\n<pre style=\"border: thin black solid; color: black; background-color: #f0f0f0; padding: 1em; overflow-y: scroll; overflow-x: scroll; white-space: pre\">Fruitful Foods;18.99;28.10.2024-11.03.49\nWishWear;38.99;28.10.2024-12.47.19\nGeneration Wardrobe;41.88;28.10.2024-14.01.13\nVeritaste;7.99;28.10.2024-15.52.09<\/pre>\n<p>By implementing a custom record builder that can interpret messages like this, you could produce 
the data to Kafka as JSON:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/record-builder.png\" style=\"width: 100%; max-width: 450px;\"\/><\/p>\n<ol>\n<li>JMS BytesMessage or TextMessage on the MQ queue<\/li>\n<li>Use: <code style=\"color: #770000; font-weight: bold;\">YourCustomRecordBuilder<\/code><br \/>\ninput: String<br \/>\noutput: Kafka Connect Struct<\/li>\n<li>Use: <code style=\"color: #770000; font-weight: bold;\">org.apache.kafka.connect.json.JsonConverter<\/code><br \/>\ninput: Kafka Connect Struct<br \/>\noutput: JSON<\/li>\n<li>JSON messages<\/li>\n<\/ol>\n<p>Messages in MQ like this:<br \/>\n<img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/mq-messages-1.png\" style=\"width: 100%; max-width: 650px; border: thin black solid;\"\/><\/p>\n<p>will appear in Kafka in a JSON form like this:<br \/>\n<img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/kafka-messages-1.png\" style=\"width: 100%; max-width: 650px; border: thin black solid;\"\/><\/p>\n<p>Alternatively, by selecting a different Converter, you could produce the data to Kafka as Avro:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/record-builder.png\" style=\"width: 100%; max-width: 450px;\"\/><\/p>\n<ol>\n<li>JMS BytesMessage or TextMessage on the MQ queue<\/li>\n<li>Use: <code style=\"color: #770000; font-weight: bold;\">YourCustomRecordBuilder<\/code><br \/>\ninput: String<br \/>\noutput: Kafka Connect Struct<\/li>\n<li>Use: <code style=\"color: #770000; font-weight: bold;\">io.apicurio.registry.utils.converter.AvroConverter<\/code><br \/>\ninput: Kafka Connect Struct<br \/>\noutput: Avro<\/li>\n<li>Avro messages<\/li>\n<\/ol>\n<p>In this case, messages in MQ like this:<br \/>\n<img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/mq-messages-2.png\" style=\"width: 100%; max-width: 650px; border: 
thin black solid;\"\/><\/p>\n<p>will appear in Kafka as binary-encoded Avro, like this:<br \/>\n<img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/kafka-messages-2.png\" style=\"width: 100%; max-width: 650px; border: thin black solid;\"\/><\/p>\n<p>with an Avro schema (automatically registered) in the schema registry like this:<br \/>\n<img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-10-28-mq-builders\/kafka-messages-2-schema.png\" style=\"width: 100%; max-width: 650px; border: thin black solid;\"\/><\/p>\n<h2>Example implementation<\/h2>\n<p>For the screenshots above, I created this record builder:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">package uk.co.dalelane.demos.ibmmq;\n\nimport java.text.ParseException;\nimport java.text.SimpleDateFormat;\nimport java.util.Date;\n\nimport javax.jms.JMSContext;\nimport javax.jms.JMSException;\nimport javax.jms.Message;\n\nimport org.apache.kafka.connect.data.Schema;\nimport org.apache.kafka.connect.data.SchemaAndValue;\nimport org.apache.kafka.connect.data.SchemaBuilder;\nimport org.apache.kafka.connect.data.Struct;\nimport org.apache.kafka.connect.data.Timestamp;\n\nimport com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder;\nimport com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;\n\n\/**\n * Demonstration of a custom record builder for use with the\n *  Kafka Connect MQ Source Connector.\n *\n *  It supports JMS Text Messages being PUT to MQ with a body\n *  that matches the format:\n *\n *   merchant;transactionvalue;timestamp\n *\n *  Examples of valid text messages include:\n *\n *   Fruitful Foods;18.99;28.10.2024-11.03.49\n *   WishWear;38.99;28.10.2024-12.47.19\n *   Generation Wardrobe;41.88;28.10.2024-14.01.13\n *   Veritaste;7.99;28.10.2024-15.52.09\n *\/\npublic class 
MyCustomRecordBuilder extends BaseRecordBuilder {\n\n    \/** Schema for the records that will be built. *\/\n    private Schema connectSchema;\n\n    \/** Formatter for the timestamp field in the message payload. *\/\n    private SimpleDateFormat timestampFormat;\n\n    public MyCustomRecordBuilder() {\n        \/\/ define the schema that will be used by this record builder\n        connectSchema = SchemaBuilder.struct()\n            .name(\"mymqtransactions\")\n            .version(1)\n            .field(\"merchant\", Schema.STRING_SCHEMA)\n            .field(\"transactionvalue\", Schema.FLOAT32_SCHEMA)\n            .field(\"timestamp\", Timestamp.SCHEMA)\n            .build();\n\n        \/\/ prepare the date string format used by the timestamp field\n        timestampFormat = new SimpleDateFormat(\"dd.MM.yyyy-HH.mm.ss\");\n        \/\/ reject out-of-range values (e.g. 32.10.2024) instead of rolling them over\n        timestampFormat.setLenient(false);\n    }\n\n    @Override\n    public SchemaAndValue getValue(JMSContext jmsContext,\n                                   String kafkaTopicName,\n                                   boolean isMQMessageJms,\n                                   Message mqMessage) throws JMSException\n    {\n        \/\/ read the text of the JMS TextMessage (null if the message has no body)\n        String messageString = mqMessage.getBody(String.class);\n        if (messageString == null) {\n            throw new RecordBuilderException(\"Unsupported message format - message has no body\");\n        }\n\n        \/\/ expect exactly three semicolon-separated fields\n        String[] messageStringSegments = messageString.split(\";\");\n        if (messageStringSegments.length != 3) {\n            throw new RecordBuilderException(\"Unsupported message format - must be merchant;value;timestamp\");\n        }\n\n        try {\n            String merchant = messageStringSegments[0];\n            Float value = Float.parseFloat(messageStringSegments[1]);\n            Date timestamp = timestampFormat.parse(messageStringSegments[2]);\n\n            \/\/ build a structured Kafka Connect record that matches the schema\n            Struct connectValue = new Struct(connectSchema);\n            connectValue.put(\"merchant\", merchant);\n            connectValue.put(\"transactionvalue\", value);\n            connectValue.put(\"timestamp\", timestamp);\n\n            return new SchemaAndValue(connectSchema, connectValue);\n        }\n        
catch (NumberFormatException | ParseException parseException) {\n            throw new RecordBuilderException(\"Unsupported message format - must be merchant;value;timestamp\", parseException);\n        }\n    }\n}\n<\/pre>\n<p>To use this, I need to set <code style=\"color: #770000; font-weight: bold;\">mq.record.builder<\/code> to the fully qualified name of my new class:<br \/>\n<code style=\"color: #770000; font-weight: bold;\">uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder<\/code><\/p>\n<p>I also need to make the compiled class available to the connector, for example by adding a jar containing it to the same plugin directory as the MQ Source Connector.<\/p>\n<p>For example, to send the messages to Kafka as JSON, I configured a connector like this:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 250px;\">apiVersion: eventstreams.ibm.com\/v1beta2\nkind: KafkaConnector\nmetadata:\n  name: custom-records-connector\n  namespace: event-automation\n  labels:\n    eventstreams.ibm.com\/cluster: kafka-connect-cluster\nspec:\n  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector\n  config:\n    # format\n    key.converter: org.apache.kafka.connect.storage.StringConverter\n    key.converter.schemas.enable: false\n    mq.record.builder: uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder\n    value.converter: org.apache.kafka.connect.json.JsonConverter\n    value.converter.schemas.enable: false\n    # source\n    mq.queue: COMMANDS.STREAMQ\n    # target\n    topic: TRANSACTIONS\n    # config\n    mq.channel.name: KAFKA.SVRCONN\n    mq.queue.manager: MYQMGR\n    mq.connection.name.list: queuemanager-ibm-mq(1414)\n    mq.message.body.jms: true<\/pre>\n<p>And to send the messages to Kafka as Avro, I configured a connector like this:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 270px;\">apiVersion: eventstreams.ibm.com\/v1beta2\nkind: 
KafkaConnector\nmetadata:\n  name: custom-records-connector\n  namespace: event-automation\n  labels:\n    eventstreams.ibm.com\/cluster: kafka-connect-cluster\nspec:\n  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector\n  config:\n    # format\n    key.converter: org.apache.kafka.connect.storage.StringConverter\n    key.converter.schemas.enable: false\n    mq.record.builder: uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder\n    value.converter: io.apicurio.registry.utils.converter.AvroConverter\n    value.converter.schemas.enable: true\n    value.converter.apicurio.registry.url: 'https:\/\/my-kafka-cluster-ibm-es-ac-reg-external-event-automation.apps.dalelane.cp.fyre.ibm.com'\n    value.converter.apicurio.registry.auto-register: true\n    value.converter.apicurio.auth.username: kafka-connect-credentials\n    value.converter.apicurio.auth.password: '${file:\/opt\/kafka\/connect-password\/kafka-connect-credentials:password}'\n    value.converter.apicurio.registry.request.ssl.truststore.type: PKCS12\n    value.converter.apicurio.registry.request.ssl.truststore.location: \/opt\/kafka\/connect-certs\/my-kafka-cluster-cluster-ca-cert\/ca.p12\n    value.converter.apicurio.registry.request.ssl.truststore.password: '${file:\/opt\/kafka\/connect-certs\/my-kafka-cluster-cluster-ca-cert:ca.password}'\n    # source\n    mq.queue: COMMANDS.STREAMQ\n    # target\n    topic: TRANSACTIONS\n    # config\n    mq.channel.name: KAFKA.SVRCONN\n    mq.queue.manager: MYQMGR\n    mq.connection.name.list: queuemanager-ibm-mq(1414)\n    mq.message.body.jms: true<\/pre>\n<p>The flexibility of creating the record builder in Java means that you can handle almost any format of message, as long as you&#8217;re able to describe how to parse it in code.<\/p>\n<p>This is particularly useful for the proprietary binary formats that I see used so frequently in MQ solutions, where it is unlikely that an off-the-shelf generic builder will be 
appropriate.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this post, I want to share an example of handling bespoke structured messages with the Kafka Connect MQ Source Connector. The MQ Source Connector gets data from MQ messages and produces it as events on Kafka topics. The default record builder makes a copy of the data as-is. For example, this can mean taking [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":5338,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,583,584],"class_list":["post-5337","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-ibmeventstreams","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5337","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5337"}],"version-history":[{"count":2,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5337\/revisions"}],"predecessor-version":[{"id":5939,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5337\/revisions\/5939"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/5338"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5337"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5337
"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5337"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}