Creating custom record builders for the Kafka Connect MQ Source Connector

In this post, I want to share an example of handling bespoke structured messages with the Kafka Connect MQ Source Connector.

The MQ Source Connector gets data from MQ messages and produces it as events on Kafka topics. The default record builder makes a copy of the data as-is. For example, this can mean taking a JMS TextMessage from MQ and producing a string to Kafka. Or it can mean taking a JMS BytesMessage from MQ and producing a byte array to Kafka.

In my last post, I showed an example of using the XML record builder to read XML documents from MQ and turn them into structured Kafka Connect records. From there, I could choose the format the data is produced to Kafka in (e.g. JSON or Avro) by choosing an appropriate value converter (e.g. org.apache.kafka.connect.json.JsonConverter or io.apicurio.registry.utils.converter.AvroConverter).

But what if your MQ messages have a custom structure, and you still want Kafka Connect to be able to parse them and output them to Kafka in a format of your choice?

In that case, you need to use a record builder that can correctly parse your MQ messages. In this post, I’ll explain what that means, show you how to create one, and share a sample you can use to get started.

Standard formats

First, I’ll recap the earlier, more common examples, but this time explain what is happening.

Text messages

  1. JMS TextMessage on the MQ queue
  2. Use: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
    input: String
    output: String
  3. Use: org.apache.kafka.connect.storage.StringConverter
    input: String
    output: String
  4. String messages

Bytes messages

  1. JMS BytesMessage on the MQ queue
  2. Use: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
    input: byte[]
    output: byte[]
  3. Use: org.apache.kafka.connect.converters.ByteArrayConverter
    input: byte[]
    output: byte[]
  4. byte[] messages
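
For reference, the relevant fragment of connector configuration for the first of these pipelines looks something like this (it uses the same config properties as the full connector definitions later in this post):

mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
value.converter: org.apache.kafka.connect.storage.StringConverter
mq.message.body.jms: true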

Now, I’ll recap the XML examples in my last post in the same way.

XML messages in MQ, to JSON in Kafka

  1. JMS BytesMessage or TextMessage on the MQ queue
  2. Use: com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
    input: String
    output: Kafka Connect Struct
  3. Use: org.apache.kafka.connect.json.JsonConverter
    input: Kafka Connect Struct
    output: JSON
  4. JSON messages

XML messages in MQ, to Avro in Kafka

  1. JMS BytesMessage or TextMessage on the MQ queue
  2. Use: com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
    input: String
    output: Kafka Connect Struct
  3. Use: io.apicurio.registry.utils.converter.AvroConverter
    input: Kafka Connect Struct
    output: Avro
  4. Avro messages

Custom formats

But what if you have MQ messages in a format not currently supported by an existing record builder?

This could be a custom binary format (e.g. first three bytes represent something, the next six bytes represent something else, etc.) or just a String that is delimited in some predefined way.

In these cases, you can still copy the data to Kafka as-is, as in the TextMessage and BytesMessage examples above. But ignoring the structure means you miss out on the rest of the value that a Kafka Connect pipeline can bring (such as transformations), and you make things more complex for Kafka consumers, which have to parse an unusual data format.

A better approach is to implement your own custom record builder:

public class YourCustomRecordBuilder extends BaseRecordBuilder {

    @Override
    public SchemaAndValue getValue(JMSContext jmsContext,
                                   String kafkaTopicName,
                                   boolean isMQMessageJms,
                                   Message mqMessage) throws JMSException {
        // RETURN A KAFKA CONNECT STRUCTURED RECORD WITH A SCHEMA
    }
}

For example, imagine you have MQ messages with transactions made up of semi-colon-delimited data:

Merchant Name;Transaction Value;Transaction timestamp

Your MQ messages could contain values such as:

Fruitful Foods;18.99;28.10.2024-11.03.49
WishWear;38.99;28.10.2024-12.47.19
Generation Wardrobe;41.88;28.10.2024-14.01.13
Veritaste;7.99;28.10.2024-15.52.09

By implementing a custom record builder that can interpret messages like this, you could produce the data to Kafka as JSON:

  1. JMS BytesMessage or TextMessage on the MQ queue
  2. Use: YourCustomRecordBuilder
    input: String
    output: Kafka Connect Struct
  3. Use: org.apache.kafka.connect.json.JsonConverter
    input: Kafka Connect Struct
    output: JSON
  4. JSON messages

Messages in MQ like the examples shown above will appear in Kafka in JSON form.
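
For example, with the schema used by the record builder shown later in this post, the first transaction would be produced as a JSON value roughly like this (the timestamp is typically rendered by JsonConverter as epoch milliseconds, so the exact number depends on the connector's timezone):

{"merchant": "Fruitful Foods", "transactionvalue": 18.99, "timestamp": 1730113429000}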

Alternatively, by selecting a different Converter, you could produce the data to Kafka as Avro:

  1. JMS BytesMessage or TextMessage on the MQ queue
  2. Use: YourCustomRecordBuilder
    input: String
    output: Kafka Connect Struct
  3. Use: io.apicurio.registry.utils.converter.AvroConverter
    input: Kafka Connect Struct
    output: Avro
  4. Avro messages

In this case, the same MQ messages will appear in Kafka as binary-encoded Avro, with an Avro schema automatically registered in the schema registry.
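
The generated Avro schema will look roughly like this (treat it as illustrative: the exact record name, namespace and any connect.* annotations depend on the converter and its version):

{
  "type": "record",
  "name": "mymqtransactions",
  "fields": [
    { "name": "merchant", "type": "string" },
    { "name": "transactionvalue", "type": "float" },
    { "name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}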

Example implementation

For the examples above, I created this record builder:

package uk.co.dalelane.demos.ibmmq;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;

import com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder;
import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;

/**
 * Demonstration of a custom record builder for use with the
 *  Kafka Connect MQ Source Connector.
 *
 *  It supports JMS Text Messages being PUT to MQ with a body
 *  that matches the format:
 *
 *   merchant;transactionvalue;timestamp
 *
 *  Examples of valid text messages include:
 *
 *   Fruitful Foods;18.99;28.10.2024-11.03.49
 *   WishWear;38.99;28.10.2024-12.47.19
 *   Generation Wardrobe;41.88;28.10.2024-14.01.13
 *   Veritaste;7.99;28.10.2024-15.52.09
 */
public class MyCustomRecordBuilder extends BaseRecordBuilder {

    /** Schema for the records that will be built. */
    private Schema connectSchema;

    /** Formatter for the timestamp field in the message payload. */
    private SimpleDateFormat timestampFormat;

    public MyCustomRecordBuilder() {
        // define the schema that will be used by this record builder
        connectSchema = SchemaBuilder.struct()
            .name("mymqtransactions")
            .version(1)
                .field("merchant", Schema.STRING_SCHEMA)
                .field("transactionvalue", Schema.FLOAT32_SCHEMA)
                .field("timestamp", Timestamp.SCHEMA)
            .build();

        // prepare the date string format used by the timestamp field
        timestampFormat = new SimpleDateFormat("dd.MM.yyyy-HH.mm.ss");
    }

    @Override
    public SchemaAndValue getValue(JMSContext jmsContext,
                                   String kafkaTopicName,
                                   boolean isMQMessageJms,
                                   Message mqMessage) throws JMSException
    {
        String messageString = mqMessage.getBody(String.class);

        String[] messageStringSegments = messageString.split(";");
        if (messageStringSegments.length != 3) {
            throw new RecordBuilderException("Unsupported message format - must be merchant;value;timestamp");
        }

        try {
            String merchant = messageStringSegments[0];
            Float value = Float.parseFloat(messageStringSegments[1]);
            Date timestamp = timestampFormat.parse(messageStringSegments[2]);

            Struct connectValue = new Struct(connectSchema);
            connectValue.put("merchant", merchant);
            connectValue.put("transactionvalue", value);
            connectValue.put("timestamp", timestamp);

            return new SchemaAndValue(connectSchema, connectValue);
        }
        catch (NumberFormatException | ParseException parseException) {
            throw new RecordBuilderException("Unsupported message format - must be merchant;value;timestamp", parseException);
        }
    }
}
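
Before packaging this up, it can be useful to check the parsing logic in isolation. This isn't part of the connector itself, but a quick unit-test-style check (a sketch, assuming Mockito is available to stand in for the JMS message) could look like this:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import javax.jms.Message;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;

import uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder;

public class MyCustomRecordBuilderTest {

    public static void main(String[] args) throws Exception {
        // stand-in for a JMS text message taken from the MQ queue
        Message message = mock(Message.class);
        when(message.getBody(String.class))
            .thenReturn("Fruitful Foods;18.99;28.10.2024-11.03.49");

        // build a Kafka Connect record from the message
        MyCustomRecordBuilder builder = new MyCustomRecordBuilder();
        SchemaAndValue record = builder.getValue(null, "TRANSACTIONS", true, message);

        // the value is a Struct that matches the schema defined in the builder
        Struct value = (Struct) record.value();
        System.out.println(value.getString("merchant"));           // Fruitful Foods
        System.out.println(value.getFloat32("transactionvalue"));  // 18.99
        System.out.println(value.get("timestamp"));                // parsed java.util.Date
    }
}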

To use this, I need to package the compiled class in a jar that is available on the Kafka Connect worker's classpath (for example, alongside the MQ Source Connector jar in the plugins directory), and set mq.record.builder to the fully-qualified name of my new class:
uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder

For example, to send the messages to Kafka as JSON, I configured a connector like this:

apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: custom-records-connector
  namespace: event-automation
  labels:
    eventstreams.ibm.com/cluster: kafka-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  config:
    # format
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    mq.record.builder: uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    # source
    mq.queue: COMMANDS.STREAMQ
    # target
    topic: TRANSACTIONS
    # config
    mq.channel.name: KAFKA.SVRCONN
    mq.queue.manager: MYQMGR
    mq.connection.name.list: queuemanager-ibm-mq(1414)
    mq.message.body.jms: true

And to send the messages to Kafka as Avro, I configured a connector like this:

apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: custom-records-connector
  namespace: event-automation
  labels:
    eventstreams.ibm.com/cluster: kafka-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  config:
    # format
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    mq.record.builder: uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.schemas.enable: true
    value.converter.apicurio.registry.url: 'https://my-kafka-cluster-ibm-es-ac-reg-external-event-automation.apps.dalelane.cp.fyre.ibm.com'
    value.converter.apicurio.registry.auto-register: true
    value.converter.apicurio.auth.username: kafka-connect-credentials
    value.converter.apicurio.auth.password: '${file:/opt/kafka/connect-password/kafka-connect-credentials:password}'
    value.converter.apicurio.registry.request.ssl.truststore.type: PKCS12
    value.converter.apicurio.registry.request.ssl.truststore.location: /opt/kafka/connect-certs/my-kafka-cluster-cluster-ca-cert/ca.p12
    value.converter.apicurio.registry.request.ssl.truststore.password: '${file:/opt/kafka/connect-certs/my-kafka-cluster-cluster-ca-cert:ca.password}'
    # source
    mq.queue: COMMANDS.STREAMQ
    # target
    topic: TRANSACTIONS
    # config
    mq.channel.name: KAFKA.SVRCONN
    mq.queue.manager: MYQMGR
    mq.connection.name.list: queuemanager-ibm-mq(1414)
    mq.message.body.jms: true

The flexibility of creating the record builder in Java means that you can handle almost any format of message, as long as you’re able to describe how to parse it in code.

This is particularly useful for the proprietary binary formats that I see used so frequently in MQ solutions, where it is unlikely that an off-the-shelf generic builder will be appropriate.
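
To give a flavour of what that could look like, here is a sketch of a getValue method for a hypothetical fixed-layout binary message (the field offsets, field names and the binarySchema it refers to are all invented for this illustration; a real implementation would define a matching schema in its constructor, as in the example above):

@Override
public SchemaAndValue getValue(JMSContext jmsContext,
                               String kafkaTopicName,
                               boolean isMQMessageJms,
                               Message mqMessage) throws JMSException {
    // hypothetical fixed layout, invented for illustration:
    //   bytes 0-2 : merchant id (ASCII)
    //   bytes 3-8 : transaction value in pence (6-byte, big-endian)
    byte[] body = mqMessage.getBody(byte[].class);
    ByteBuffer buffer = ByteBuffer.wrap(body);

    byte[] merchantId = new byte[3];
    buffer.get(merchantId);

    long transactionPence = 0;
    for (int i = 0; i < 6; i++) {
        transactionPence = (transactionPence << 8) | (buffer.get() & 0xFF);
    }

    Struct value = new Struct(binarySchema);
    value.put("merchantid", new String(merchantId, StandardCharsets.US_ASCII));
    value.put("transactionvalue", transactionPence / 100.0f);

    return new SchemaAndValue(binarySchema, value);
}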
