How to use MQ Streaming Queues and Kafka Connect to make an auditable copy of IBM MQ messages

Scenario

You have an IBM MQ queue manager. An application is putting messages to a command queue. Another application gets these messages from the queue and takes actions in response.

diagram

Objective

You want multiple separate audit applications to be able to review the commands that go through the command queue.

They should be able to replay a history of these command messages as many times as they want.

This must not impact the application that is currently getting the messages from the queue.

diagram

Solution

You can use Streaming Queues to make a duplicate of every message put to the command queue to a separate copy queue.

This copy queue can be used to feed a connector that produces every message to a Kafka topic. The Kafka topic can then be consumed by the audit applications.

diagram

Details

The final solution works like this:

diagram

  1. A JMS application called Putter puts messages onto an IBM MQ queue called COMMANDS
  2. For the purposes of this demo, a development instance of LDAP is used to authenticate access to IBM MQ
  3. A JMS application called Getter gets messages from the COMMANDS queue
  4. A copy of every message put to the COMMANDS queue is made to the COMMANDS.COPY queue
  5. A Connector gets every message from the COMMANDS.COPY queue
  6. The Connector transforms each JMS message into a string, and produces it to the MQ.COMMANDS Kafka topic
  7. A Java application called Audit can replay the history of all messages on the Kafka topic


Instructions

Prerequisites

If you want to create the entire end-to-end demo solution as described above from scratch, you can simply follow the instructions below.

You will need:

  • a Red Hat OpenShift cluster
  • on your local machine: the tools used by the scripts below (they invoke docker, oc, a Java build, and gsed)

The instructions below are broken into sections, so if you instead want to use this tutorial as the basis for adding to an existing setup (e.g. you already have an IBM MQ queue and want to add Kafka), you can choose the relevant sections to follow.

Set up IBM MQ

Set up LDAP

Install a development LDAP server that will manage user identities for the IBM MQ queue manager.

./01-install-ldap/setup.sh

diagram

Two user identities are defined in config.yaml:

mquser, which will be used by our JMS MQ applications:

dn: uid=mquser,ou=people,dc=ibm,dc=com
objectClass: inetOrgPerson
objectClass: organizationalPerson
objectClass: person
objectClass: top
cn: mquserCN
sn: mquserSN
uid: mquser
userPassword: mquserpassword

kafkauser, which will be used later by the Kafka Connector:

dn: uid=kafkauser,ou=people,dc=ibm,dc=com
objectClass: inetOrgPerson
objectClass: organizationalPerson
objectClass: person
objectClass: top
cn: kafkauserCN
sn: kafkauserSN
uid: kafkauser
userPassword: kafkapassword

Generate certificates

Generate a new self-signed CA, and use it to create certificates for the MQ queue manager, the JMS MQ client apps, and the Kafka Connector.

./02-generate-certs/generate.sh

The JMS client and Kafka Connector certificates are added to the queue manager’s keystore so that it will accept connections from them.

The certificate generation is all done in a Docker container to reduce the number of dependencies you need locally.

Running this will create:

  • streamingdemo-ca files – with the CA used to sign all certs in the tutorial demo
  • streamingdemo-jms-client files – used by the JMS applications
  • streamingdemo-kafka-client files – used by the Kafka Connect connector for connecting to IBM MQ
  • streamingdemo-mq-server files – used by the IBM MQ queue manager

Install the MQ Operator

Install the MQ Operator that will manage your IBM MQ queue manager.

./03-install-mq-operator/setup.sh

The tutorial script uses a specific channel for the MQ Operator that is supported on OpenShift 4.8 and earlier. Do this step manually if you want to choose a different version.

Create the queue manager

Create and configure the queue manager to prepare it for use with the JMS applications.

export IBM_ENTITLEMENT_KEY=yourentitlementkey
./04-create-queue-manager/setup.sh

diagram

BEFORE running this, you will need to set an environment variable with an Entitled Registry key from myibm.ibm.com.

The queue manager config is contained in the qmgr-setup ConfigMap which:

Creates the COMMANDS queue that the JMS applications will use

DEFINE QLOCAL(COMMANDS) REPLACE

Creates the APP.SVRCONN channel that the JMS applications will use

DEFINE CHANNEL(APP.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP) SSLCAUTH(OPTIONAL) SSLCIPH('ANY_TLS12_OR_HIGHER') REPLACE

Sets up security so that the JMS applications have to use the mquser username / password defined in LDAP previously

SET CHLAUTH(APP.SVRCONN) TYPE(BLOCKUSER) USERLIST(*MQUSER) WARN(YES) ACTION(REPLACE)

REFRESH SECURITY

SET AUTHREC OBJTYPE(QMGR) GROUP('mqusers') AUTHADD(ALL)
SET AUTHREC OBJTYPE(QUEUE) PROFILE('**') GROUP('mqusers') AUTHADD(ALL)
SET AUTHREC OBJTYPE(CHANNEL) PROFILE('**') GROUP('mqusers') AUTHADD(ALL)

JMS applications

Set up the JMS apps

Compile the JMS Putter and Getter applications.

./05-jms-apps/build-apps.sh

diagram

The config for the apps is contained in Config.java.

The HOST constant is modified by the build-apps.sh script to match the hostname for your queue manager.

public static final String HOST = "PLACEHOLDERHOSTNAME";
public static final String QMGRNAME = "MYQMGR";
public static final String CHANNEL = "APP.SVRCONN";
public static final String QUEUE = "COMMANDS";
public static final String CIPHER = "ECDHE_RSA_AES_128_CBC_SHA256";

The truststore and keystore values set in Java system properties are the files created previously.

System.setProperty("javax.net.ssl.trustStore", "../02-generate-certs/certs/streamingdemo-ca.jks" );
System.setProperty("javax.net.ssl.keyStore", "../02-generate-certs/certs/streamingdemo-jms-client.jks" );
System.setProperty("javax.net.ssl.keyStorePassword", "passw0rd" );
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");

Verify the initial scenario

Send messages to the COMMANDS queue

./05-jms-apps/run-putter.sh

diagram

The Putter application puts five messages onto the COMMANDS queue and then terminates.

sendMessage("Perform the first action A", session, producer);
sendMessage("Perform a second action B", session, producer);
sendMessage("Perform the third action C", session, producer);
sendMessage("Perform some penultimate fourth action D", session, producer);
sendMessage("Perform a fifth and final action E", session, producer);

Verify the messages from the MQ web console

./05-jms-apps/access-mq-console.sh

diagram

Click through to the COMMANDS queue, and you should see the five messages that the Putter app put.

Get messages from the COMMANDS queue

./05-jms-apps/run-getter.sh

This will get messages from the COMMANDS queue, and write them to the console.

----------------------------------------
Getting messages from the COMMANDS queue
----------------------------------------

    JMSMessage class: jms_text
    JMSType:          null
    JMSDeliveryMode:  2
    JMSDeliveryDelay: 0
    JMSDeliveryTime:  0
    JMSExpiration:    0
    JMSPriority:      4
    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96201310040
    JMSTimestamp:     1657388750326
    JMSCorrelationID: null
    JMSDestination:   queue:///COMMANDS
    JMSReplyTo:       null
    JMSRedelivered:   false
    JMSXAppID: mq.samples.Putter
    JMSXDeliveryCount: 1
    JMSXUserID: mquser
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 273
    JMS_IBM_Format: MQSTR
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 28
    JMS_IBM_PutDate: 20220709
    JMS_IBM_PutTime: 17455036
Perform the first action A

    JMSMessage class: jms_text
    JMSType:          null
    JMSDeliveryMode:  2
    JMSDeliveryDelay: 0
    JMSDeliveryTime:  0
    JMSExpiration:    0
    JMSPriority:      4
    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96202310040
    JMSTimestamp:     1657388750351
    JMSCorrelationID: null
    JMSDestination:   queue:///COMMANDS
    JMSReplyTo:       null
    JMSRedelivered:   false
    JMSXAppID: mq.samples.Putter
    JMSXDeliveryCount: 1
    JMSXUserID: mquser
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 273
    JMS_IBM_Format: MQSTR
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 28
    JMS_IBM_PutDate: 20220709
    JMS_IBM_PutTime: 17455038
Perform a second action B

    JMSMessage class: jms_text
    JMSType:          null
    JMSDeliveryMode:  2
    JMSDeliveryDelay: 0
    JMSDeliveryTime:  0
    JMSExpiration:    0
    JMSPriority:      4
    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96203310040
    JMSTimestamp:     1657388750364
    JMSCorrelationID: null
    JMSDestination:   queue:///COMMANDS
    JMSReplyTo:       null
    JMSRedelivered:   false
    JMSXAppID: mq.samples.Putter
    JMSXDeliveryCount: 1
    JMSXUserID: mquser
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 273
    JMS_IBM_Format: MQSTR
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 28
    JMS_IBM_PutDate: 20220709
    JMS_IBM_PutTime: 17455039
Perform the third action C

    JMSMessage class: jms_text
    JMSType:          null
    JMSDeliveryMode:  2
    JMSDeliveryDelay: 0
    JMSDeliveryTime:  0
    JMSExpiration:    0
    JMSPriority:      4
    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96204310040
    JMSTimestamp:     1657388750380
    JMSCorrelationID: null
    JMSDestination:   queue:///COMMANDS
    JMSReplyTo:       null
    JMSRedelivered:   false
    JMSXAppID: mq.samples.Putter
    JMSXDeliveryCount: 1
    JMSXUserID: mquser
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 273
    JMS_IBM_Format: MQSTR
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 28
    JMS_IBM_PutDate: 20220709
    JMS_IBM_PutTime: 17455040
Perform some penultimate fourth action D

    JMSMessage class: jms_text
    JMSType:          null
    JMSDeliveryMode:  2
    JMSDeliveryDelay: 0
    JMSDeliveryTime:  0
    JMSExpiration:    0
    JMSPriority:      4
    JMSMessageID:     ID:414d51204d59514d47522020202020207abdc96205310040
    JMSTimestamp:     1657388750392
    JMSCorrelationID: null
    JMSDestination:   queue:///COMMANDS
    JMSReplyTo:       null
    JMSRedelivered:   false
    JMSXAppID: mq.samples.Putter
    JMSXDeliveryCount: 1
    JMSXUserID: mquser
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 273
    JMS_IBM_Format: MQSTR
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 28
    JMS_IBM_PutDate: 20220709
    JMS_IBM_PutTime: 17455042
Perform a fifth and final action E

Verify again from the MQ web console

./05-jms-apps/access-mq-console.sh

diagram

You should see that the COMMANDS queue is empty, as the Getter app has retrieved the messages.

Set up IBM Event Streams

Install the Event Streams Operator

Install the Event Streams Operator that will manage your Kafka cluster and Connector.

./06-install-eventstreams-operator/setup.sh

The tutorial script uses a specific channel for the Event Streams Operator. Do this step manually if you want to choose a different version.

Create the Kafka cluster

Create and configure the Kafka cluster that will host the audit topic.

export IBM_ENTITLEMENT_KEY=yourentitlementkey
./07-create-kafka-cluster/setup.sh

diagram

BEFORE running this, you will need to set an environment variable with an Entitled Registry key from myibm.ibm.com.

The Kafka cluster config is contained in the EventStreams spec:

Creates an internal bootstrap address that will be used by the Connector

- authentication:
    type: scram-sha-512
  name: internal
  port: 9093
  tls: true
  type: internal

Creates an external bootstrap address that will be used by the Audit app you run from your local machine.

- authentication:
    type: scram-sha-512
  name: external
  port: 9094
  tls: true
  type: external
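From a client's point of view, a SCRAM-over-TLS listener like this translates into connection properties along these lines. The hostname and credentials below are placeholders, not values from the tutorial:

import java.util.Properties;

// hypothetical client config for the external listener
Properties props = new Properties();
props.put("bootstrap.servers", "eventstreams-kafka-bootstrap-kafka.apps.example.com:443");  // placeholder route
props.put("security.protocol", "SASL_SSL");    // TLS plus SASL authentication
props.put("sasl.mechanism", "SCRAM-SHA-512");  // matches the listener's scram-sha-512 type
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"<username>\" password=\"<password>\";");
props.put("ssl.truststore.location", "./ca.p12");       // truststore with the cluster CA certificate
props.put("ssl.truststore.password", "<ca-password>");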

Set up the MQ/Kafka Connector

Set up the Kafka Connector

Start a Kafka Connector that will get a copy of every message put to the COMMANDS MQ queue, and produce them to the MQ.COMMANDS Kafka topic.

./08-setup-kafka-connect/setup.sh

diagram

This starts by using the modify-qmgr ConfigMap to add a third entry, 3-modifications.mqsc, that modifies the MQ queue manager.

It creates a new COMMANDS.COPY queue for the Connector to use, and alters the COMMANDS queue used by the JMS apps so that the queue manager streams a copy of every message put to COMMANDS onto COMMANDS.COPY. The STRMQOS(MUSTDUP) setting makes the duplication mandatory: if a copy cannot be put to COMMANDS.COPY, the original put to COMMANDS fails, so the audit stream cannot silently miss messages.

DEFINE QLOCAL(COMMANDS.COPY) REPLACE
ALTER QLOCAL(COMMANDS) STREAMQ('COMMANDS.COPY') STRMQOS(MUSTDUP)

It sets up a KAFKA.SVRCONN channel for the Connector to use to connect to the queue manager.

DEFINE CHANNEL(KAFKA.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP) SSLCAUTH(OPTIONAL) SSLCIPH('ANY_TLS12_OR_HIGHER') REPLACE

It updates the queue manager security so that the Connector can access the queue using the kafkauser username / password defined in LDAP previously

SET CHLAUTH(KAFKA.SVRCONN) TYPE(BLOCKUSER) USERLIST(*KAFKAUSER) WARN(YES) ACTION(REPLACE)

REFRESH SECURITY

SET AUTHREC OBJTYPE(QMGR) GROUP('kafkausers') AUTHADD(CONNECT, INQ)
SET AUTHREC OBJTYPE(QUEUE) PROFILE(COMMANDS.COPY) GROUP('kafkausers') AUTHADD(ALLMQI)

diagram

It creates a Kafka topic that the Connector can produce to. The topic is configured to retain all command messages sent in the last 365 days.

apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaTopic
metadata:
  labels:
    eventstreams.ibm.com/cluster: eventstreams
  name: commands.topic
  namespace: kafka
spec:
  config:
    min.insync.replicas: '1'
    retention.ms: '31536000000'
  partitions: 1
  replicas: 3
  topicName: MQ.COMMANDS
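(The retention.ms value is 365 days × 24 hours × 3,600 seconds × 1,000 = 31,536,000,000 milliseconds.)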

It generates credentials that the Connector can use to connect to Kafka. These will be used to produce to the MQ.COMMANDS topic, as well as to create and use the topics where Kafka Connect stores its state.

apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaUser
metadata:
  name: kafka-connect-credentials
  namespace: kafka
  labels:
    eventstreams.ibm.com/cluster: eventstreams
spec:
  authentication:
    # generate username/password for this user
    type: scram-sha-512
  authorization:
    acls:
      # ---------------------------------------
      # cluster permissions
      # ---------------------------------------
      # check existing cluster config
      - operation: DescribeConfigs
        resource:
          type: cluster
      # ---------------------------------------
      # topic permissions
      # ---------------------------------------
      # check existing topics
      - operation: DescribeConfigs
        resource:
          name: '*'
          patternType: literal
          type: topic
      # create topics (both to produce to, and to use for internal state)
      - operation: Create
        resource:
          name: '*'
          patternType: literal
          type: topic
      # consume from topics (needed to retrieve state from internal topics)
      - operation: Read
        resource:
          name: '*'
          patternType: literal
          type: topic
      # produce to topics (both writing to internal state topics and messages being produced by connectors)
      - operation: Write
        resource:
          name: '*'
          patternType: literal
          type: topic
      # ---------------------------------------
      # consumer group permissions
      # ---------------------------------------
      - operation: Read
        resource:
          name: '*'
          patternType: literal
          type: group
      # ---------------------------------------
      # transaction permissions
      # ---------------------------------------
      # create transactions
      - operation: Write
        resource:
          name: '*'
          patternType: literal
          type: transactionalId
    type: simple

The setup script also creates secrets that store credentials and certificates that the Connector uses to connect to MQ.

oc apply -f ./resources/mq-creds.yaml

oc create secret generic jms-client-truststore \
    --from-file=ca.jks=../02-generate-certs/certs/streamingdemo-ca.jks \
    --from-literal=password='passw0rd' \
    -n kafka --dry-run=client -oyaml | oc apply -f -

oc create secret generic jms-client-keystore \
    --from-file=client.jks=../02-generate-certs/certs/streamingdemo-kafka-client.jks \
    --from-literal=password='passw0rd' \
    -n kafka --dry-run=client -oyaml | oc apply -f -

oc create secret generic mq-ca-tls \
    --from-file=ca.crt=../02-generate-certs/certs/streamingdemo-ca.crt \
    -n kafka --dry-run=client -oyaml | oc apply -f -

diagram

The setup script builds a simple custom Docker image based on the Event Streams Kafka container, extended with the jar for the MQ source connector.

FROM cp.icr.io/cp/ibm-eventstreams-kafka:11.0.2

USER root
RUN mkdir -p /opt/kafka/plugins/
RUN curl -Lo /opt/kafka/plugins/kafka-connect-mq-source.jar https://github.com/ibm-messaging/kafka-connect-mq-source/releases/download/v1.3.1/kafka-connect-mq-source-1.3.1-jar-with-dependencies.jar

USER 1001

The Connect cluster definition specifies the address of the Kafka cluster to connect to and the custom Docker image to use, and gives the Connector access to the Kafka and MQ credentials and certificates it will need.

image: image-registry.openshift-image-registry.svc:5000/kafka/kafkaconnectwithmq:latest

bootstrapServers: eventstreams-kafka-bootstrap.kafka.svc:9093

tls:
  trustedCertificates:
  - secretName: eventstreams-cluster-ca-cert
    certificate: ca.crt
  - secretName: mq-ca-tls
    certificate: ca.crt
authentication:
  type: scram-sha-512
  username: kafka-connect-credentials
  passwordSecret:
    secretName: kafka-connect-credentials
    password: password
externalConfiguration:
  volumes:
    - name: mq-credentials
      secret:
        secretName: mq-credentials
    - name: jms-client-truststore
      secret:
        secretName: jms-client-truststore
    - name: jms-client-keystore
      secret:
        secretName: jms-client-keystore

Finally, the setup script starts the MQ/Kafka connector using the mq-connector spec.

# the Kafka topic to produce to
topic: MQ.COMMANDS

# the MQ queue to get messages from
mq.queue: COMMANDS.COPY

# messages sent to the MQ queue are JMS TextMessages
mq.message.body.jms: true
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder

# connection details for the queue manager
mq.queue.manager: MYQMGR
mq.connection.name.list: queuemanager-ibm-mq.ibmmq(1414)
mq.channel.name: KAFKA.SVRCONN
mq.user.name: ${file:/opt/kafka/external-configuration/mq-credentials:username}
mq.password: ${file:/opt/kafka/external-configuration/mq-credentials:password}

# SSL config for connecting to MQ
mq.ssl.use.ibm.cipher.mappings: false
mq.ssl.cipher.suite: '*TLS12'
mq.ssl.truststore.location: /opt/kafka/external-configuration/jms-client-truststore/ca.jks
mq.ssl.truststore.password: ${file:/opt/kafka/external-configuration/jms-client-truststore:password}
mq.ssl.keystore.location: /opt/kafka/external-configuration/jms-client-keystore/client.jks
mq.ssl.keystore.password: ${file:/opt/kafka/external-configuration/jms-client-keystore:password}
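The ${file:...} placeholders are resolved at runtime by Kafka's FileConfigProvider, which reads the username, password, and keystore passwords from the secrets mounted through the externalConfiguration volumes above (this assumes the Connect worker config enables the file config provider).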

Audit applications

Set up the Kafka Java app

Compile the Java Kafka application.

./09-audit-app/build-apps.sh

diagram

The config for the app is contained in Config.java.

The BOOTSTRAP constant is modified by the build-apps.sh script to match the hostname for your Kafka cluster.

public static final String TOPIC = "MQ.COMMANDS";

public static final String BOOTSTRAP = "PLACEHOLDERBOOTSTRAP";
public static final String CLIENT = "audit-app";
public static final String GROUP = "audit";

public static final String TRUSTSTORE = "./ca.p12";
public static final String TRUSTSTORE_PASSWORD = "PLACEHOLDERTRUSTSTOREPASSWORD";

public static final String USERNAME = "audit-app";
public static final String PASSWORD = "PLACEHOLDERAPPPASSWORD";

The setup script downloads the credentials and the Kafka cluster truststore generated by the Event Streams Operator.

BOOTSTRAP=$(oc get eventstreams eventstreams -nkafka -ojsonpath='{.status.kafkaListeners[?(@.type=="external")].bootstrapServers}')
gsed -i -e 's/PLACEHOLDERBOOTSTRAP/'$BOOTSTRAP'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java

CA_PASSWORD=$(oc get secret eventstreams-cluster-ca-cert -nkafka -o 'go-template={{index .data "ca.password"}}' | base64 -d)
gsed -i -e 's/PLACEHOLDERTRUSTSTOREPASSWORD/'$CA_PASSWORD'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java

PASSWORD=$(oc get secret audit-app -nkafka -o jsonpath='{..password}' | base64 -d)
gsed -i -e 's/PLACEHOLDERAPPPASSWORD/'$PASSWORD'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java

The Audit app avoids committing any offsets to the Kafka cluster, so that it can always replay all events from the Kafka topic, every time the application is run.

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
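A minimal sketch of a replaying consumer built around these two settings (not the tutorial's actual Audit source, and assuming the Config constants shown above):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP);
props.put(ConsumerConfig.GROUP_ID_CONFIG, Config.GROUP);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, Config.CLIENT);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// no committed offsets plus "earliest" means every run replays the full history
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// TLS/SCRAM connection properties (truststore, credentials) omitted for brevity

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of(Config.TOPIC));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());  // the command text produced by the connector
        }
    }
}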

Demo the whole solution

Use MQ Streaming Queues and Kafka Connect to make an auditable copy of IBM MQ messages

diagram

./05-jms-apps/run-putter.sh

Put messages to the COMMANDS queue.

./05-jms-apps/run-getter.sh

Get the messages from the COMMANDS queue.

You can repeat the above two steps as many times as you like.

./09-audit-app/run-audit.sh

Retrieve the audit copy of all messages put to the COMMANDS queue in the last year from the MQ.COMMANDS Kafka topic. You should see the history of every message put to the queue since you set up Streaming Queues.
