{"id":4615,"date":"2022-07-10T10:51:46","date_gmt":"2022-07-10T10:51:46","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=4615"},"modified":"2022-07-10T10:52:30","modified_gmt":"2022-07-10T10:52:30","slug":"how-to-use-mq-streaming-queues-and-kafka-connect-to-make-an-auditable-copy-of-ibm-mq-messages","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=4615","title":{"rendered":"How to use MQ Streaming Queues and Kafka Connect to make an auditable copy of IBM MQ messages"},"content":{"rendered":"<h2 id=\"summary_scenario\">Scenario<\/h2>\n<p>You have an IBM MQ queue manager. An application is putting messages to a command queue. Another application gets these messages from the queue and takes actions in response.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/1-before.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<h2 id=\"summary_objective\">Objective<\/h2>\n<p>You want multiple separate audit applications to be able to review the commands that go through the command queue.<\/p>\n<p>They should be able to replay a history of these command messages as many times as they want.<\/p>\n<p>This must not impact the application that is currently getting the messages from the queue.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/2-objective.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<h2 id=\"summary_solution\">Solution<\/h2>\n<p>You can use Streaming Queues to make a duplicate of every message put to the command queue to a separate copy queue.<\/p>\n<p>This copy queue can be used to feed a connector that can produce every message to a Kafka topic. 
This Kafka topic can be used by audit applications.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/3-solution.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<h2 id=\"summary_details\">Details<\/h2>\n<p>The final solution works like this:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/4-details.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<ol type=\"A\">\n<li style=\"list-style-type: upper-alpha;\">A JMS application called <code class=\"mqkafkacode\">Putter<\/code> puts messages onto an IBM MQ queue called <code class=\"mqkafkacode\">COMMANDS<\/code><\/li>\n<li style=\"list-style-type: upper-alpha;\">For the purposes of this demo, a development instance of LDAP is used to authenticate access to IBM MQ<\/li>\n<li style=\"list-style-type: upper-alpha;\">A JMS application called <code class=\"mqkafkacode\">Getter<\/code> gets messages from the <code class=\"mqkafkacode\">COMMANDS<\/code> queue<\/li>\n<li style=\"list-style-type: upper-alpha;\">A copy of every message put to the COMMANDS queue is streamed to the <code class=\"mqkafkacode\">COMMANDS.COPY<\/code> queue<\/li>\n<li style=\"list-style-type: upper-alpha;\">A Connector will get every message from the COMMANDS.COPY queue<\/li>\n<li style=\"list-style-type: upper-alpha;\">The Connector transforms each JMS message into a string, and produces it to the <code class=\"mqkafkacode\">MQ.COMMANDS<\/code> Kafka topic<\/li>\n<li style=\"list-style-type: upper-alpha;\">A Java application called <code class=\"mqkafkacode\">Audit<\/code> can replay the history of all messages on the Kafka topic<\/li>\n<\/ol>\n<p><!--more--><\/p>\n<hr \/>\n<div class=\"mqkafkacontents\">\n<ul>\n<li>Summary\n<ul>\n<li><a href=\"#summary_scenario\">Scenario<\/a><\/li>\n<li><a href=\"#summary_objective\">Objective<\/a><\/li>\n<li><a href=\"#summary_solution\">Solution<\/a><\/li>\n<li><a 
href=\"#summary_details\">Details<\/a><\/li>\n<\/ul>\n<\/li>\n<li><a href=\"#instructions\">Instructions<\/a>\n<ul>\n<li><a href=\"#instructions_prereqs\">Pre-requisites<\/a><\/li>\n<li>Setup IBM MQ\n<ul>\n<li><a href=\"#instructions_ldap\">Setup LDAP<\/a><\/li>\n<li><a href=\"#instructions_certs\">Generate certificates<\/a><\/li>\n<li><a href=\"#instructions_mqoperator\">Install the MQ Operator<\/a><\/li>\n<li><a href=\"#instructions_queuemanager\">Create the queue manager<\/a><\/li>\n<\/ul>\n<\/li>\n<li>JMS applications\n<ul>\n<li><a href=\"#instructions_jms\">Setup the JMS apps<\/a><\/li>\n<li><a href=\"#instructions_verifyscenario\">Verify the initial scenario<\/a><\/li>\n<\/ul>\n<\/li>\n<li>Setup IBM Event Streams\n<ul>\n<li><a href=\"#instructions_esoperator\">Install the Event Streams Operator<\/a><\/li>\n<li><a href=\"#instructions_kafka\">Create the Kafka cluster<\/a><\/li>\n<li><a href=\"#instructions_kafkaconnect\">Setup the MQ\/Kafka Connector<\/a><\/li>\n<\/ul>\n<\/li>\n<li>Audit applications\n<ul>\n<li><a href=\"#instructions_audit\">Setup the Kafka Java app<\/a><\/li>\n<li><a href=\"#instructions_demo\">Demo the whole solution<\/a><\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<\/div>\n<h1 id=\"instructions\">Instructions<\/h1>\n<h2 class=\"mqkafkasection\" id=\"instructions_prereqs\">Pre-requisites<\/h2>\n<p>If you want to create the entire end-to-end demo solution as described above from scratch, you can simply follow the instructions below.<\/p>\n<p>You will need:<\/p>\n<ul>\n<li>a Red Hat OpenShift cluster<\/li>\n<li>on your local machine:\n<ul>\n<li>the <a href=\"https:\/\/docs.openshift.com\/container-platform\/4.8\/cli_reference\/openshift_cli\/getting-started-cli.html\">OpenShift <code class=\"mqkafkacode\">oc<\/code> CLI<\/a>, logged into your cluster<\/li>\n<li><a href=\"https:\/\/formulae.brew.sh\/formula\/gnu-sed\"><code class=\"mqkafkacode\">gsed<\/code><\/a><\/li>\n<li><code class=\"mqkafkacode\">docker<\/code> CLI<\/li>\n<li><code 
class=\"mqkafkacode\">Java<\/code> for compiling and running the test applications<\/li>\n<li><code class=\"mqkafkacode\">mvn<\/code> for building the test applications<\/li>\n<li>a copy of the <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\">mq-kafka-connect-tutorial<\/a> repository from GitHub<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<p>The instructions below are broken into sections, so if you instead want to use this tutorial as the basis for adding to an existing scenario (e.g. you already have an IBM MQ queue and want to add Kafka) you can just choose the relevant sections to follow.<\/p>\n<h2 class=\"mqkafkasection\">Setup IBM MQ<\/h2>\n<h3 id=\"instructions_ldap\">Setup LDAP<\/h3>\n<p><strong>Install a development LDAP server that will manage user identities for the IBM MQ queue manager.<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/01-install-ldap\/setup.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-ldap.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>Two user identities are defined in <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/01-install-ldap\/resources\/config.yaml\">config.yaml<\/a>:<\/p>\n<p><code class=\"mqkafkacode\">mquser<\/code> that will be used by our JMS MQ applications<\/p>\n<pre class=\"mqkafkalisting\">dn: uid=mquser,ou=people,dc=ibm,dc=com\nobjectClass: inetOrgPerson\nobjectClass: organizationalPerson\nobjectClass: person\nobjectClass: top\ncn: mquserCN\nsn: mquserSN\nuid: mquser\nuserPassword: mquserpassword<\/pre>\n<p><code class=\"mqkafkacode\">kafkauser<\/code> that will be used later by the Kafka Connector<\/p>\n<pre class=\"mqkafkalisting\">dn: uid=kafkauser,ou=people,dc=ibm,dc=com\nobjectClass: inetOrgPerson\nobjectClass: organizationalPerson\nobjectClass: person\nobjectClass: top\ncn: kafkauserCN\nsn: kafkauserSN\nuid: kafkauser\nuserPassword: kafkapassword<\/pre>\n<h3 
id=\"instructions_certs\">Generate certificates<\/h3>\n<p><strong>Generate a new self-signed CA, and use it to create certificates for the MQ queue manager, the JMS MQ client apps, and the Kafka Connector.<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/02-generate-certs\/generate.sh<\/pre>\n<p>The JMS client and Kafka Connector certificates are added to the queue manager&#8217;s keystore so that it will accept connections from them.<\/p>\n<p>This certificate generation is all done in a Docker container to reduce the number of dependencies you need locally.<\/p>\n<p>Running this will create:<\/p>\n<ul>\n<li><code class=\"mqkafkacode\">streamingdemo-ca<\/code> files &#8211; with the CA used to sign all certs in the tutorial demo<\/li>\n<li><code class=\"mqkafkacode\">streamingdemo-jms-client<\/code> files &#8211; used by the JMS applications<\/li>\n<li><code class=\"mqkafkacode\">streamingdemo-kafka-client<\/code> files &#8211; used by the Kafka Connect connector for connecting to IBM MQ<\/li>\n<li><code class=\"mqkafkacode\">streamingdemo-mq-server<\/code> files &#8211; used by the IBM MQ queue manager<\/li>\n<\/ul>\n<h3 id=\"instructions_mqoperator\">Install the MQ Operator<\/h3>\n<p><strong>Install the MQ Operator that will manage your IBM MQ queue manager.<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/03-install-mq-operator\/setup.sh<\/pre>\n<p>The tutorial script uses a specific channel for the MQ Operator that is supported on OpenShift 4.8 and earlier. 
You may want to <a href=\"https:\/\/www.ibm.com\/docs\/en\/ibm-mq\/9.2?topic=openshift-installing-mq-operator-using-web-console\">do this step manually<\/a> if you want to choose a different version.<\/p>\n<h3 id=\"instructions_queuemanager\">Create the queue manager<\/h3>\n<p><strong>Create and configure the queue manager to prepare it for use with the JMS applications<\/strong><\/p>\n<pre class=\"mqkafkastep\">export IBM_ENTITLEMENT_KEY=yourentitlementkey\n.\/04-create-queue-manager\/setup.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-mq.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p><strong>BEFORE<\/strong> running this, you will need to set an environment variable with an Entitled Registry key from <a href=\"https:\/\/myibm.ibm.com\/products-services\/containerlibrary\">myibm.ibm.com<\/a><\/p>\n<p>The queue manager config is contained in the <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/04-create-queue-manager\/resources\/configure-qmgr.yaml\">qmgr-setup<\/a> ConfigMap which:<\/p>\n<p>Creates the <code class=\"mqkafkacode\">COMMANDS<\/code> queue that the JMS applications will use<\/p>\n<pre class=\"mqkafkalisting\">DEFINE QLOCAL(COMMANDS) REPLACE<\/pre>\n<p>Creates the <code class=\"mqkafkacode\">APP.SVRCONN<\/code> channel that the JMS applications will use<\/p>\n<pre class=\"mqkafkalisting\">DEFINE CHANNEL(APP.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP) SSLCAUTH(OPTIONAL) SSLCIPH('ANY_TLS12_OR_HIGHER') REPLACE<\/pre>\n<p>Sets up security so that the JMS applications have to use the <code class=\"mqkafkacode\">mquser<\/code> username \/ password <a href=\"#instructions_ldap\">defined in LDAP previously<\/a><\/p>\n<pre class=\"mqkafkalisting\">SET CHLAUTH(APP.SVRCONN) TYPE(BLOCKUSER) USERLIST(*MQUSER) WARN(YES) ACTION(REPLACE)\n\nREFRESH SECURITY\n\nSET AUTHREC OBJTYPE(QMGR) GROUP('mqusers') AUTHADD(ALL)\nSET AUTHREC OBJTYPE(QUEUE) 
PROFILE('**') GROUP('mqusers') AUTHADD(ALL)\nSET AUTHREC OBJTYPE(CHANNEL) PROFILE('**') GROUP('mqusers') AUTHADD(ALL)<\/pre>\n<h2 class=\"mqkafkasection\">JMS applications<\/h2>\n<h3 id=\"instructions_jms\">Setup the JMS apps<\/h3>\n<p><strong>Compile the JMS Putter and Getter applications<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/05-jms-apps\/build-apps.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-jms.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>The config for the apps is contained in <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/05-jms-apps\/mq-jms-simple\/src\/main\/java\/com\/ibm\/clientengineering\/mq\/samples\/Config.java\">Config.java<\/a>.<\/p>\n<p>The <code class=\"mqkafkacode\">HOST<\/code> constant is modified by the <code class=\"mqkafkacode\">build-apps.sh<\/code> script to match the hostname for your queue manager.<\/p>\n<pre class=\"mqkafkalisting\">public static final String HOST = \"PLACEHOLDERHOSTNAME\";\npublic static final String QMGRNAME = \"MYQMGR\";\npublic static final String CHANNEL = \"APP.SVRCONN\";\npublic static final String QUEUE = \"COMMANDS\";\npublic static final String CIPHER = \"ECDHE_RSA_AES_128_CBC_SHA256\";<\/pre>\n<p>The truststore and keystore values set in Java system properties are the files <a href=\"#instructions_certs\">created previously<\/a>.<\/p>\n<pre class=\"mqkafkalisting\">System.setProperty(\"javax.net.ssl.trustStore\", \"..\/02-generate-certs\/certs\/streamingdemo-ca.jks\" );\nSystem.setProperty(\"javax.net.ssl.keyStore\", \"..\/02-generate-certs\/certs\/streamingdemo-jms-client.jks\" );\nSystem.setProperty(\"javax.net.ssl.keyStorePassword\", \"passw0rd\" );\nSystem.setProperty(\"com.ibm.mq.cfg.useIBMCipherMappings\", \"false\");<\/pre>\n<h3 id=\"instructions_verifyscenario\">Verify the initial scenario<\/h3>\n<p><strong>Send messages to the COMMANDS 
queue<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/05-jms-apps\/run-putter.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-put.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>The <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/05-jms-apps\/mq-jms-simple\/src\/main\/java\/com\/ibm\/clientengineering\/mq\/samples\/Putter.java\">Putter<\/a> application puts five messages onto the COMMANDS queue and then terminates.<\/p>\n<pre class=\"mqkafkalisting\">sendMessage(\"Perform the first action A\", session, producer);\nsendMessage(\"Perform a second action B\", session, producer);\nsendMessage(\"Perform the third action C\", session, producer);\nsendMessage(\"Perform some penultimate fourth action D\", session, producer);\nsendMessage(\"Perform a fifth and final action E\", session, producer);<\/pre>\n<p><strong>Verify the messages from the MQ web console<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/05-jms-apps\/access-mq-console.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/mq-console-messages.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>Click through to the <code class=\"mqkafkacode\">COMMANDS<\/code> queue, and you should see the five messages that the <code class=\"mqkafkacode\">Putter<\/code> app put.<\/p>\n<p><strong>Get messages from the COMMANDS queue<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/05-jms-apps\/run-getter.sh<\/pre>\n<p>This will get messages from the <code class=\"mqkafkacode\">COMMANDS<\/code> queue, and write them to the console.<\/p>\n<pre class=\"mqkafkalisting\">----------------------------------------\nGetting messages from the COMMANDS queue\n----------------------------------------\n\n    JMSMessage class: jms_text\n    JMSType:          null\n    JMSDeliveryMode:  2\n    JMSDeliveryDelay: 0\n    JMSDeliveryTime:  0\n    
JMSExpiration:    0\n    JMSPriority:      4\n    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96201310040\n    JMSTimestamp:     1657388750326\n    JMSCorrelationID: null\n    JMSDestination:   queue:\/\/\/COMMANDS\n    JMSReplyTo:       null\n    JMSRedelivered:   false\n    JMSXAppID: mq.samples.Putter\n    JMSXDeliveryCount: 1\n    JMSXUserID: mquser\n    JMS_IBM_Character_Set: UTF-8\n    JMS_IBM_Encoding: 273\n    JMS_IBM_Format: MQSTR\n    JMS_IBM_MsgType: 8\n    JMS_IBM_PutApplType: 28\n    JMS_IBM_PutDate: 20220709\n    JMS_IBM_PutTime: 17455036\nPerform the first action A\n\n    JMSMessage class: jms_text\n    JMSType:          null\n    JMSDeliveryMode:  2\n    JMSDeliveryDelay: 0\n    JMSDeliveryTime:  0\n    JMSExpiration:    0\n    JMSPriority:      4\n    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96202310040\n    JMSTimestamp:     1657388750351\n    JMSCorrelationID: null\n    JMSDestination:   queue:\/\/\/COMMANDS\n    JMSReplyTo:       null\n    JMSRedelivered:   false\n    JMSXAppID: mq.samples.Putter\n    JMSXDeliveryCount: 1\n    JMSXUserID: mquser\n    JMS_IBM_Character_Set: UTF-8\n    JMS_IBM_Encoding: 273\n    JMS_IBM_Format: MQSTR\n    JMS_IBM_MsgType: 8\n    JMS_IBM_PutApplType: 28\n    JMS_IBM_PutDate: 20220709\n    JMS_IBM_PutTime: 17455038\nPerform a second action B\n\n    JMSMessage class: jms_text\n    JMSType:          null\n    JMSDeliveryMode:  2\n    JMSDeliveryDelay: 0\n    JMSDeliveryTime:  0\n    JMSExpiration:    0\n    JMSPriority:      4\n    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96203310040\n    JMSTimestamp:     1657388750364\n    JMSCorrelationID: null\n    JMSDestination:   queue:\/\/\/COMMANDS\n    JMSReplyTo:       null\n    JMSRedelivered:   false\n    JMSXAppID: mq.samples.Putter\n    JMSXDeliveryCount: 1\n    JMSXUserID: mquser\n    JMS_IBM_Character_Set: UTF-8\n    JMS_IBM_Encoding: 273\n    JMS_IBM_Format: MQSTR\n    JMS_IBM_MsgType: 8\n    JMS_IBM_PutApplType: 28\n   
 JMS_IBM_PutDate: 20220709\n    JMS_IBM_PutTime: 17455039\nPerform the third action C\n\n    JMSMessage class: jms_text\n    JMSType:          null\n    JMSDeliveryMode:  2\n    JMSDeliveryDelay: 0\n    JMSDeliveryTime:  0\n    JMSExpiration:    0\n    JMSPriority:      4\n    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96204310040\n    JMSTimestamp:     1657388750380\n    JMSCorrelationID: null\n    JMSDestination:   queue:\/\/\/COMMANDS\n    JMSReplyTo:       null\n    JMSRedelivered:   false\n    JMSXAppID: mq.samples.Putter\n    JMSXDeliveryCount: 1\n    JMSXUserID: mquser\n    JMS_IBM_Character_Set: UTF-8\n    JMS_IBM_Encoding: 273\n    JMS_IBM_Format: MQSTR\n    JMS_IBM_MsgType: 8\n    JMS_IBM_PutApplType: 28\n    JMS_IBM_PutDate: 20220709\n    JMS_IBM_PutTime: 17455040\nPerform some penultimate fourth action D\n\n    JMSMessage class: jms_text\n    JMSType:          null\n    JMSDeliveryMode:  2\n    JMSDeliveryDelay: 0\n    JMSDeliveryTime:  0\n    JMSExpiration:    0\n    JMSPriority:      4\n    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96205310040\n    JMSTimestamp:     1657388750392\n    JMSCorrelationID: null\n    JMSDestination:   queue:\/\/\/COMMANDS\n    JMSReplyTo:       null\n    JMSRedelivered:   false\n    JMSXAppID: mq.samples.Putter\n    JMSXDeliveryCount: 1\n    JMSXUserID: mquser\n    JMS_IBM_Character_Set: UTF-8\n    JMS_IBM_Encoding: 273\n    JMS_IBM_Format: MQSTR\n    JMS_IBM_MsgType: 8\n    JMS_IBM_PutApplType: 28\n    JMS_IBM_PutDate: 20220709\n    JMS_IBM_PutTime: 17455042\nPerform a fifth and final action E<\/pre>\n<p><strong>Verify again from the MQ web console<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/05-jms-apps\/access-mq-console.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/mq-console-emptyqueue.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>You should see that the <code class=\"mqkafkacode\">COMMANDS<\/code> 
queue is empty, as the <code class=\"mqkafkacode\">Getter<\/code> app has retrieved the messages.<\/p>\n<h2 class=\"mqkafkasection\">Setup IBM Event Streams<\/h2>\n<h3 id=\"instructions_esoperator\">Install the Event Streams Operator<\/h3>\n<p><strong>Install the Event Streams Operator that will manage your Kafka cluster and Connector.<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/06-install-eventstreams-operator\/setup.sh<\/pre>\n<p>The tutorial script uses a specific channel for the Event Streams Operator. You may want to <a href=\"https:\/\/ibm.github.io\/event-streams\/installing\/installing\/#install-the-event-streams-operator\">do this step manually<\/a> if you want to choose a different version.<\/p>\n<h3 id=\"instructions_kafka\">Create the Kafka cluster<\/h3>\n<p><strong>Create and configure the Kafka cluster that will host the audit topic<\/strong><\/p>\n<pre class=\"mqkafkastep\">export IBM_ENTITLEMENT_KEY=yourentitlementkey\n.\/07-create-kafka-cluster\/setup.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-kafka.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p><strong>BEFORE<\/strong> running this, you will need to set an environment variable with an Entitled Registry key from <a href=\"https:\/\/myibm.ibm.com\/products-services\/containerlibrary\">myibm.ibm.com<\/a><\/p>\n<p>The Kafka cluster config is contained in the <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/07-create-kafka-cluster\/resources\/cluster.yaml\">EventStreams<\/a> spec:<\/p>\n<p>Creates an internal bootstrap address that will be used by the Connector<\/p>\n<pre class=\"mqkafkalisting\">- authentication:\n    type: scram-sha-512\n  name: internal\n  port: 9093\n  tls: true\n  type: internal<\/pre>\n<p>Creates an external bootstrap address that will be used by the <code class=\"mqkafkacode\">Audit<\/code> app you run from your local machine.<\/p>\n<pre 
class=\"mqkafkalisting\">- authentication:\n    type: scram-sha-512\n  name: external\n  port: 9094\n  tls: true\n  type: external<\/pre>\n<h2 class=\"mqkafkasection\">Setup the MQ\/Kafka Connector<\/h2>\n<h3 id=\"instructions_kafkaconnect\">Setup the Kafka Connector<\/h3>\n<p><strong>Start a Kafka Connector that will get a copy of messages from the COMMANDS MQ queue, and produce them to the MQ.COMMANDS Kafka topic<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/08-setup-kafka-connect\/setup.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-streaming.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>This starts by using the <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/08-setup-kafka-connect\/resources\/modify-qmgr.yaml\">modify-qmgr<\/a> ConfigMap, adding a third <code class=\"mqkafkacode\">3-modifications.mqsc<\/code> entry to modify the MQ queue manager.<\/p>\n<p>It creates a new <code class=\"mqkafkacode\">COMMANDS.COPY<\/code> queue for the Connector to use, and alters the COMMANDS queue used by the JMS apps, so that the queue manager streams a copy of all messages to it.<\/p>\n<pre class=\"mqkafkalisting\">DEFINE QLOCAL(COMMANDS.COPY) REPLACE\nALTER QLOCAL(COMMANDS) STREAMQ('COMMANDS.COPY') STRMQOS(MUSTDUP)<\/pre>\n<p>It sets up a <code class=\"mqkafkacode\">KAFKA.SVRCONN<\/code> channel for the Connector to use to connect to the queue manager.<\/p>\n<pre class=\"mqkafkalisting\">DEFINE CHANNEL(KAFKA.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP) SSLCAUTH(OPTIONAL) SSLCIPH('ANY_TLS12_OR_HIGHER') REPLACE<\/pre>\n<p>It updates the queue manager security so that the Connector can access the queue using the <code class=\"mqkafkacode\">kafkauser<\/code> username \/ password <a href=\"#instructions_ldap\">defined in LDAP previously<\/a><\/p>\n<pre class=\"mqkafkalisting\">SET CHLAUTH(KAFKA.SVRCONN) TYPE(BLOCKUSER) USERLIST(*KAFKAUSER) WARN(YES) 
ACTION(REPLACE)\n\nREFRESH SECURITY\n\nSET AUTHREC OBJTYPE(QMGR) GROUP('kafkausers') AUTHADD(CONNECT, INQ)\nSET AUTHREC OBJTYPE(QUEUE) PROFILE(COMMANDS.COPY) GROUP('kafkausers') AUTHADD(ALLMQI)<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-kafkatopic.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>It creates <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/08-setup-kafka-connect\/resources\/topic.yaml\">a Kafka topic<\/a> that the Connector can produce to. The topic is configured to retain a copy of all commands that have been sent for the last 365 days.<\/p>\n<pre class=\"mqkafkalisting\">apiVersion: eventstreams.ibm.com\/v1beta2\nkind: KafkaTopic\nmetadata:\n  labels:\n    eventstreams.ibm.com\/cluster: eventstreams\n  name: commands.topic\n  namespace: kafka\nspec:\n  config:\n    min.insync.replicas: '1'\n    retention.ms: '31536000000'\n  partitions: 1\n  replicas: 3\n  topicName: MQ.COMMANDS<\/pre>\n<p>It <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/08-setup-kafka-connect\/resources\/kafka-creds.yaml\">generates credentials<\/a> that the Connector can use to connect to Kafka. 
These will be used to produce to the <code class=\"mqkafkacode\">MQ.COMMANDS<\/code> topic, as well as to create and use topics that Kafka Connect uses to store state.<\/p>\n<pre class=\"mqkafkalisting\">apiVersion: eventstreams.ibm.com\/v1beta1\nkind: KafkaUser\nmetadata:\n  name: kafka-connect-credentials\n  namespace: kafka\n  labels:\n    eventstreams.ibm.com\/cluster: eventstreams\nspec:\n  authentication:\n    # generate username\/password for this user\n    type: scram-sha-512\n  authorization:\n    acls:\n      # ---------------------------------------\n      # cluster permissions\n      # ---------------------------------------\n      # check existing cluster config\n      - operation: DescribeConfigs\n        resource:\n          type: cluster\n      # ---------------------------------------\n      # topic permissions\n      # ---------------------------------------\n      # check existing topics\n      - operation: DescribeConfigs\n        resource:\n          name: '*'\n          patternType: literal\n          type: topic\n      # create topics (both to produce to, and to use for internal state)\n      - operation: Create\n        resource:\n          name: '*'\n          patternType: literal\n          type: topic\n      # consume from topics (needed to retrieve state from internal topics)\n      - operation: Read\n        resource:\n          name: '*'\n          patternType: literal\n          type: topic\n      # produce to topics (both writing to internal state topics and messages being produced by connectors)\n      - operation: Write\n        resource:\n          name: '*'\n          patternType: literal\n          type: topic\n      # ---------------------------------------\n      # consumer group permissions\n      # ---------------------------------------\n      - operation: Read\n        resource:\n          name: '*'\n          patternType: literal\n          type: group\n      # ---------------------------------------\n      # transaction 
permissions\n      # ---------------------------------------\n      # create transactions\n      - operation: Write\n        resource:\n          name: '*'\n          patternType: literal\n          type: transactionalId\n    type: simple<\/pre>\n<p>The <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/08-setup-kafka-connect\/setup.sh\">setup script<\/a> also creates secrets that store credentials and certificates that the Connector uses to connect to MQ.<\/p>\n<pre class=\"mqkafkalisting\">oc apply -f .\/resources\/mq-creds.yaml\n\noc create secret generic jms-client-truststore \\\n    --from-file=ca.jks=..\/02-generate-certs\/certs\/streamingdemo-ca.jks \\\n    --from-literal=password='passw0rd' \\\n    -n kafka --dry-run=client -oyaml | oc apply -f -\n\noc create secret generic jms-client-keystore \\\n    --from-file=client.jks=..\/02-generate-certs\/certs\/streamingdemo-kafka-client.jks \\\n    --from-literal=password='passw0rd' \\\n    -n kafka --dry-run=client -oyaml | oc apply -f -\n\noc create secret generic mq-ca-tls \\\n    --from-file=ca.crt=..\/02-generate-certs\/certs\/streamingdemo-ca.crt \\\n    -n kafka --dry-run=client -oyaml | oc apply -f -<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-connect.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>The <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/08-setup-kafka-connect\/setup.sh\">setup script<\/a> builds a simple custom Docker image based on the Event Streams Kafka container, extended with the jar for the MQ source connector.<\/p>\n<pre class=\"mqkafkalisting\">FROM cp.icr.io\/cp\/ibm-eventstreams-kafka:11.0.2\n\nUSER root\nRUN mkdir -p \/opt\/kafka\/plugins\/\nRUN curl -Lo \/opt\/kafka\/plugins\/kafka-connect-mq-source.jar 
https:\/\/github.com\/ibm-messaging\/kafka-connect-mq-source\/releases\/download\/v1.3.1\/kafka-connect-mq-source-1.3.1-jar-with-dependencies.jar\n\nUSER 1001<\/pre>\n<p>The <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/08-setup-kafka-connect\/resources\/connect-cluster.yaml\">Connect cluster definition<\/a> specifies the address for the Kafka cluster to connect to, the custom Docker image to use, and provides access to the Kafka and MQ credentials and certificates that the Connector will need.<\/p>\n<pre class=\"mqkafkalisting\">image: image-registry.openshift-image-registry.svc:5000\/kafka\/kafkaconnectwithmq:latest\n\nbootstrapServers: eventstreams-kafka-bootstrap.kafka.svc:9093\n\ntls:\n  trustedCertificates:\n  - secretName: eventstreams-cluster-ca-cert\n    certificate: ca.crt\n  - secretName: mq-ca-tls\n    certificate: ca.crt\nauthentication:\n  type: scram-sha-512\n  username: kafka-connect-credentials\n  passwordSecret:\n    secretName: kafka-connect-credentials\n    password: password\nexternalConfiguration:\n  volumes:\n    - name: mq-credentials\n      secret:\n        secretName: mq-credentials\n    - name: jms-client-truststore\n      secret:\n        secretName: jms-client-truststore\n    - name: jms-client-keystore\n      secret:\n        secretName: jms-client-keystore<\/pre>\n<p>Finally, the <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/08-setup-kafka-connect\/setup.sh\">setup script<\/a> starts the MQ\/Kafka connector using the <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/08-setup-kafka-connect\/resources\/connector.yaml\">mq-connector<\/a> spec.<\/p>\n<pre class=\"mqkafkalisting\"># the Kafka topic to produce to\ntopic: MQ.COMMANDS\n\n# the MQ queue to get messages from\nmq.queue: COMMANDS.COPY\n\n# messages sent to the MQ queue are JMS TextMessages\nmq.message.body.jms: true\nkey.converter: 
org.apache.kafka.connect.storage.StringConverter\nvalue.converter: org.apache.kafka.connect.storage.StringConverter\nmq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder\n\n# connection details for the queue manager\nmq.queue.manager: MYQMGR\nmq.connection.name.list: queuemanager-ibm-mq.ibmmq(1414)\nmq.channel.name: KAFKA.SVRCONN\nmq.user.name: ${file:\/opt\/kafka\/external-configuration\/mq-credentials:username}\nmq.password: ${file:\/opt\/kafka\/external-configuration\/mq-credentials:password}\n\n# SSL config for connecting to MQ\nmq.ssl.use.ibm.cipher.mappings: false\nmq.ssl.cipher.suite: '*TLS12'\nmq.ssl.truststore.location: \/opt\/kafka\/external-configuration\/jms-client-truststore\/ca.jks\nmq.ssl.truststore.password: ${file:\/opt\/kafka\/external-configuration\/jms-client-truststore:password}\nmq.ssl.keystore.location: \/opt\/kafka\/external-configuration\/jms-client-keystore\/client.jks\nmq.ssl.keystore.password: ${file:\/opt\/kafka\/external-configuration\/jms-client-keystore:password}<\/pre>\n<h2 class=\"mqkafkasection\">Audit applications<\/h2>\n<h3 id=\"instructions_audit\">Setup the Kafka Java app<\/h3>\n<p><strong>Compile the Java Kafka application<\/strong><\/p>\n<pre class=\"mqkafkastep\">.\/09-audit-app\/build-apps.sh<\/pre>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/section-audit.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<p>The config for the apps is contained in <a href=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/blob\/master\/09-audit-app\/kafka-simple\/src\/main\/java\/com\/ibm\/clientengineering\/kafka\/samples\/Config.java\">Config.java<\/a>.<\/p>\n<p>The <code class=\"mqkafkacode\">BOOTSTRAP<\/code> constant is modified by the <code class=\"mqkafkacode\">build-apps.sh<\/code> script to match the hostname for your Kafka cluster.<\/p>\n<pre class=\"mqkafkalisting\">public static final String TOPIC = 
\"MQ.COMMANDS\";\n\npublic static final String BOOTSTRAP = \"PLACEHOLDERBOOTSTRAP\";\npublic static final String CLIENT = \"audit-app\";\npublic static final String GROUP = \"audit\";\n\npublic static final String TRUSTSTORE = \".\/ca.p12\";\npublic static final String TRUSTSTORE_PASSWORD = \"PLACEHOLDERTRUSTSTOREPASSWORD\";\n\npublic static final String USERNAME = \"audit-app\";\npublic static final String PASSWORD = \"PLACEHOLDERAPPPASSWORD\";<\/pre>\n<p>The credentials and Kafka cluster truststore generated by the Event Streams Operator are downloaded by the setup script.<\/p>\n<pre class=\"mqkafkalisting\">BOOTSTRAP=$(oc get eventstreams eventstreams -nkafka -ojsonpath='{.status.kafkaListeners[?(@.type==\"external\")].bootstrapServers}')\ngsed -i -e 's\/PLACEHOLDERBOOTSTRAP\/'$BOOTSTRAP'\/' kafka-simple\/src\/main\/java\/com\/ibm\/clientengineering\/kafka\/samples\/Config.java\n\nCA_PASSWORD=$(oc get secret eventstreams-cluster-ca-cert -nkafka -o 'go-template={{index .data \"ca.password\"}}' | base64 -d)\ngsed -i -e 's\/PLACEHOLDERTRUSTSTOREPASSWORD\/'$CA_PASSWORD'\/' kafka-simple\/src\/main\/java\/com\/ibm\/clientengineering\/kafka\/samples\/Config.java\n\nPASSWORD=$(oc get secret audit-app -nkafka -o jsonpath='{..password}' | base64 -d)\ngsed -i -e 's\/PLACEHOLDERAPPPASSWORD\/'$PASSWORD'\/' kafka-simple\/src\/main\/java\/com\/ibm\/clientengineering\/kafka\/samples\/Config.java<\/pre>\n<p>The <code class=\"mqkafkacode\">Audit<\/code> app avoids committing any offsets to the Kafka cluster, so that it can always replay all events from the Kafka topic, every time the application is run.<\/p>\n<pre class=\"mqkafkalisting\">props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, \"earliest\");\nprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, \"false\");<\/pre>\n<h3 id=\"instructions_demo\">Demo the whole solution<\/h3>\n<p><strong>Use MQ Streaming Queues and Kafka Connect to make an auditable copy of IBM MQ messages<\/strong><\/p>\n<p><img decoding=\"async\" 
src=\"https:\/\/github.com\/dalelane\/mq-kafka-connect-tutorial\/raw\/master\/doc\/3-solution.png\" alt=\"diagram\" class=\"mqkafkadiagram\"\/><\/p>\n<pre class=\"mqkafkastep\">.\/05-jms-apps\/run-putter.sh<\/pre>\n<p>Put messages to the <code class=\"mqkafkacode\">COMMANDS<\/code> queue.<\/p>\n<pre class=\"mqkafkastep\">.\/05-jms-apps\/run-getter.sh<\/pre>\n<p>Get the messages from the <code class=\"mqkafkacode\">COMMANDS<\/code> queue.<\/p>\n<p>You can do the above two steps multiple times.<\/p>\n<pre class=\"mqkafkastep\">.\/09-audit-app\/run-audit.sh<\/pre>\n<p>Retrieve the audit copy of all messages that have been put to the <code class=\"mqkafkacode\">COMMANDS<\/code> queue in the last year from the <code class=\"mqkafkacode\">MQ.COMMANDS<\/code> Kafka topic. You should see the history of all messages put to the queue since you set up Streaming Queues.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Scenario You have an IBM MQ queue manager. An application is putting messages to a command queue. Another application gets these messages from the queue and takes actions in response. Objective You want multiple separate audit applications to be able to review the commands that go through the command queue. 
They should be able to [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":4628,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7,4],"tags":[593,584,55],"class_list":["post-4615","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","category-ibm","tag-apachekafka","tag-kafka","tag-mq"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/4615","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=4615"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/4615\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/4628"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=4615"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=4615"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=4615"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}