{"id":5328,"date":"2024-10-27T13:25:31","date_gmt":"2024-10-27T13:25:31","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5328"},"modified":"2024-10-27T13:28:01","modified_gmt":"2024-10-27T13:28:01","slug":"analysing-ibm-mq-messages-in-ibm-event-processing","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5328","title":{"rendered":"Analysing IBM MQ messages in IBM Event Processing"},"content":{"rendered":"<p><strong>In this post, I&#8217;ll walk through a demo of using IBM Event Processing to create an Apache Flink job that calculates summaries of messages from IBM MQ queues.<\/strong><\/p>\n<p>This is a high-level overview of the demo:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/diagrams\/ibmmq-demo.png?raw=true\"\/><\/p>\n<ul>\n<li>A <strong>JMS\/Jakarta<\/strong> application puts <strong>XML<\/strong> messages onto an <strong>MQ<\/strong> queue<\/li>\n<li>A <strong>JSON<\/strong> version of these messages is copied onto a <strong>Kafka<\/strong> topic<\/li>\n<li>The messages are processed by a <strong>Flink<\/strong> job, which outputs <strong>JSON<\/strong> results onto a <strong>Kafka<\/strong> topic<\/li>\n<li>An <strong>XML<\/strong> version of the results is copied onto an <strong>MQ<\/strong> queue<\/li>\n<li>The results are received by a <strong>JMS\/Jakarta<\/strong> application<\/li>\n<\/ul>\n<p>I&#8217;ve added <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/README.md#ibm-mq-with-xml-messages\">instructions for how you can create a demo like this for yourself<\/a> to my <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\">demos repo<\/a> on GitHub.<\/p>\n<p>The rest of this post is a walkthrough and explanation of how it all works.<\/p>\n<p><!--more--><\/p>\n<hr \/>\n<h3>Putting XML messages onto an IBM MQ queue<\/h3>\n<p><img decoding=\"async\" 
src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/diagrams\/ibmmq-demo-1.png?raw=true\"\/><\/p>\n<p>The <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/apps\/ibmmq\/src\/main\/java\/uk\/co\/dalelane\/demos\/ibmmq\/Putter.java\">Putter JMS\/Jakarta application<\/a> runs in Kubernetes alongside the rest of the demo. The configuration for the application is contained in environment variables defined in the Deployment:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2Fa871c6598abbcd87f9c7ec1ccb7573a67057c04d%2Fapps%2Fibmmq%2Focp-deploy.yaml%23L65-L83&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/a871c6598abbcd87f9c7ec1ccb7573a67057c04d\/apps\/ibmmq\/ocp-deploy.yaml#L65-L83\">ocp-deploy.yaml#L65-L83<\/a><\/small><\/p>\n<p>The template for the message payloads that it puts is provided from a ConfigMap:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2Fa871c6598abbcd87f9c7ec1ccb7573a67057c04d%2Fapps%2Fibmmq%2Focp-deploy.yaml%23L9-L31&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/a871c6598abbcd87f9c7ec1ccb7573a67057c04d\/apps\/ibmmq\/ocp-deploy.yaml#L9-L31\">ocp-deploy.yaml#L9-L31<\/a><\/small><\/p>\n<p>The app uses <a href=\"https:\/\/github.com\/DiUS\/java-faker\">javafaker<\/a> to populate the template with random values:<\/p>\n<p><script 
src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2Fmaster%2Fapps%2Fibmmq%2Fsrc%2Fmain%2Fjava%2Fuk%2Fco%2Fdalelane%2Fdemos%2Fibmmq%2FPutter.java%23L56-L69&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/apps\/ibmmq\/src\/main\/java\/uk\/co\/dalelane\/demos\/ibmmq\/Putter.java#L56-L69\">Putter.java#L56-L69<\/a><\/small><\/p>\n<p>The result is that the MQ queue gets a message like this, every 30 seconds:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2F2f32d34b92a50b84aea1b312469349d80ddd1aaa%2Fsample-messages%2FIBMMQ.KAFKA.xml&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/2f32d34b92a50b84aea1b312469349d80ddd1aaa\/sample-messages\/IBMMQ.KAFKA.xml\">IBMMQ.KAFKA.xml<\/a><\/small><\/p>\n<hr \/>\n<h3>Copying JSON representations to an Apache Kafka topic<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/diagrams\/ibmmq-demo-2.png?raw=true\"\/><\/p>\n<p>This is done by a <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-mq-source\">kafka-connect-mq-source Connector<\/a> in conjunction with a <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\">kafka-connect-xml-converter<\/a> running in a <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/kafka-connect.yaml\">Kafka Connect runtime<\/a>.<\/p>\n<p>The configuration for the Source Connector is:<\/p>\n<p><script 
src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2Fa871c6598abbcd87f9c7ec1ccb7573a67057c04d%2Fmq-connectors.yaml%23L11-L31&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/a871c6598abbcd87f9c7ec1ccb7573a67057c04d\/mq-connectors.yaml#L11-L31\">mq-connectors.yaml#L11-L31<\/a><\/small><\/p>\n<p>It uses an XSD schema to parse the XML messages that are retrieved from IBM MQ, which is provided in a ConfigMap:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2F2f32d34b92a50b84aea1b312469349d80ddd1aaa%2Fmq-schemas.yaml%23L7-L57&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/2f32d34b92a50b84aea1b312469349d80ddd1aaa\/mq-schemas.yaml#L7-L57\">mq-schemas.yaml#L7-L57<\/a><\/small><\/p>\n<p>Notice that the schema allows for correct conversion of what would otherwise be ambiguous from the XML message payloads by themselves &#8211; such as the fact that the products should be an array, and that some of the fields are optional.<\/p>\n<p>The result is that JSON representations of the MQ messages are produced to a Kafka topic.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/screenshots\/ibmmq-demo-2.png?raw=true\"\/><\/p>\n<hr \/>\n<h3>Processing the messages in Apache Flink<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/diagrams\/ibmmq-demo-3.png?raw=true\"\/><\/p>\n<p>The Kafka topic can be used as the source for a new flow in Event Processing.<\/p>\n<p>There is obviously a huge range of processing that can be performed here, but in the interest of keeping this 
simple to recreate, I simply summed the number of products sold each hour.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/screenshots\/ibmmq-demo-3.png?raw=true\"\/><\/p>\n<hr \/>\n<h3>Outputting JSON results to Apache Kafka<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/diagrams\/ibmmq-demo-4.png?raw=true\"\/><\/p>\n<p>A new Kafka topic is used as the output for the Flink job.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/screenshots\/ibmmq-demo-4.png?raw=true\"\/><\/p>\n<p>These are output as JSON payloads, which can be observed from the Kafka topic.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/screenshots\/ibmmq-demo-5.png?raw=true\"\/><\/p>\n<hr \/>\n<h3>Copying XML representations to an IBM MQ queue<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/diagrams\/ibmmq-demo-5.png?raw=true\"\/><\/p>\n<p>This is done by a <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-mq-sink\">kafka-connect-mq-sink Connector<\/a> in conjunction with a <a href=\"https:\/\/github.com\/ibm-messaging\/kafka-connect-xml-converter\">kafka-connect-xml-converter<\/a> running in a <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/kafka-connect.yaml\">Kafka Connect runtime<\/a>.<\/p>\n<p>The configuration for the Sink Connector is:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2F05d58840df06bf032a9f3a6f279aeec951bd4985%2Fmq-connectors.yaml%23L49-L71&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a 
href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/05d58840df06bf032a9f3a6f279aeec951bd4985\/mq-connectors.yaml#L49-L71\">mq-connectors.yaml#L49-L71<\/a><\/small><\/p>\n<p>The result is that XML representations of the outputs are put to an MQ queue.<\/p>\n<hr \/>\n<h3>Getting results from the IBM MQ queue<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/diagrams\/ibmmq-demo-6.png?raw=true\"\/><\/p>\n<p>The <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/apps\/ibmmq\/src\/main\/java\/uk\/co\/dalelane\/demos\/ibmmq\/Getter.java\">Getter JMS\/Jakarta application<\/a> runs in Kubernetes alongside the rest of the demo. The configuration for the application is contained in environment variables defined in the Deployment:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2F05d58840df06bf032a9f3a6f279aeec951bd4985%2Fapps%2Fibmmq%2Focp-deploy.yaml%23L125-L135&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/05d58840df06bf032a9f3a6f279aeec951bd4985\/apps\/ibmmq\/ocp-deploy.yaml#L125-L135\">ocp-deploy.yaml#L125-L135<\/a><\/small><\/p>\n<p>The application prints out the body of the messages that it gets.<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fkafka-demos%2Fblob%2F05d58840df06bf032a9f3a6f279aeec951bd4985%2Fapps%2Fibmmq%2Fsrc%2Fmain%2Fjava%2Fuk%2Fco%2Fdalelane%2Fdemos%2Fibmmq%2FGetter.java%23L19-L22&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;showFileMeta=on&#038;showCopy=on\"><\/script><small><a 
href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/05d58840df06bf032a9f3a6f279aeec951bd4985\/apps\/ibmmq\/src\/main\/java\/uk\/co\/dalelane\/demos\/ibmmq\/Getter.java#L19-L22\">Getter.java#L19-L22<\/a><\/small><\/p>\n<p>This can be observed by looking at the log for the pod running the application:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/screenshots\/ibmmq-demo-6.png?raw=true\"\/><\/p>\n<hr \/>\n<h3>Modifying the demo<\/h3>\n<h4>Using the IBM MQ web UI<\/h4>\n<p>To keep things simple, I didn&#8217;t use streaming queues. The downside of this is that you can&#8217;t view messages in the IBM MQ web UI. (The Source Connector consumes messages from the IBMMQ.KAFKA queue, and the Getter app consumes messages from the KAFKA.IBMMQ queue, before you get a chance to view them.)<\/p>\n<p>If you want to view messages in the IBM MQ web UI, you can modify the <a href=\"https:\/\/github.com\/IBM\/event-automation-demo\/blob\/1321df6730d58a9c864d50eb62fe5bb07bff60e8\/install\/supporting-demo-resources\/mq\/templates\/03-qmgr-setup.yaml#L10-L12\">config to use streaming queues<\/a> on both ends. This would let the Source Connector and Getter app get a copy of messages, while leaving the originals viewable in the web console.<\/p>\n<p>Alternatively, you could pause the Source Connector by adding <code>state: paused<\/code> to the <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/05d58840df06bf032a9f3a6f279aeec951bd4985\/mq-connectors.yaml#L11\">ibmmq-source-connector spec<\/a>, and pause the Getter app by setting <code>replicas: 0<\/code> in the <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/05d58840df06bf032a9f3a6f279aeec951bd4985\/apps\/ibmmq\/ocp-deploy.yaml#L105\">getter app deployment<\/a>. 
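In YAML terms, those two pause tweaks would look roughly like this (a sketch showing only the relevant fields rather than complete manifests; the resource kinds and connector name are taken from the linked files):

```yaml
# mq-connectors.yaml: pause the source connector
kind: KafkaConnector
metadata:
  name: ibmmq-source-connector
spec:
  state: paused
---
# apps/ibmmq/ocp-deploy.yaml: stop the getter app
kind: Deployment
spec:
  replicas: 0
```
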
This would let you review the XML messages in the IBM MQ web UI, before resuming the apps that are getting them.<\/p>\n<h4>Where things run<\/h4>\n<p>The scripts and instructions in <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\">my demos repo<\/a> assume that you&#8217;re running everything in an OpenShift cluster. This is just to keep the demo automation simple, but it would be perfectly possible to modify this to run in different environments.<\/p>\n<p>For example, IBM Event Streams and IBM Event Processing can run in any Kubernetes environment, not just OpenShift &#8211; so you could modify this to run Kafka, the Kafka connectors, and the Flink jobs in a different Kubernetes distribution. And the IBM MQ queue manager doesn&#8217;t need to run within Kubernetes at all &#8211; you could extend the configuration of the Connectors and JMS applications to connect to a remote queue manager.<\/p>\n<p>Functionally, this would still be the same demo as shown above.<\/p>\n<h4>Data formats<\/h4>\n<p>There is no need for the MQ messages to be XML documents or for the Kafka messages to be JSON payloads. I did this to illustrate that it is possible to transform the messages when copying them between Kafka and MQ. The demo could have been simplified by putting the messages to IBM MQ as JSON payloads and removing the need for transformation. Alternatively, the Kafka messages could have been produced as Apache Avro instead of JSON, and then processed in Flink as Avro messages. 
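To get a feel for that trade-off, here is a rough sketch (the summary event fields are invented, and Python's struct packing merely stands in for a real schema-based Avro encoding):

```python
import json
import struct

# A JSON rendering of a hypothetical hourly summary event
event = {"region": "Hursley", "windowEnd": "2024-10-27T13:00:00Z", "totalProducts": 42}
as_json = json.dumps(event).encode("utf-8")

# A schema-based binary encoding stores only the values, never the field names:
# here a 7-byte region string, an 8-byte epoch timestamp, and a 4-byte count
as_binary = struct.pack(">7sqi", b"Hursley", 1730034000, 42)

print(f"JSON: {len(as_json)} bytes, binary: {len(as_binary)} bytes")
```

The binary form is a fraction of the size because the schema, rather than every individual message, carries the field names.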
This would reduce the network and disk usage of the Kafka representations of the order events, but make for less readable screenshots, which is why I opted for JSON.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>How to use IBM Event Processing to create an Apache Flink job to calculate summaries of messages on IBM MQ queues<\/p>\n","protected":false},"author":1,"featured_media":5331,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,583,584],"class_list":["post-5328","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-ibmeventstreams","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5328","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5328"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5328\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/5331"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5328"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5328"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5328"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}"
,"templated":true}]}}