Scenario
You have an IBM MQ queue manager. An application is putting messages to a command queue. Another application gets these messages from the queue and takes actions in response.
Objective
You want multiple separate audit applications to be able to review the commands that go through the command queue.
They should be able to replay a history of these command messages as many times as they want.
This must not impact the application that is currently getting the messages from the queue.
Solution
You can use Streaming Queues to make a duplicate of every message put to the command queue to a separate copy queue.
This copy queue can be used to feed a connector that can produce every message to a Kafka topic. This Kafka topic can be used by audit applications
Details
The final solution works like this:
- A JMS application called
Putter
puts messages onto an IBM MQ queue calledCOMMANDS
- For the purposes of this demo, a development-instance of LDAP is used to authenticate access to IBM MQ
- A JMS application called
Getter
gets messages from theCOMMANDS
queue - Copies of every message put to the COMMANDS queue will be made to the
COMMANDS.COPY
queue - A Connector will get every message from the COMMANDS.COPY queue
- The Connector transforms each JMS message into a string, and produces it to the
MQ.COMMANDS
Kafka topic - A Java application called
Audit
can replay the history of all messages on the Kafka topic
- Summary
- Instructions
- Pre-requisites
- Setup IBM MQ
- JMS applications
- Setup IBM Event Streams
- Audit applications
Instructions
Pre-requisites
If you want to create the entire end-to-end demo solution as described above from scratch, you can simply follow the instructions below.
You will need:
- a Red Hat OpenShift cluster
- on your local machine:
- the OpenShift
oc
CLI, logged into your cluster gsed
docker
CLIJava
for compiling and running the test applicationsmvn
for building the test applications- a copy of the mq-kafka-connect-tutorial repository from Github
- the OpenShift
The instructions below are broken into sections, so if you instead want to use this tutorial as the basis for adding to an existing scenario (e.g. you already have an IBM MQ queue and want to add Kafka) you can just choose the relative sections to follow.
Setup IBM MQ
Setup LDAP
Install an development LDAP server that will manage user identities for the IBM MQ queue manager.
./01-install-ldap/setup.sh
Two user identities are defined in config.yaml:
mquser
that will be used by our JMS MQ applications
dn: uid=mquser,ou=people,dc=ibm,dc=com objectClass: inetOrgPerson objectClass: organizationalPerson objectClass: person objectClass: top cn: mquserCN sn: mquserSN uid: mquser userPassword: mquserpassword
kafkauser
that will be used later by the Kafka Connector
dn: uid=kafkauser,ou=people,dc=ibm,dc=com objectClass: inetOrgPerson objectClass: organizationalPerson objectClass: person objectClass: top cn: kafkauserCN sn: kafkauserSN uid: kafkauser userPassword: kafkapassword
Generate certificates
Generate a new self-signed CA, and use it to create certificates for the MQ queue manager, the JMS MQ client apps, and the Kafka Connector.
./02-generate-certs/generate.sh
The JMS client and Kafka Connector certificates are added to the queue manager’s keystore so that it will accept connections from them.
Ths certificate generation is all done in a Docker container to reduce the number of dependencies you need locally.
Running this will create:
streamingdemo-ca
files – with the CA used to sign all certs in the tutorial demostreamingdemo-jms-client
files – used by the JMS applicationsstreamingdemo-kafka-client
files – used by the Kafka Connect connector for connecting to IBM MQstreamingdemo-mq-server
files – used by the IBM MQ queue manager
Install the MQ Operator
Install the MQ Operator that will manage your IBM MQ queue manager.
./03-install-mq-operator/setup.sh
The tutorial script uses a specific channel for the MQ Operator that is supported on OpenShift 4.8 and earlier. You may want to do this step manually if you want to choose a different version.
Create the queue manager
Create and configure the queue manager to prepare it for using with the JMS applications
export IBM_ENTITLEMENT_KEY=yourentitlementkey ./04-create-queue-manager/setup.sh
BEFORE running this, you will need to set an environment variable with an Entitled Registry key from myibm.ibm.com
The queue manager config is contained in the qmgr-setup ConfigMap which:
Creates the COMMANDS
queue that the JMS applications will use
DEFINE QLOCAL(COMMANDS) REPLACE
Creates the APP.SVRCONN
channel that the JMS applications will use
DEFINE CHANNEL(APP.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP) SSLCAUTH(OPTIONAL) SSLCIPH('ANY_TLS12_OR_HIGHER') REPLACE
Sets up security so that the JMS applications have to use the mquser
username / password defined in LDAP previously
SET CHLAUTH(APP.SVRCONN) TYPE(BLOCKUSER) USERLIST(*MQUSER) WARN(YES) ACTION(REPLACE) REFRESH SECURITY SET AUTHREC OBJTYPE(QMGR) GROUP('mqusers') AUTHADD(ALL) SET AUTHREC OBJTYPE(QUEUE) PROFILE('**') GROUP('mqusers') AUTHADD(ALL) SET AUTHREC OBJTYPE(CHANNEL) PROFILE('**') GROUP('mqusers') AUTHADD(ALL)
JMS applications
Setup the JMS apps
Compile the JMS Putter and Getter applications
./05-jms-apps/build-apps.sh
The config for the apps is contained in Config.java.
The HOST
constant is modified by the build-apps.sh
script to match the hostname for your queue manager.
public static final String HOST = "PLACEHOLDERHOSTNAME"; public static final String QMGRNAME = "MYQMGR"; public static final String CHANNEL = "APP.SVRCONN"; public static final String QUEUE = "COMMANDS"; public static final String CIPHER = "ECDHE_RSA_AES_128_CBC_SHA256";
The truststore and keystore values set in Java system properties are the files created previously.
System.setProperty("javax.net.ssl.trustStore", "../02-generate-certs/certs/streamingdemo-ca.jks" ); System.setProperty("javax.net.ssl.keyStore", "../02-generate-certs/certs/streamingdemo-jms-client.jks" ); System.setProperty("javax.net.ssl.keyStorePassword", "passw0rd" ); System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
Verify the initial scenario
Send messages to the COMMANDS queue
./05-jms-apps/run-putter.sh
The Putter application puts five messages onto the COMMANDS queue and then terminates.
sendMessage("Perform the first action A", session, producer); sendMessage("Perform a second action B", session, producer); sendMessage("Perform the third action C", session, producer); sendMessage("Perform some penultimate fourth action D", session, producer); sendMessage("Perform a fifth and final action E", session, producer);
Verify the messages from the MQ web console
./05-jms-apps/access-mq-console.sh
Click through to the COMMANDS
queue, and you should see the five messages that the Putter
app put.
Get messages from the COMMANDS queue
./05-jms-apps/run-getter.sh
This will get messages from the COMMANDS
queue, and write them to the console.
---------------------------------------- Getting messages from the COMMANDS queue ---------------------------------------- JMSMessage class: jms_text JMSType: null JMSDeliveryMode: 2 JMSDeliveryDelay: 0 JMSDeliveryTime: 0 JMSExpiration: 0 JMSPriority: 4 JMSMessageID: ID:414d51204d59514d47522020202020207abdc96201310040 JMSTimestamp: 1657388750326 JMSCorrelationID: null JMSDestination: queue:///COMMANDS JMSReplyTo: null JMSRedelivered: false JMSXAppID: mq.samples.Putter JMSXDeliveryCount: 1 JMSXUserID: mquser JMS_IBM_Character_Set: UTF-8 JMS_IBM_Encoding: 273 JMS_IBM_Format: MQSTR JMS_IBM_MsgType: 8 JMS_IBM_PutApplType: 28 JMS_IBM_PutDate: 20220709 JMS_IBM_PutTime: 17455036 Perform the first action A JMSMessage class: jms_text JMSType: null JMSDeliveryMode: 2 JMSDeliveryDelay: 0 JMSDeliveryTime: 0 JMSExpiration: 0 JMSPriority: 4 JMSMessageID: ID:414d51204d59514d47522020202020207abdc96202310040 JMSTimestamp: 1657388750351 JMSCorrelationID: null JMSDestination: queue:///COMMANDS JMSReplyTo: null JMSRedelivered: false JMSXAppID: mq.samples.Putter JMSXDeliveryCount: 1 JMSXUserID: mquser JMS_IBM_Character_Set: UTF-8 JMS_IBM_Encoding: 273 JMS_IBM_Format: MQSTR JMS_IBM_MsgType: 8 JMS_IBM_PutApplType: 28 JMS_IBM_PutDate: 20220709 JMS_IBM_PutTime: 17455038 Perform a second action B JMSMessage class: jms_text JMSType: null JMSDeliveryMode: 2 JMSDeliveryDelay: 0 JMSDeliveryTime: 0 JMSExpiration: 0 JMSPriority: 4 JMSMessageID: ID:414d51204d59514d47522020202020207abdc96203310040 JMSTimestamp: 1657388750364 JMSCorrelationID: null JMSDestination: queue:///COMMANDS JMSReplyTo: null JMSRedelivered: false JMSXAppID: mq.samples.Putter JMSXDeliveryCount: 1 JMSXUserID: mquser JMS_IBM_Character_Set: UTF-8 JMS_IBM_Encoding: 273 JMS_IBM_Format: MQSTR JMS_IBM_MsgType: 8 JMS_IBM_PutApplType: 28 JMS_IBM_PutDate: 20220709 JMS_IBM_PutTime: 17455039 Perform the third action C JMSMessage class: jms_text JMSType: null JMSDeliveryMode: 2 JMSDeliveryDelay: 0 JMSDeliveryTime: 0 JMSExpiration: 0 JMSPriority: 4 JMSMessageID: ID:414d51204d59514d47522020202020207abdc96204310040 JMSTimestamp: 1657388750380 JMSCorrelationID: null JMSDestination: queue:///COMMANDS JMSReplyTo: null JMSRedelivered: false JMSXAppID: mq.samples.Putter JMSXDeliveryCount: 1 JMSXUserID: mquser JMS_IBM_Character_Set: UTF-8 JMS_IBM_Encoding: 273 JMS_IBM_Format: MQSTR JMS_IBM_MsgType: 8 JMS_IBM_PutApplType: 28 JMS_IBM_PutDate: 20220709 JMS_IBM_PutTime: 17455040 Perform some penultimate fourth action D JMSMessage class: jms_text JMSType: null JMSDeliveryMode: 2 JMSDeliveryDelay: 0 JMSDeliveryTime: 0 JMSExpiration: 0 JMSPriority: 4 JMSMessageID: ID:414d51204d59514d47522020202020207abdc96205310040 JMSTimestamp: 1657388750392 JMSCorrelationID: null JMSDestination: queue:///COMMANDS JMSReplyTo: null JMSRedelivered: false JMSXAppID: mq.samples.Putter JMSXDeliveryCount: 1 JMSXUserID: mquser JMS_IBM_Character_Set: UTF-8 JMS_IBM_Encoding: 273 JMS_IBM_Format: MQSTR JMS_IBM_MsgType: 8 JMS_IBM_PutApplType: 28 JMS_IBM_PutDate: 20220709 JMS_IBM_PutTime: 17455042 Perform a fifth and final action E
Verify again from the MQ web console
./05-jms-apps/access-mq-console.sh
You should see that the COMMANDS
queue is empty, as the Getter
app has retrieved the messages.
Setup IBM Event Streams
Install the Event Streams Operator
Install the Event Streams Operator that will manage your Kafka cluster and Connector.
./06-install-eventstreams-operator/setup.sh
The tutorial script uses a specific channel for the Event Streams Operator. You may want to do this step manually if you want to choose a different version.
Create the Kafka cluster
Create and configure the Kafka cluster that will host the audit topic
export IBM_ENTITLEMENT_KEY=yourentitlementkey ./07-create-kafka-cluster/setup.sh
BEFORE running this, you will need to set an environment variable with an Entitled Registry key from myibm.ibm.com
The Kafka cluster config is contained in the EventStreams spec:
Creates an internal bootstrap address that will be used by the Connector
- authentication: type: scram-sha-512 name: internal port: 9093 tls: true type: internal
Creates an external bootstrap address that will be used by the Audit
app you run from your local machine.
- authentication: type: scram-sha-512 name: external port: 9094 tls: true type: external
Setup the MQ/Kafka Connector
Setup the Kafka Connector
Start a Kafka Connector that will get a copy of messages from the COMMANDS MQ queue, and produce them to the MQ.COMMANDS Kafka topic
./08-setup-kafka-connect/setup.sh
This starts by using the modify-qmgr ConfigMap, adding a third 3-modifications.mqsc
entry to modify the MQ queue manager.
It creates a new COMMANDS.COPY
queue for the Connector to use, and alters the COMMANDS queue used by the JMS apps, so that the queue manager streams a copy of all messages to it.
DEFINE QLOCAL(COMMANDS.COPY) REPLACE ALTER QLOCAL(COMMANDS) STREAMQ('COMMANDS.COPY') STRMQOS(MUSTDUP)
It sets up a KAFKA.SVRCONN
channel for the Connector to use to connect to the queue manager.
DEFINE CHANNEL(KAFKA.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP) SSLCAUTH(OPTIONAL) SSLCIPH('ANY_TLS12_OR_HIGHER') REPLACE
It updates the queue manager security so that the Connector can access the queue using the kafkauser
username / password defined in LDAP previously
SET CHLAUTH(KAFKA.SVRCONN) TYPE(BLOCKUSER) USERLIST(*KAFKAUSER) WARN(YES) ACTION(REPLACE) REFRESH SECURITY SET AUTHREC OBJTYPE(QMGR) GROUP('kafkausers') AUTHADD(CONNECT, INQ) SET AUTHREC OBJTYPE(QUEUE) PROFILE(COMMANDS.COPY) GROUP('kafkausers') AUTHADD(ALLMQI)
It creates a Kafka topic that the Connector can produce to. The topic is configured to retain a copy of all commands that have been sent for the last 365 days.
apiVersion: eventstreams.ibm.com/v1beta2 kind: KafkaTopic metadata: labels: eventstreams.ibm.com/cluster: eventstreams name: commands.topic namespace: kafka spec: config: min.insync.replicas: '1' retention.ms: '31536000000' partitions: 1 replicas: 3 topicName: MQ.COMMANDS
It generates credentials that the Connector can use to connect to Kafka. These will be used to produce to the MQ.COMMANDS
topic, as well as to create and use topics that Kafka Connect uses to store state.
apiVersion: eventstreams.ibm.com/v1beta1 kind: KafkaUser metadata: name: kafka-connect-credentials namespace: kafka labels: eventstreams.ibm.com/cluster: eventstreams spec: authentication: # generate username/password for this user type: scram-sha-512 authorization: acls: # --------------------------------------- # cluster permissions # --------------------------------------- # check existing cluster config - operation: DescribeConfigs resource: type: cluster # --------------------------------------- # topic permissions # --------------------------------------- # check existing topics - operation: DescribeConfigs resource: name: '*' patternType: literal type: topic # create topics (both to produce to, and to use for internal state) - operation: Create resource: name: '*' patternType: literal type: topic # consume from topics (needed to retrieve state from internal topics) - operation: Read resource: name: '*' patternType: literal type: topic # produce to topics (both writing to internal state topics and messages being produced by connectors) - operation: Write resource: name: '*' patternType: literal type: topic # --------------------------------------- # consumer group permissions # --------------------------------------- - operation: Read resource: name: '*' patternType: literal type: group # --------------------------------------- # transaction permissions # --------------------------------------- # create transactions - operation: Write resource: name: '*' patternType: literal type: transactionalId type: simple
The setup script also creates secrets that store credentials and certificates that the Connector uses to connect to MQ.
oc apply -f ./resources/mq-creds.yaml oc create secret generic jms-client-truststore \ --from-file=ca.jks=../02-generate-certs/certs/streamingdemo-ca.jks \ --from-literal=password='passw0rd' \ -n kafka --dry-run=client -oyaml | oc apply -f - oc create secret generic jms-client-keystore \ --from-file=client.jks=../02-generate-certs/certs/streamingdemo-kafka-client.jks \ --from-literal=password='passw0rd' \ -n kafka --dry-run=client -oyaml | oc apply -f - oc create secret generic mq-ca-tls \ --from-file=ca.crt=../02-generate-certs/certs/streamingdemo-ca.crt \ -n kafka --dry-run=client -oyaml | oc apply -f -
The setup script builds a simple custom Docker image based on the Event Streams Kafka container, extended with the jar for the MQ source connector.
FROM cp.icr.io/cp/ibm-eventstreams-kafka:11.0.2 USER root RUN mkdir -p /opt/kafka/plugins/ RUN curl -Lo /opt/kafka/plugins/kafka-connect-mq-source.jar https://github.com/ibm-messaging/kafka-connect-mq-source/releases/download/v1.3.1/kafka-connect-mq-source-1.3.1-jar-with-dependencies.jar USER 1001
The Connect cluster definition specifies the address for the Kafka cluster to connect to, the custom Docker image to use, and provides access to the Kafka and MQ credentials and certificates that the Connector will need.
image: image-registry.openshift-image-registry.svc:5000/kafka/kafkaconnectwithmq:latest bootstrapServers: eventstreams-kafka-bootstrap.kafka.svc:9093 tls: trustedCertificates: - secretName: eventstreams-cluster-ca-cert certificate: ca.crt - secretName: mq-ca-tls certificate: ca.crt authentication: type: scram-sha-512 username: kafka-connect-credentials passwordSecret: secretName: kafka-connect-credentials password: password externalConfiguration: volumes: - name: mq-credentials secret: secretName: mq-credentials - name: jms-client-truststore secret: secretName: jms-client-truststore - name: jms-client-keystore secret: secretName: jms-client-keystore
Finally, the setup script starts the MQ/Kafka connector using the mq-connector spec.
# the Kafka topic to produce to topic: MQ.COMMANDS # the MQ queue to get messages from mq.queue: COMMANDS.COPY # messages sent to the MQ queue are JMS TextMessages mq.message.body.jms: true key.converter: org.apache.kafka.connect.storage.StringConverter value.converter: org.apache.kafka.connect.storage.StringConverter mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder # connection details for the queue manager mq.queue.manager: MYQMGR mq.connection.name.list: queuemanager-ibm-mq.ibmmq(1414) mq.channel.name: KAFKA.SVRCONN mq.user.name: ${file:/opt/kafka/external-configuration/mq-credentials:username} mq.password: ${file:/opt/kafka/external-configuration/mq-credentials:password} # SSL config for connecting to MQ mq.ssl.use.ibm.cipher.mappings: false mq.ssl.cipher.suite: '*TLS12' mq.ssl.truststore.location: /opt/kafka/external-configuration/jms-client-truststore/ca.jks mq.ssl.truststore.password: ${file:/opt/kafka/external-configuration/jms-client-truststore:password} mq.ssl.keystore.location: /opt/kafka/external-configuration/jms-client-keystore/client.jks mq.ssl.keystore.password: ${file:/opt/kafka/external-configuration/jms-client-keystore:password}
Audit applications
Setup the Kafka Java app
Compile the Java Kafka application
./09-audit-app/build-apps.sh
The config for the apps is contained in Config.java.
The BOOTSTRAP
constant is modified by the build-apps.sh
script to match the hostname for your Kafka cluster.
public static final String TOPIC = "MQ.COMMANDS"; public static final String BOOTSTRAP = "PLACEHOLDERBOOTSTRAP"; public static final String CLIENT = "audit-app"; public static final String GROUP = "audit"; public static final String TRUSTSTORE = "./ca.p12"; public static final String TRUSTSTORE_PASSWORD = "PLACEHOLDERTRUSTSTOREPASSWORD"; public static final String USERNAME = "audit-app"; public static final String PASSWORD = "PLACEHOLDERAPPPASSWORD";
The credentials and Kafka cluster truststore generated by the Event Streams Operator are downloaded by the setup script.
BOOTSTRAP=$(oc get eventstreams eventstreams -nkafka -ojsonpath='{.status.kafkaListeners[?(@.type=="external")].bootstrapServers}') gsed -i -e 's/PLACEHOLDERBOOTSTRAP/'$BOOTSTRAP'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java CA_PASSWORD=$(oc get secret eventstreams-cluster-ca-cert -nkafka -o 'go-template={{index .data "ca.password"}}' | base64 -d) gsed -i -e 's/PLACEHOLDERTRUSTSTOREPASSWORD/'$CA_PASSWORD'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java PASSWORD=$(oc get secret audit-app -nkafka -o jsonpath='{..password}' | base64 -d) gsed -i -e 's/PLACEHOLDERAPPPASSWORD/'$PASSWORD'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java
The Audit
app avoids committing any offsets to the Kafka cluster, so that it can always replay all events from the Kafka topic, every time the application is run.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Demo the whole solution
Use MQ Streaming Queues and Kafka Connect to make an auditable copy of IBM MQ messages
./05-jms-apps/run-putter.sh
Put messages to the COMMANDS
queue.
./05-jms-apps/run-getter.sh
Get the messages from the COMMANDS
queue.
You can do the above two steps multiple times.
./09-audit-app/run-audit.sh
Retrieve the audit copy of all messages that have been put to the COMMANDS
queue in the last one year from the MQ.COMMANDS
Kafka topic. You should see the history of all messages put to the queue since you set up Streaming Queues.
Tags: apachekafka, kafka, mq