Archive for the ‘code’ Category

Event Endpoint Management “demo in a box”

Friday, December 17th, 2021

In this post, I’ll share how you can get your own Event Endpoint Management demo instance with just seven minutes of work.

(Seven minutes of hands-on-keyboard time… there is a lot of waiting-for-stuff-to-run time, but it doesn’t sound so impressive if I include waiting time!)


(more…)

Taking your first step towards an event-driven architecture

Friday, December 3rd, 2021

In this post, I want to suggest some approaches for introducing event-driven architecture patterns into your existing application environment. I’ll demonstrate how you can incrementally adopt Apache Kafka without needing to immediately build new applications or rebuild your existing applications, and show how this can be delivered in Red Hat OpenShift.

(more…)

Describing Kafka security in AsyncAPI

Wednesday, November 17th, 2021

As part of AsyncAPI Conference this week, I ran a session on how to describe Kafka security in AsyncAPI.

The aim of the session was to quickly show how to describe the security configuration of a Kafka cluster in an AsyncAPI document.

And, in reverse, if you’ve been given an AsyncAPI document, how to use the details in it to configure a Kafka client or application to connect to the cluster it describes.

The recording and the slides I used are below.


youtu.be/CeGOLijUuQc


slides on Slideshare

Processing Apache Avro-serialized Kafka messages with IBM App Connect Enterprise

Monday, October 25th, 2021

IBM App Connect Enterprise (ACE) is a broker for developing and hosting high-throughput, high-scale integrations between a large number of applications and systems, including Apache Kafka.

In this post, I’ll describe how to use App Connect Enterprise to process Kafka messages that were serialized to a stream of bytes using Apache Avro schemas.


Background

Best practice when using Apache Kafka is to define Apache Avro schemas that describe the structure of your Kafka messages.

(For more detail about this, see my last post on From bytes to objects: describing Kafka events, or the intro to Avro that I wrote a couple of years ago.)

In this post, I’m assuming that you have embraced Avro, and you have Kafka topics with messages that were serialized using Avro schemas.

Perhaps you used a Java producer with an Avro SerDe that handled the serialization automatically for you.

Or your messages are coming from a Kafka Connect source connector, with an Avro converter that is handling the serialization for you.

Or you are doing the serialization yourself, such as if you’re producing Avro-serialized messages from a Python app.
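For illustration only, here is a minimal sketch of that last case, assuming the fastavro and kafka-python packages (the schema, topic name, and broker address are all made up for the example):

import io

from fastavro import parse_schema, schemaless_writer
from kafka import KafkaProducer

# hypothetical Avro schema describing the message value
schema = parse_schema({
    "type": "record",
    "name": "SensorReading",
    "fields": [
        {"name": "temperature", "type": "double"},
        {"name": "units", "type": "string"},
    ],
})

def to_avro_bytes(event):
    # serialize a Python dict to the raw Avro byte stream that goes on the topic
    buffer = io.BytesIO()
    schemaless_writer(buffer, schema, event)
    return buffer.getvalue()

producer = KafkaProducer(bootstrap_servers="localhost:9092",
                         value_serializer=to_avro_bytes)
producer.send("SENSOR.READINGS", {"temperature": 21.5, "units": "celsius"})
producer.flush()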

Now you want to use IBM App Connect Enterprise to develop and host integrations for processing those Kafka messages. But you need App Connect to know how to:

  • retrieve the Avro schemas it needs
  • use the schemas to turn the binary stream of bytes on your Kafka topics into structured objects that are easy for ACE to manipulate and process

(more…)

Stock price events for Kafka

Saturday, August 28th, 2021

I’ve made a Kafka Connect source connector for sending “real-time” (not really) stock price events to a Kafka topic.

Choose your stock (e.g. “IBM”), point the connector at your topic, and every minute it will produce a JSON event like:

{"open":137.9,"high":137.9,"low":137.9,"close":137.9,"volume":500,"timestamp":1629421200,"datetime":"2021-08-19 20:00:00"}

(more…)

Visualizing TensorFlow image classifier behaviour

Saturday, July 10th, 2021

How to use Scratch to create a visualization that explains what parts of an image a TensorFlow image classifier finds the most significant.

An image classifier recognizes this as an image of The Doctor.


prediction: The Doctor
confidence: 99.97%

Why? What parts of the image did the classifier recognize as indicating that this is the Doctor?

How could we tell?

(more…)

Describing Kafka security in AsyncAPI

Tuesday, June 29th, 2021

The new version of AsyncAPI, 2.1.0, was released today. One of the updates is that it lets you describe Kafka security mechanisms and protocols. In this post, I’ll show how you can do this, and how it relates to configuring a Kafka client.

This table shows how the Kafka client config (sasl.mechanism and security.protocol) maps to the AsyncAPI description (server protocol and security scheme type):

sasl.mechanism | security.protocol | encryption? | auth?                  | AsyncAPI server protocol | AsyncAPI security scheme type
unset          | PLAINTEXT         | no          | no                     | kafka                    | (none)
PLAIN          | SASL_PLAINTEXT    | no          | yes, using SASL/PLAIN  | kafka                    | plain
SCRAM-SHA-256  | SASL_PLAINTEXT    | no          | yes, using SASL/SCRAM  | kafka                    | scramSha256
SCRAM-SHA-512  | SASL_PLAINTEXT    | no          | yes, using SASL/SCRAM  | kafka                    | scramSha512
OAUTHBEARER    | SASL_PLAINTEXT    | no          | yes, using OAuth       | kafka                    | oauth2
GSSAPI         | SASL_PLAINTEXT    | no          | yes, using GSSAPI      | kafka                    | gssapi
unset          | SSL               | yes         | no                     | kafka-secure             | (none)
PLAIN          | SASL_SSL          | yes         | yes, using SASL/PLAIN  | kafka-secure             | plain
SCRAM-SHA-256  | SASL_SSL          | yes         | yes, using SASL/SCRAM  | kafka-secure             | scramSha256
SCRAM-SHA-512  | SASL_SSL          | yes         | yes, using SASL/SCRAM  | kafka-secure             | scramSha512
OAUTHBEARER    | SASL_SSL          | yes         | yes, using OAuth       | kafka-secure             | oauth2
GSSAPI         | SASL_SSL          | yes         | yes, using GSSAPI      | kafka-secure             | gssapi
unset          | SSL               | yes         | yes, using mutual TLS  | kafka-secure             | X509
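
As an illustration of one row from the table, a client configured for SCRAM-SHA-512 over SASL_SSL — which an AsyncAPI document would describe with a kafka-secure server protocol and a scramSha512 security scheme — might look like this in Python (a sketch using kafka-python; the host, topic, and credentials are placeholders):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "MY.TOPIC",
    bootstrap_servers="my-kafka-bootstrap.example.com:443",
    # corresponds to 'protocol: kafka-secure' on the AsyncAPI server object
    security_protocol="SASL_SSL",
    # corresponds to an AsyncAPI security scheme of type 'scramSha512'
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="my-user",
    sasl_plain_password="my-password",
)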

(more…)

How to avoid SSL handshake errors in your Kafka client because of a self-signed cluster CA

Sunday, June 27th, 2021

You’re trying to connect a Kafka client to a development Apache Kafka cluster which has been quickly set up using a self-signed CA certificate. You don’t have a copy of that CA certificate, and (because it’s not signed by a well-known CA) your Kafka client is failing because of SSL handshake errors.

The error contains messages like
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
and
javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target.

For example:

$ ./bin/kafka-console-consumer.sh \
 --bootstrap-server dale-kafka-saslscram-bootstrap-strimzi.apps.eem-test-fest-6.cp.fyre.ibm.com:443 \
 --topic DALE.TOPIC \
 --group dalegrp \
 --consumer-property 'security.protocol=SASL_SSL' \
 --consumer-property 'sasl.mechanism=SCRAM-SHA-512' \
 --consumer-property 'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="dale-user" password="pSRtfwTMKNlz";'

[2021-06-27 23:19:06,048] ERROR [Consumer clientId=consumer-dalegrp-1, groupId=dalegrp] Connection to node -1 (dale-kafka-saslscram-bootstrap-strimzi.apps.eem-test-fest-6.cp.fyre.ibm.com/9.46.199.58:443) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-06-27 23:19:06,049] WARN [Consumer clientId=consumer-dalegrp-1, groupId=dalegrp] Bootstrap broker dale-kafka-saslscram-bootstrap-strimzi.apps.eem-test-fest-6.cp.fyre.ibm.com:443 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-06-27 23:19:06,069] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:326)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:269)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1339)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1214)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1157)
	at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
	at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:770)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
	at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430)
	at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514)
	at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
	at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:244)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
	at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:444)
	at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
	at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
	at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
	at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
	at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
	at java.base/sun.security.validator.Validator.validate(Validator.java:264)
	at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:276)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1317)
	... 29 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
	at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
	at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
	at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
	... 35 more
Processed a total of 0 messages

I’m assuming that this is just for development purposes, that you know it’s safe to trust the certificate that the Kafka cluster is presenting, and that you’d rather just work around the error than ask the owner of the Kafka cluster for a copy of their CA certificate.
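
As a rough illustration of the kind of workaround I mean (this isn’t necessarily the approach described in the rest of the post): if your client happened to be a Python app using kafka-python, you could — for development only — tell it not to verify the broker’s certificate at all:

import ssl

from kafka import KafkaConsumer

# DEVELOPMENT ONLY: skip certificate verification so the self-signed CA
# doesn't cause the handshake to fail (host and credentials are placeholders)
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

consumer = KafkaConsumer(
    "DALE.TOPIC",
    bootstrap_servers="bootstrap-address:443",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="dale-user",
    sasl_plain_password="my-password",
    ssl_context=context,
)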

(more…)