Analysing IBM MQ messages in IBM Event Processing

In this post, I’ll walk through a demo of using IBM Event Processing to create an Apache Flink job that calculates summaries of messages from IBM MQ queues.

This is a high-level overview of the demo:

  • A JMS/Jakarta application puts XML messages onto an MQ queue
  • A JSON version of these messages is copied onto a Kafka topic
  • The messages are processed by a Flink job, which outputs JSON results onto a Kafka topic
  • An XML version of the results is copied onto an MQ queue
  • The results are received by a JMS/Jakarta application

I’ve added instructions for how you can create a demo like this for yourself to my demos repo on GitHub.

The rest of this post is a walkthrough and explanation of how it all works.


Putting XML messages onto an IBM MQ queue

The Putter JMS/Jakarta application runs in Kubernetes alongside the rest of the demo. The configuration for the application is contained in environment variables defined in the Deployment:

ocp-deploy.yaml#L65-L83

The template for the message payloads that it puts is provided from a ConfigMap:

ocp-deploy.yaml#L9-L31

The app uses javafaker to populate the template with random values:

Putter.java#L56-L69
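To make the idea of populating a template concrete, here is a minimal sketch in plain Java. It is not the Putter code: the `{{name}}` placeholder syntax, the element names, and the `TemplateFiller` class are all hypothetical, and `java.util.Random` stands in for javafaker so that the example has no dependencies.

```java
import java.util.Map;
import java.util.Random;

class TemplateFiller {
    // Replace every {{key}} placeholder in the template with its value.
    // (Hypothetical placeholder syntax; values are not XML-escaped.)
    static String fill(String template, Map<String, String> values) {
        String result = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            result = result.replace("{{" + entry.getKey() + "}}", entry.getValue());
        }
        return result;
    }

    // Generate random values for two hypothetical placeholders.
    static Map<String, String> randomValues(Random random) {
        String[] products = {"Widget", "Gadget", "Gizmo"};
        return Map.of(
            "product", products[random.nextInt(products.length)],
            "quantity", String.valueOf(1 + random.nextInt(5)));
    }

    public static void main(String[] args) {
        String template =
            "<order><product>{{product}}</product><quantity>{{quantity}}</quantity></order>";
        // Prints one randomly populated XML payload.
        System.out.println(fill(template, randomValues(new Random())));
    }
}
```

Keeping the template separate from the code that fills it in is what lets the demo change the payload shape by editing the ConfigMap alone.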

The result is that the MQ queue gets a message like this, every 30 seconds:

IBMMQ.KAFKA.xml


Copying JSON representations to an Apache Kafka topic

This is done by a kafka-connect-mq-source Connector in conjunction with a kafka-connect-xml-converter running in a Kafka Connect runtime.

The configuration for the Source Connector is:

mq-connectors.yaml#L11-L31

It uses an XSD schema to parse the XML messages that are retrieved from IBM MQ, which is provided in a ConfigMap:

mq-schemas.yaml#L7-L57

Notice that the schema allows for correct conversion of what would otherwise be ambiguous from the XML message payloads by themselves – such as the fact that the products should be an array, and that some of the fields are optional.
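The array ambiguity is easy to reproduce. The sketch below is not how kafka-connect-xml-converter is implemented. It is a small, hypothetical illustration (the `ArrayAmbiguity` class and the `product` element name are my own) of why a converter with no schema has to guess, while one that knows the element is declared as repeating can always emit an array.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.NodeList;

class ArrayAmbiguity {
    // Collect the text content of every <product> element in the document.
    static List<String> products(String xml) {
        try {
            NodeList nodes = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                .getElementsByTagName("product");
            List<String> result = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                result.add(nodes.item(i).getTextContent());
            }
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML", e);
        }
    }

    // Schema-less guess: a single element looks like a scalar value,
    // so the JSON shape changes depending on how many products there are.
    static String naiveJson(List<String> items) {
        if (items.size() == 1) {
            return "\"" + items.get(0) + "\"";
        }
        return schemaAwareJson(items);
    }

    // Schema-aware: the element is declared as repeating, so it is always an array.
    // (No JSON escaping here; this is only an illustration.)
    static String schemaAwareJson(List<String> items) {
        return items.stream()
            .map(item -> "\"" + item + "\"")
            .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> single = products("<order><product>Widget</product></order>");
        System.out.println("naive:        " + naiveJson(single));
        System.out.println("schema-aware: " + schemaAwareJson(single));
    }
}
```

An order with a single product is the case that matters: without the schema, downstream consumers would see a string for some orders and an array for others.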

The result is that JSON representations of the MQ messages are produced to a Kafka topic.


Processing the messages in Apache Flink

The Kafka topic can be used as the source for a new flow in Event Processing.

There is a huge range of processing that could be performed here, but to keep this simple to recreate, I summed the number of products sold each hour.
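Event Processing generates the Flink job from the flow, so there is no code to write for this step. Purely to illustrate what an hourly sum means, here is a plain Java sketch of the same aggregation. It is not the generated job, and the `Order` class and its field names are hypothetical. Each order is assigned to the one-hour window that contains its timestamp (a tumbling window), and the product counts are summed per window.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HourlyProductTotals {
    // Hypothetical event shape: when the order was placed and
    // how many products it contained.
    static final class Order {
        final Instant orderTime;
        final int productCount;

        Order(Instant orderTime, int productCount) {
            this.orderTime = orderTime;
            this.productCount = productCount;
        }
    }

    // Sum product counts per one-hour window, keyed by the window start time.
    static Map<Instant, Integer> sumPerHour(List<Order> orders) {
        Map<Instant, Integer> totals = new TreeMap<>();
        for (Order order : orders) {
            // Truncating to the hour gives the start of the window the order falls in.
            Instant windowStart = order.orderTime.truncatedTo(ChronoUnit.HOURS);
            totals.merge(windowStart, order.productCount, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order(Instant.parse("2025-01-01T10:05:00Z"), 2),
            new Order(Instant.parse("2025-01-01T10:55:00Z"), 3),
            new Order(Instant.parse("2025-01-01T11:00:00Z"), 4));
        // Prints one total per hour window, in time order.
        sumPerHour(orders).forEach((start, total) ->
            System.out.println(start + " -> " + total));
    }
}
```

Unlike this batch-style loop, a Flink job computes the same totals continuously and emits each window’s result once that hour has passed.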


Outputting JSON results to Apache Kafka

A new Kafka topic is used as the output for the Flink job.

The results are output as JSON payloads, which can be observed on the Kafka topic.


Copying XML representations to an IBM MQ queue

This is done by a kafka-connect-mq-sink Connector in conjunction with a kafka-connect-xml-converter running in a Kafka Connect runtime.

The configuration for the Sink Connector is:

mq-connectors.yaml#L49-L71

The result is that XML representations of the outputs are put to an MQ queue.


Getting results from the IBM MQ queue

The Getter JMS/Jakarta application runs in Kubernetes alongside the rest of the demo. The configuration for the application is contained in environment variables defined in the Deployment:

ocp-deploy.yaml#L125-L135

The application prints out the body of the messages that it gets.

Getter.java#L19-L22

This can be observed by looking at the log for the pod running the application.


Modifying the demo

Using the IBM MQ web UI

To keep things simple, I didn’t use streaming queues. The downside of this is that you can’t view messages in the IBM MQ web UI. (The Source Connector is getting messages from the IBMMQ.KAFKA queue, and the Getter app is getting messages from the KAFKA.IBMMQ queue, before you can view them.)

If you want to view messages in the IBM MQ web UI, you can modify the config to use streaming queues on both ends. This would let the Source Connector and Getter app get a copy of messages, while leaving the originals to be viewable in the web console.

Alternatively, you could pause the Source Connector by adding state: paused to the ibmmq-source-connector spec, and pause the Getter app by setting replicas: 0 in the getter app deployment. This would let you review the XML messages in the IBM MQ web UI, before resuming the apps that are getting them.

Where things run

The scripts and instructions in my demos repo assume that you’re running everything in an OpenShift cluster. This is just to keep the demo automation simple, but it would be perfectly possible to modify this to run in different environments.

For example, IBM Event Streams and IBM Event Processing can run in any Kubernetes environment, not just OpenShift – so you could modify this to run Kafka, the Kafka connectors, and the Flink jobs in a different Kubernetes distribution. The IBM MQ queue manager doesn’t need to run within Kubernetes either: you could extend the configuration of the Connectors and JMS applications so that they connect to a remote queue manager.

Functionally this would still be the same as the demo as shown above.

Data formats

There is no need for the MQ messages to be XML documents or for the Kafka messages to be JSON payloads. I did this to illustrate that it is possible to transform the messages when copying them between Kafka and MQ. The demo could have been simplified by putting the messages to IBM MQ as JSON payloads, removing the need for transformation. Alternatively, the Kafka messages could have been produced as Apache Avro instead of JSON, and then processed in Flink as Avro messages. This would reduce the network and disk usage of the Kafka representations of the order events, but would make for less readable screenshots, which is why I opted for JSON.
