In this post, I want to share an example of handling bespoke structured messages with the Kafka Connect MQ Source Connector.
The MQ Source Connector gets data from MQ messages and produces it as events on Kafka topics. The default record builder makes a copy of the data as-is. For example, this can mean taking a JMS TextMessage from MQ and producing a string to Kafka. Or it can mean taking a JMS BytesMessage from MQ and producing a byte array to Kafka.
In my last post, I showed an example of using the XML record builder to read XML documents from MQ and turn them into structured Kafka Connect records. From that point, I could choose the format the data is produced to Kafka in (e.g. JSON or Avro) by choosing an appropriate value converter (e.g. `org.apache.kafka.connect.json.JsonConverter` or `io.apicurio.registry.utils.converter.AvroConverter`).
But what if your MQ messages have a custom structure, and you still want Kafka Connect to be able to parse them and output them to Kafka in a format of your choice?
In that case, you need to use a record builder that can correctly parse your MQ messages. In this post, I’ll explain what that means, show you how to create one, and share a sample you can use to get started.
Standard formats
First, I’ll recap the earlier, more common examples, but this time explain what is happening.
Text messages
- JMS TextMessage on the MQ queue
- Use: `com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder`
  - input: String
  - output: String
- Use: `org.apache.kafka.connect.storage.StringConverter`
  - input: String
  - output: String
- String messages on the Kafka topic
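For reference, the format-related connector properties for this combination would look something like this (connection settings omitted):

```yaml
mq.message.body.jms: true
mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
value.converter: org.apache.kafka.connect.storage.StringConverter
```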
Bytes messages
- JMS BytesMessage on the MQ queue
- Use: `com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder`
  - input: byte[]
  - output: byte[]
- Use: `org.apache.kafka.connect.converters.ByteArrayConverter`
  - input: byte[]
  - output: byte[]
- byte[] messages on the Kafka topic
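The equivalent format-related properties for the bytes case differ only in the converter, something like:

```yaml
mq.message.body.jms: true
mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
```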
Now, I'll recap the XML examples from my last post in the same way.
XML messages in MQ, to JSON in Kafka
- JMS BytesMessage or TextMessage on the MQ queue
- Use: `com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder`
  - input: String
  - output: Kafka Connect Struct
- Use: `org.apache.kafka.connect.json.JsonConverter`
  - input: Kafka Connect Struct
  - output: JSON
- JSON messages on the Kafka topic
XML messages in MQ, to Avro in Kafka
- JMS BytesMessage or TextMessage on the MQ queue
- Use: `com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder`
  - input: String
  - output: Kafka Connect Struct
- Use: `io.apicurio.registry.utils.converter.AvroConverter`
  - input: Kafka Connect Struct
  - output: Avro
- Avro messages on the Kafka topic
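In both XML pipelines the record builder is the same; only the value converter changes. The format-related properties would look something like this (JSON case shown; for Avro, swap in the AvroConverter and its registry settings):

```yaml
mq.message.body.jms: true
mq.record.builder: com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlMQRecordBuilder
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
```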
Custom formats
But what if you have MQ messages in a format not currently supported by an existing record builder?
This could be a custom binary format (e.g. first three bytes represent something, the next six bytes represent something else, etc.) or just a String that is delimited in some predefined way.
In these cases, you can just copy the data as-is to Kafka, as in the TextMessage or BytesMessage examples above. But ignoring the structure means you miss out on much of the value that a Kafka Connect pipeline can bring (such as transformations), and it forces every Kafka consumer to parse an unusual data format.
A better approach is to implement your own custom record builder:
```java
public class YourCustomRecordBuilder extends BaseRecordBuilder {

    @Override
    public SchemaAndValue getValue(JMSContext jmsContext, String kafkaTopicName, boolean isMQMessageJms, Message mqMessage) throws JMSException {
        // RETURN A KAFKA CONNECT STRUCTURED RECORD WITH A SCHEMA
    }
}
```
For example, imagine you have MQ messages with transactions made up of semi-colon-delimited data:
```
Merchant Name;Transaction Value;Transaction timestamp
```
Your MQ messages could contain values such as:
```
Fruitful Foods;18.99;28.10.2024-11.03.49
WishWear;38.99;28.10.2024-12.47.19
Generation Wardrobe;41.88;28.10.2024-14.01.13
Veritaste;7.99;28.10.2024-15.52.09
```
By implementing a custom record builder that can interpret messages like this, you could produce the data to Kafka as JSON:
- JMS BytesMessage or TextMessage on the MQ queue
- Use: `YourCustomRecordBuilder`
  - input: String
  - output: Kafka Connect Struct
- Use: `org.apache.kafka.connect.json.JsonConverter`
  - input: Kafka Connect Struct
  - output: JSON
- JSON messages on the Kafka topic
Messages in MQ in this delimited form will appear in Kafka as JSON.
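As an illustration, using the field names from the example record builder shown later in this post, and assuming the JSON converter's `schemas.enable` is false and the worker's time zone is UTC, the first transaction above would produce a message value something like:

```json
{"merchant":"Fruitful Foods","transactionvalue":18.99,"timestamp":1730113429000}
```

The timestamp appears as milliseconds since the epoch because the record builder uses the Kafka Connect Timestamp logical type.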
Alternatively, by selecting a different Converter, you could produce the data to Kafka as Avro:
- JMS BytesMessage or TextMessage on the MQ queue
- Use: `YourCustomRecordBuilder`
  - input: String
  - output: Kafka Connect Struct
- Use: `io.apicurio.registry.utils.converter.AvroConverter`
  - input: Kafka Connect Struct
  - output: Avro
- Avro messages on the Kafka topic
In this case, the same messages in MQ will appear in Kafka as binary-encoded Avro, with an Avro schema automatically registered in the schema registry.
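As an illustration, the Avro schema that the converter derives from the record builder's Connect schema (shown in the next section) and registers would look roughly like this (field order and extra metadata may differ):

```json
{
  "type": "record",
  "name": "mymqtransactions",
  "fields": [
    { "name": "merchant", "type": "string" },
    { "name": "transactionvalue", "type": "float" },
    { "name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
```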
Example implementation
For the examples above, I created this record builder:
```java
package uk.co.dalelane.demos.ibmmq;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;

import com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder;
import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;

/**
 * Demonstration of a custom record builder for use with the
 *  Kafka Connect MQ Source Connector.
 *
 * It supports JMS Text Messages being PUT to MQ with a body
 *  that matches the format:
 *
 *    merchant;transactionvalue;timestamp
 *
 * Examples of valid text messages include:
 *
 *    Fruitful Foods;18.99;28.10.2024-11.03.49
 *    WishWear;38.99;28.10.2024-12.47.19
 *    Generation Wardrobe;41.88;28.10.2024-14.01.13
 *    Veritaste;7.99;28.10.2024-15.52.09
 */
public class MyCustomRecordBuilder extends BaseRecordBuilder {

    /** Schema for the records that will be built. */
    private Schema connectSchema;

    /** Formatter for the timestamp field in the message payload. */
    private SimpleDateFormat timestampFormat;

    public MyCustomRecordBuilder() {
        // define the schema that will be used by this record builder
        connectSchema = SchemaBuilder.struct()
            .name("mymqtransactions")
            .version(1)
            .field("merchant", Schema.STRING_SCHEMA)
            .field("transactionvalue", Schema.FLOAT32_SCHEMA)
            .field("timestamp", Timestamp.SCHEMA)
            .build();

        // prepare the date string format used by the timestamp field
        timestampFormat = new SimpleDateFormat("dd.MM.yyyy-HH.mm.ss");
    }

    @Override
    public SchemaAndValue getValue(JMSContext jmsContext, String kafkaTopicName, boolean isMQMessageJms, Message mqMessage) throws JMSException {
        String messageString = mqMessage.getBody(String.class);

        String[] messageStringSegments = messageString.split(";");
        if (messageStringSegments.length != 3) {
            throw new RecordBuilderException("Unsupported message format - must be merchant;value;timestamp");
        }

        try {
            String merchant = messageStringSegments[0];
            Float value = Float.parseFloat(messageStringSegments[1]);
            Date timestamp = timestampFormat.parse(messageStringSegments[2]);

            Struct connectValue = new Struct(connectSchema);
            connectValue.put("merchant", merchant);
            connectValue.put("transactionvalue", value);
            connectValue.put("timestamp", timestamp);

            return new SchemaAndValue(connectSchema, connectValue);
        }
        catch (NumberFormatException | ParseException parseException) {
            throw new RecordBuilderException("Unsupported message format - must be merchant;value;timestamp", parseException);
        }
    }
}
```
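Because the parsing logic is plain Java, it is easy to exercise away from MQ and Kafka Connect. This is a minimal sketch of a unit test for it, assuming JUnit 5 and Mockito are available to mock the incoming JMS message (the test class name is mine, not part of the sample):

```java
package uk.co.dalelane.demos.ibmmq;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import javax.jms.TextMessage;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.junit.jupiter.api.Test;

public class MyCustomRecordBuilderTest {

    @Test
    public void parsesDelimitedTransaction() throws Exception {
        // mock a JMS TextMessage with a semi-colon-delimited body
        TextMessage message = mock(TextMessage.class);
        when(message.getBody(String.class)).thenReturn("Fruitful Foods;18.99;28.10.2024-11.03.49");

        MyCustomRecordBuilder builder = new MyCustomRecordBuilder();
        SchemaAndValue record = builder.getValue(null, "TRANSACTIONS", true, message);

        // check the delimited fields were parsed into the Connect Struct
        Struct value = (Struct) record.value();
        assertEquals("Fruitful Foods", value.getString("merchant"));
        assertEquals(18.99f, value.getFloat32("transactionvalue"), 0.001f);
    }
}
```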
To use this, I need to set `mq.record.builder` to the fully qualified name of my new class: `uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder`.
For example, to send the messages to Kafka as JSON, I configured a connector like this:
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: custom-records-connector
  namespace: event-automation
  labels:
    eventstreams.ibm.com/cluster: kafka-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  config:
    # format
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    mq.record.builder: uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    # source
    mq.queue: COMMANDS.STREAMQ
    # target
    topic: TRANSACTIONS
    # config
    mq.channel.name: KAFKA.SVRCONN
    mq.queue.manager: MYQMGR
    mq.connection.name.list: queuemanager-ibm-mq(1414)
    mq.message.body.jms: true
```
And to send the messages to Kafka as Avro, I configured a connector like this:
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: custom-records-connector
  namespace: event-automation
  labels:
    eventstreams.ibm.com/cluster: kafka-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  config:
    # format
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    mq.record.builder: uk.co.dalelane.demos.ibmmq.MyCustomRecordBuilder
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.schemas.enable: true
    value.converter.apicurio.registry.url: 'https://my-kafka-cluster-ibm-es-ac-reg-external-event-automation.apps.dalelane.cp.fyre.ibm.com'
    value.converter.apicurio.registry.auto-register: true
    value.converter.apicurio.auth.username: kafka-connect-credentials
    value.converter.apicurio.auth.password: '${file:/opt/kafka/connect-password/kafka-connect-credentials:password}'
    value.converter.apicurio.registry.request.ssl.truststore.type: PKCS12
    value.converter.apicurio.registry.request.ssl.truststore.location: /opt/kafka/connect-certs/my-kafka-cluster-cluster-ca-cert/ca.p12
    value.converter.apicurio.registry.request.ssl.truststore.password: '${file:/opt/kafka/connect-certs/my-kafka-cluster-cluster-ca-cert:ca.password}'
    # source
    mq.queue: COMMANDS.STREAMQ
    # target
    topic: TRANSACTIONS
    # config
    mq.channel.name: KAFKA.SVRCONN
    mq.queue.manager: MYQMGR
    mq.connection.name.list: queuemanager-ibm-mq(1414)
    mq.message.body.jms: true
```
The flexibility of creating the record builder in Java means that you can handle almost any format of message, as long as you’re able to describe how to parse it in code.
This is particularly useful for the proprietary binary formats that I see used so frequently in MQ solutions, where it is unlikely that an off-the-shelf generic builder will be appropriate.
Tags: apachekafka, ibmeventstreams, kafka