IBM App Connect Enterprise (ACE) is a broker for developing and hosting high-throughput, high-scale integrations between a large number of applications and systems, including Apache Kafka.
In this post, I’ll describe how to use App Connect Enterprise to process Kafka messages that were serialized using Apache Avro schemas.
This is an update to an earlier version of this post, reflecting updates to the sample code.
Background
Best practice when using Apache Kafka is to define Apache Avro schemas that describe the structure of your Kafka messages, and to store those schemas in a central registry that client applications can access at runtime.
If you want to use IBM App Connect Enterprise to develop and host integrations for processing those Kafka messages, you need App Connect to know how to:
- retrieve the Avro schemas it needs using schema registry REST APIs
- use the schemas to turn the binary stream of bytes on your Kafka topics into structured objects that ACE can manipulate and process
Using a Java compute node
The simplest way to do this is to use a Java Compute node that serves the same purpose as a SerDe client in a traditional Java Kafka application.
In this post, I’ll share an example implementation that demonstrates how this can be done.
It is available on GitHub. Instructions for how to use the sample are in the README.md in that repository. I’ll use this post to give a high-level overview, and to share examples of how to configure it to match your Kafka applications.
KafkaConsumer node
Configure this node to start consuming messages from your Kafka topics. It will emit BLOB binary messages, each containing the raw stream of bytes from a Kafka message.
Screenshot from IBM Event Streams, but the examples in this post could be used with Avro-serialized messages on any Kafka cluster
Avro Deserialize Java Compute node
This node will:
- identify the ID for the schema that was used to serialize that specific message (either from the message headers or the start of the message payload)
- use the ID to download the Avro schema from a schema registry
- use the downloaded Avro schema to deserialize the binary message data
- emit the deserialized message data as a structured JSON object
As with most SerDe clients, performance is improved by caching and reusing Avro schemas where possible.
If the Compute node cannot deserialize the message data, it emits the raw Kafka message as-is to the alt output terminal, so it can be sent to a dead-letter queue for review or alternative processing. (A minimal sketch of the node’s overall logic follows the list below.) Possible reasons why this might happen include:
- The schema that was originally used to serialize the message data has since been deleted from the Schema Registry and is no longer available
- The schema registry is not currently available
- Schema registry credentials provided to the node were rejected
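To make this concrete, here is a minimal sketch of the core logic such a node implements, written with the plain Apache Avro Java API rather than the sample’s actual source. The fetchSchemaById method is a hypothetical placeholder for a schema registry REST call, and the ID extraction shown assumes a payload-prefix layout (the header-based variants in the examples below read the ID from Kafka message headers instead):

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroDeserializeSketch {
    // cache parsed schemas so each one only needs to be fetched once
    private final Map<Long, Schema> schemaCache = new ConcurrentHashMap<>();

    public String deserialize(byte[] kafkaMessage) throws Exception {
        // 1. identify the schema ID (this assumes a payload-prefix layout:
        //    a magic byte followed by an 8-byte ID)
        ByteBuffer buf = ByteBuffer.wrap(kafkaMessage);
        buf.get();                      // skip the magic byte
        long schemaId = buf.getLong();  // read the schema ID

        // 2. download the schema, or reuse a cached copy
        Schema schema = schemaCache.computeIfAbsent(schemaId, this::fetchSchemaById);

        // 3. deserialize the remaining bytes using the schema
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        GenericRecord record = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);

        // 4. emit structured output (GenericRecord.toString() renders as JSON)
        return record.toString();
    }

    // hypothetical placeholder for a GET against the schema registry REST API
    private Schema fetchSchemaById(long schemaId) {
        throw new UnsupportedOperationException("registry REST call goes here");
    }
}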
Examples of using the Java Compute node
Setting up
To illustrate the way this can be used, I started a variety of Kafka Connect connectors, each configured slightly differently.
Screenshot from my OpenShift Console, showing the Connectors I used for this post. The definitions are available in GitHub, but I’ll copy the relevant config below.
These all produce the same type of randomly generated events, each to a different topic. This gave me a variety of topics, all using slightly different approaches to serializing the message data, and different mechanisms for recording the schema that was used.
Screenshot from the IBM Event Streams UI showing the topics I used for this post. The definitions are available in GitHub, but there is nothing specific about the config that is relevant here other than the topic name.
All of the messages used the same type of schema, but a separate schema was registered for each topic, allowing me to illustrate how the Java Compute node can look up and retrieve different schemas when the schema ID is encoded in different ways.
The demo data
All of the messages in the following examples have data in this sort of shape:
{ "id": "f08c2354-4a1d-479b-a1c8-a1166e948393", "customer": { "id": "18237e95-8e55-4893-9382-7821056e4e49", "name": "Britteny Ledner", "emails": [ "britteny.ledner@example.com" ] }, "products": [ "L Classic Low-rise Jeans", "M Black Bootcut Jeans" ], "address": { "shippingaddress": { "number": 69, "street": "Desmond Course", "city": "Jasttown", "zipcode": "92337", "country": { "code": "US", "name": "United States" }, "phones": null }, "billingaddress": { "number": 571, "street": "Schumm Loop", "city": "Port Hansshire", "zipcode": "99742-5419", "country": { "code": "US", "name": "United States" }, "phones": null } }, "ordertime": "2024-05-12T14:28:36.344Z" }
demos/example-message/online-order.json
Obviously, this is JSON and not Avro. The actual messages are binary Avro data, which is much harder to visualise!
The demo processing
To illustrate the way that App Connect Enterprise can process this binary data once it is able to deserialize it, I created this flow:
The avro deserialize node is a Java Compute node using the AvroDeserialize.java source in GitHub.
The redact customer name node is a Compute node using this ESQL:
SET OutputRoot.JSON.Data.customer.name = 'redacted';
demos/ace-setup/redaction.esql#L7
Showing that we can replace the customer name is enough to illustrate that the flow was able to parse the binary data and apply the Avro schema correctly.
The demo output
My flow outputs the parsed and modified messages in the JSON output format to /tmp/ace/output. I’ll include screenshots of this file to show the results of processing.
Configurations
For each of these examples, I’ll start with a different way to configure a Kafka producer, and then show how to configure the Avro Deserialize node to process the resulting messages.
Storing the content ID of the schema as a Long at the start of a binary-encoded message payload
The Converter was configured to produce messages like this:
value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY
demos/kafka-setup/connectors.yaml#L89-L96
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>apicurio</schema.registry.api> <schema.registry.use.id>contentId</schema.registry.use.id> <schema.registry.id.length>8</schema.registry.id.length>
demos/ace-setup/schemaregistry-policies/online-payload-long-content.policy.xml#L5-L7
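For this configuration, the node has to read the schema ID from the front of the message payload. As I understand the Apicurio binary encoding, the layout is a magic byte followed by a big-endian 8-byte ID and then the Avro data, and schema.registry.id.length tells the node how many bytes that ID occupies. A small illustration (the message bytes here are hypothetical):

import java.nio.ByteBuffer;

public class IdPrefixSketch {
    public static void main(String[] args) {
        // assumed layout: [magic byte 0x0][8-byte big-endian content ID][Avro binary data]
        byte[] msg = { 0, 0, 0, 0, 0, 0, 0, 0, 42, 1, 2, 3 };  // hypothetical message bytes
        long contentId = ByteBuffer.wrap(msg, 1, 8).getLong();
        System.out.println(contentId);  // prints 42
        // in the 4-byte examples below (id.length of 4), the equivalent
        // read would be ByteBuffer.wrap(msg, 1, 4).getInt()
    }
}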
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
Storing the content ID of the schema as an Integer at the start of a binary-encoded message payload
The Converter was configured to produce messages like this:
value.converter.apicurio.registry.as-confluent: true
value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY
demos/kafka-setup/connectors.yaml#L40-L48
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>apicurio</schema.registry.api> <schema.registry.use.id>contentId</schema.registry.use.id> <schema.registry.id.length>4</schema.registry.id.length>
demos/ace-setup/schemaregistry-policies/online-payload-int-content.policy.xml#L5-L7
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
Storing the global ID of the schema as a Long at the start of a binary-encoded message payload
The Converter was configured to produce messages like this:
value.converter.apicurio.registry.use-id: globalId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY
demos/kafka-setup/connectors.yaml#L186-L193
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>apicurio</schema.registry.api> <schema.registry.use.id>globalId</schema.registry.use.id> <schema.registry.id.length>8</schema.registry.id.length>
demos/ace-setup/schemaregistry-policies/online-payload-long-global.policy.xml#L5-L7
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
Storing the global ID of the schema as an Integer at the start of a binary-encoded message payload
The Converter was configured to produce messages like this:
value.converter.apicurio.registry.as-confluent: true
value.converter.apicurio.registry.use-id: globalId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY
demos/kafka-setup/connectors.yaml#L137-L145
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>apicurio</schema.registry.api> <schema.registry.use.id>globalId</schema.registry.use.id> <schema.registry.id.length>4</schema.registry.id.length>
demos/ace-setup/schemaregistry-policies/online-payload-int-global.policy.xml#L5-L7
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
Storing the content ID of the schema in the header of a binary-encoded message
The Converter was configured to produce messages like this:
value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: true
value.converter.apicurio.registry.avro.encoding: BINARY
demos/kafka-setup/connectors.yaml#L234-L241
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>apicurio</schema.registry.api> <schema.registry.use.id>contentId</schema.registry.use.id>
demos/ace-setup/schemaregistry-policies/online-header-content.policy.xml#L5-L6
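With headers.enabled: true the payload contains only Avro data, and the schema ID travels in a Kafka record header instead, which is why no id.length setting is needed. A minimal sketch of reading the ID from headers, assuming the apicurio.value.contentId header name that I believe the Apicurio serdes use (treat the name as an assumption):

import java.nio.ByteBuffer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public class HeaderIdSketch {
    // returns the schema ID from the record headers, or null if not present
    static Long schemaIdFromHeaders(Headers headers) {
        Header h = headers.lastHeader("apicurio.value.contentId");
        return (h == null) ? null : ByteBuffer.wrap(h.value()).getLong();
    }
}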
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
Storing the global ID of the schema in the header of a binary-encoded message
The Converter was configured to produce messages like this:
value.converter.apicurio.registry.use-id: globalId
value.converter.apicurio.registry.headers.enabled: true
value.converter.apicurio.registry.avro.encoding: BINARY
demos/kafka-setup/connectors.yaml#L282-L289
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>apicurio</schema.registry.api> <schema.registry.use.id>globalId</schema.registry.use.id>
demos/ace-setup/schemaregistry-policies/online-header-global.policy.xml#L5-L6
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
Storing the content ID of the schema in the header of a JSON-encoded message
The Converter was configured to produce messages like this:
value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: true
value.converter.apicurio.registry.avro.encoding: JSON
demos/kafka-setup/connectors.yaml#L331-L338
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>apicurio</schema.registry.api> <schema.registry.use.id>contentId</schema.registry.use.id>
demos/ace-setup/schemaregistry-policies/online-header-json-content.policy.xml#L5-L6
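The difference in this example is the message payload itself: with avro.encoding: JSON the bytes are Avro’s JSON encoding rather than binary, but a schema from the registry is still required to decode them. A minimal sketch of that decoding step using the Avro Java API:

import java.io.ByteArrayInputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;

public class JsonEncodingSketch {
    // decode an Avro JSON-encoded payload using a schema fetched from the registry
    static GenericRecord decode(Schema schema, byte[] payload) throws Exception {
        JsonDecoder decoder = DecoderFactory.get()
                .jsonDecoder(schema, new ByteArrayInputStream(payload));
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}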
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
Storing the global ID of the schema in the header of a JSON-encoded message
The Converter was configured to produce messages like this:
value.converter.apicurio.registry.use-id: globalId
value.converter.apicurio.registry.headers.enabled: true
value.converter.apicurio.registry.avro.encoding: JSON
demos/kafka-setup/connectors.yaml#L380-L387
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>apicurio</schema.registry.api> <schema.registry.use.id>globalId</schema.registry.use.id>
demos/ace-setup/schemaregistry-policies/online-header-json-global.policy.xml#L5-L6
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
Storing the schema ID in a schema registry with a Confluent-compatible REST API
Confluent SerDe libraries store the schema ID as an Integer at the start of a message payload. I recreated this behaviour by configuring a Converter to produce messages like this:
value.converter.apicurio.registry.as-confluent: true
value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY
demos/kafka-setup/connectors.yaml#L40-L48
This results in Kafka messages like this:
I configured the Avro Deserialize node using:
<schema.registry.api>confluent</schema.registry.api> <schema.registry.id.length>4</schema.registry.id.length>
demos/ace-setup/schemaregistry-policies/online-confluent.policy.xml#L4-L6
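Setting schema.registry.api to confluent changes the REST calls the node makes to fetch schemas. Confluent-compatible registries serve schemas at /schemas/ids/{id}, returning the schema definition in a JSON wrapper. A minimal sketch of that lookup (the registry URL and schema ID here are hypothetical):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConfluentLookupSketch {
    public static void main(String[] args) throws Exception {
        String registry = "https://my-registry:8081";  // hypothetical registry URL
        HttpRequest req = HttpRequest.newBuilder()
                .uri(URI.create(registry + "/schemas/ids/7"))  // hypothetical schema ID
                .build();
        String body = HttpClient.newHttpClient()
                .send(req, HttpResponse.BodyHandlers.ofString())
                .body();
        // response body looks like: {"schema":"{\"type\":\"record\", ... }"}
        System.out.println(body);
    }
}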
This enabled the App Connect flow to output the JSON format results with the customer name redacted by the ESQL processing:
If you’d like to recreate this demo, you can find everything I used to set this up in GitHub, including how I set up Kafka and App Connect.
Tags: apacheavro, apachekafka, appconnect, avro, ibmeventstreams, kafka