Posts Tagged ‘apachekafka’

Taking your first step towards an event-driven architecture
Friday, December 3rd, 2021

In this post, I want to suggest some approaches for introducing event-driven architecture patterns into your existing application environment. I’ll demonstrate how you can incrementally adopt Apache Kafka without needing to immediately build new applications or rebuild your existing ones, and show how this can be delivered on Red Hat OpenShift.

Describing Kafka security in AsyncAPI
Wednesday, November 17th, 2021

As part of the AsyncAPI Conference this week, I ran a session on how to describe Kafka security in AsyncAPI.
The aim of the session was to quickly show how to describe the security configuration of a Kafka cluster in an AsyncAPI document.
And, in reverse, to show how to take an AsyncAPI document you’ve been given and use the details in it to configure a Kafka client or application to connect to the cluster.
The recording and the slides I used are below.
Processing Apache Avro-serialized Kafka messages with IBM App Connect Enterprise
Monday, October 25th, 2021

UPDATE: Please see the updated version of this post.
IBM App Connect Enterprise (ACE) is a broker for developing and hosting high-throughput, high-scale integrations between a large number of applications and systems, including Apache Kafka.
In this post, I’ll describe how to use App Connect Enterprise to process Kafka messages that were serialized to a stream of bytes using Apache Avro schemas.

Background
Best practice when using Apache Kafka is to define Apache Avro schemas that describe the structure of your Kafka messages.
(For more detail about this, see my last post on From bytes to objects: describing Kafka events, or the intro to Avro that I wrote a couple of years ago.)
In this post, I’m assuming that you have embraced Avro, and you have Kafka topics with messages that were serialized using Avro schemas.
Perhaps you used a Java producer with an Avro SerDe that handled the serialization automatically for you.
Or your messages are coming from a Kafka Connect source connector, with an Avro converter that is handling the serialization for you.
Or you are doing the serialization yourself, such as if you’re producing Avro-serialized messages from a Python app.
Now you want to use IBM App Connect Enterprise to develop and host integrations for processing those Kafka messages. But you need App Connect to know how to:
- retrieve the Avro schemas it needs
- use the schemas to turn the binary stream of bytes on your Kafka topics into structured objects that are easy for ACE to manipulate and process
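The second of those steps is standard Avro binary decoding. As a rough illustration of what it involves, here is a minimal sketch in plain Java; the schema file name and topic are placeholders, and it assumes the message bytes are plain Avro binary rather than a schema-registry-specific wire format that prefixes a schema id.

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroDecodeSketch {

    // Turn the raw bytes of a Kafka message value into a structured Avro record,
    // using a schema that has been retrieved separately (here, read from a local
    // placeholder file - in practice it might come from a schema registry).
    public static GenericRecord decode(byte[] messageBytes) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("stock-price.avsc"));
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
        return reader.read(null, decoder);
    }
}
```

This is, in essence, what App Connect needs to do on your behalf: given the right schema, the opaque bytes on the topic become a structured object that message flows can manipulate.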
From bytes to objects: describing Kafka events
Saturday, October 23rd, 2021

The recording of the talk that Kate Stanley and I gave at Kafka Summit Americas is now available.
Events stored in Kafka are just bytes; this is one of the reasons Kafka is so flexible. But when developing a producer or consumer you want objects, not bytes. Documenting and defining events provides a common way to discuss and agree on an approach to using Kafka. It also shows developers how to consume events without needing access to the developers responsible for producing them.
In our talk, we introduced the most popular formats for documenting events that flow through Kafka, such as AsyncAPI, Avro, CloudEvents, JSON Schema, and Protobuf.
We discussed the differences between the approaches and how to decide on a documentation strategy. Alongside the formats, we also touched on the tooling available for the different approaches. Tools for testing and code generation can make a big difference to your day-to-day developer experience.
The talk was aimed at developers who aren’t already documenting their Kafka events, or who want to see other approaches.
Stock price events for Kafka
Saturday, August 28th, 2021

I’ve made a Kafka Connect source connector for sending “real-time” (not really) stock price events to a Kafka topic.
Choose your stock (e.g. “IBM”) and run the connector pointed at your topic, and every minute a JSON event like this will be produced:
{"open":137.9,"high":137.9,"low":137.9,"close":137.9,"volume":500,"timestamp":1629421200,"datetime":"2021-08-19 20:00:00"}
Describing Kafka security in AsyncAPI
Tuesday, June 29th, 2021

The new version of AsyncAPI, 2.1.0, was released today. One of the updates is that it lets you describe Kafka security mechanisms and protocols. In this post, I’ll show how you can do this, and how it relates to configuring a Kafka client.
| Kafka config: sasl.mechanism | Kafka config: security.protocol | encryption? | authentication? | AsyncAPI: server protocol | AsyncAPI: security scheme type |
|---|---|---|---|---|---|
| unset | PLAINTEXT | no | no | kafka | |
| PLAIN | SASL_PLAINTEXT | no | yes, using SASL/PLAIN | kafka | plain |
| SCRAM-SHA-256 | SASL_PLAINTEXT | no | yes, using SASL/SCRAM | kafka | scramSha256 |
| SCRAM-SHA-512 | SASL_PLAINTEXT | no | yes, using SASL/SCRAM | kafka | scramSha512 |
| OAUTHBEARER | SASL_PLAINTEXT | no | yes, using OAuth | kafka | oauth2 |
| GSSAPI | SASL_PLAINTEXT | no | yes, using GSSAPI | kafka | gssapi |
| unset | SSL | yes | no | kafka-secure | |
| PLAIN | SASL_SSL | yes | yes, using SASL/PLAIN | kafka-secure | plain |
| SCRAM-SHA-256 | SASL_SSL | yes | yes, using SASL/SCRAM | kafka-secure | scramSha256 |
| SCRAM-SHA-512 | SASL_SSL | yes | yes, using SASL/SCRAM | kafka-secure | scramSha512 |
| OAUTHBEARER | SASL_SSL | yes | yes, using OAuth | kafka-secure | oauth2 |
| GSSAPI | SASL_SSL | yes | yes, using GSSAPI | kafka-secure | gssapi |
| unset | SSL | yes | yes, using mutual TLS | kafka-secure | X509 |
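To make the mapping concrete, here is a hedged sketch (the bootstrap address and credentials are placeholders) of the Java client configuration that corresponds to the SCRAM-SHA-512-over-TLS row: an AsyncAPI server with the kafka-secure protocol and a scramSha512 security scheme.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ScramSha512Example {
    public static void main(String[] args) {
        // SASL/SCRAM-SHA-512 over TLS: the row that maps to an AsyncAPI server
        // protocol of "kafka-secure" and a security scheme type of "scramSha512".
        // Bootstrap address and credentials below are placeholders.
        Properties props = new Properties();
        props.put("bootstrap.servers", "my-cluster-bootstrap:443");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"my-user\" password=\"my-password\";");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as normal
        }
    }
}
```

The other rows follow the same pattern: the AsyncAPI server protocol tells you whether to expect TLS (kafka vs kafka-secure), and the security scheme type tells you which sasl.mechanism and login module to configure.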
How to avoid SSL handshake errors in your Kafka client because of a self-signed cluster CA
Sunday, June 27th, 2021

You’re trying to connect a Kafka client to a development Apache Kafka cluster which has been quickly set up using a self-signed CA certificate. You don’t have a copy of that CA certificate, and (because it’s not signed by a well-known CA) your Kafka client is failing because of SSL handshake errors.
The error contains messages like
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
and
javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target.
For example:
$ ./bin/kafka-console-consumer.sh \
    --bootstrap-server dale-kafka-saslscram-bootstrap-strimzi.apps.eem-test-fest-6.cp.fyre.ibm.com:443 \
    --topic DALE.TOPIC \
    --group dalegrp \
    --consumer-property 'security.protocol=SASL_SSL' \
    --consumer-property 'sasl.mechanism=SCRAM-SHA-512' \
    --consumer-property 'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="dale-user" password="pSRtfwTMKNlz";'
[2021-06-27 23:19:06,048] ERROR [Consumer clientId=consumer-dalegrp-1, groupId=dalegrp] Connection to node -1 (dale-kafka-saslscram-bootstrap-strimzi.apps.eem-test-fest-6.cp.fyre.ibm.com/9.46.199.58:443) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-06-27 23:19:06,049] WARN [Consumer clientId=consumer-dalegrp-1, groupId=dalegrp] Bootstrap broker dale-kafka-saslscram-bootstrap-strimzi.apps.eem-test-fest-6.cp.fyre.ibm.com:443 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-06-27 23:19:06,069] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:326)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:269)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1339)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1214)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1157)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:770)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:244)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
    at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:444)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
    at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
    at java.base/sun.security.validator.Validator.validate(Validator.java:264)
    at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:276)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1317)
    ... 29 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
    at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
    at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
    at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
    ... 35 more
Processed a total of 0 messages
I’m assuming that this is just for development purposes, that you know it’s safe to trust the certificate that the Kafka cluster is presenting, and that you’d rather just work around the error than ask the owner of the Kafka cluster for a copy of their CA.
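With those caveats, here is a minimal sketch of one way to work around it in Java (not necessarily the exact approach described in the rest of this post): connect once with a permissive trust manager purely to capture the certificate chain the broker presents, then save it into a local truststore that the Kafka client can reference. The hostname, port, output file name, and password below are placeholders.

```java
import java.io.FileOutputStream;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

public class FetchBrokerCert {
    public static void main(String[] args) throws Exception {
        String host = "my-kafka-bootstrap.example.com";   // placeholder bootstrap host
        int port = 443;                                    // placeholder bootstrap port

        // trust-all manager: ONLY acceptable here because this is a dev cluster
        // you have already decided to trust, and it is used purely to retrieve
        // the certificate chain the broker presents
        TrustManager[] trustAll = new TrustManager[] {
            new X509TrustManager() {
                public void checkClientTrusted(X509Certificate[] chain, String authType) {}
                public void checkServerTrusted(X509Certificate[] chain, String authType) {}
                public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
            }
        };
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, trustAll, new java.security.SecureRandom());

        try (SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(host, port)) {
            socket.startHandshake();
            Certificate[] chain = socket.getSession().getPeerCertificates();

            // store the presented chain in a truststore the Kafka client can use
            KeyStore ks = KeyStore.getInstance("PKCS12");
            ks.load(null, null);
            for (int i = 0; i < chain.length; i++) {
                ks.setCertificateEntry("broker-" + i, chain[i]);
            }
            try (FileOutputStream out = new FileOutputStream("broker-truststore.p12")) {
                ks.store(out, "changeit".toCharArray());
            }
        }
    }
}
```

The console consumer above could then be re-run with extra consumer properties pointing at that file: ssl.truststore.location, ssl.truststore.password, and ssl.truststore.type=PKCS12.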
Event Endpoint Management
Sunday, June 27th, 2021

Last week, we released the latest version of Event Endpoint Management in IBM Cloud Pak for Integration 2021.2.1. It allows organisations to share and manage access to their Kafka topics. In this post, I want to share a run-through of how it all works.
I’ll start with a high-level overview, then a walkthrough demo video, and finally share some links to related reading if you’d like more detail.
Overview

(Diagram: the numbered steps in the diagram are described below.)
Kafka topic owner
This is someone who has a Kafka topic, and is running an application or system that is producing a stream of events to that topic.
They think this stream of events might be useful to other developers in their organisation, so they describe it (using AsyncAPI) and publish this to a catalog where it can be discovered and managed.
- creates a Kafka topic and an application that produces events to it
- describes and documents their Kafka topic, and the events that are being produced to it
- publishes the description of their Kafka topic
- pushes the Kafka cluster security info to the Event Gateway service so it can manage access to the topic for the topic owner
App developer
This is someone who is building an application that could benefit from a stream of events.
They are able to discover the event sources that have been shared in their organisation, and get access to them through a self-service Developer Portal.
- creates credentials for use in their application
- registers new application credentials
- updates the Event Gateway service with the new application credentials
- creates or configures an application with guidance from the Portal
- application connects to the Event Gateway service
- application connection routed securely to the Kafka brokers
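To make those last few steps concrete, here is a hedged sketch of what the app developer’s application might look like: a completely standard Kafka consumer, pointed at the Event Gateway service’s address and using the credentials created through the Developer Portal. The address, topic name, credentials, and exact security settings below are placeholders; in practice they all come from the AsyncAPI document published in the Portal.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EventGatewayConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // the Event Gateway service address, as documented in the Developer Portal (placeholder)
        props.put("bootstrap.servers", "event-gateway.example.com:443");
        // security settings and credentials generated through the Developer Portal
        // (placeholders - the actual mechanism is described in the topic's AsyncAPI document)
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"generated-app-id\" password=\"generated-app-secret\";");
        props.put("group.id", "my-new-app");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("SHARED.TOPIC"));  // placeholder topic name
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(30))) {
                System.out.println(record.value());
            }
        }
    }
}
```

The point is that the application itself is ordinary Kafka client code; the Event Gateway service sits between it and the brokers, enforcing the access that the topic owner granted.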