Processing Apache Avro-serialized messages from Kafka using IBM App Connect Enterprise

IBM App Connect Enterprise (ACE) is a broker for developing and hosting high-throughput, high-scale integrations between a large number of applications and systems, including Apache Kafka.

In this post, I’ll describe how to use App Connect Enterprise to process Kafka messages that were serialized using Apache Avro schemas.

screenshot

This is an update to an earlier version of this post, reflecting updates to the sample code.

Background

Best practice when using Apache Kafka is to define Apache Avro schemas that describe the structure of your Kafka messages, and to store those schemas in a central registry that client applications can access at runtime.

If you want to use IBM App Connect Enterprise to develop and host integrations for processing those Kafka messages, you need App Connect to know how to:

  • retrieve the Avro schemas it needs using schema registry REST APIs
  • use the schemas to turn the binary stream of bytes on your Kafka topics into structured objects that ACE can manipulate and process

Using a Java compute node

The simplest way to do this is to use a Java Compute node that serves the same purpose as a SerDe (serializer/deserializer) client in a traditional Java Kafka application.

In this post, I’ll share an example implementation that demonstrates how this can be done.

It is available on GitHub. Instructions for how to use the sample are in the README.md in that repository. I’ll use this post to give a high-level overview and share examples of how to configure it to match the configuration of your Kafka applications.
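
To give a sense of what the node needs to do, here is a minimal sketch (illustrative only, not the sample’s actual code) of the core step that any Avro SerDe performs: given the raw payload bytes and the writer schema retrieved from the registry, the Apache Avro library turns the bytes back into a structured record.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroPayloadDecoder {

    // turn a binary-encoded Avro payload back into a structured record,
    // using the writer schema retrieved from the schema registry
    public static GenericRecord decode(byte[] avroData, String schemaJson) throws Exception {
        Schema schema = new Schema.Parser().parse(schemaJson);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroData, null);
        return reader.read(null, decoder);
    }
}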

screenshot

KafkaConsumer node

Configure this node to start consuming messages from your Kafka topics. It will emit binary BLOB messages, each containing the raw stream of bytes from a Kafka message.
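
Inside a Java Compute node wired to the KafkaConsumer node, those bytes can be read from the BLOB domain of the incoming message tree. A minimal sketch (the class name is hypothetical; the element path is the Java equivalent of ESQL’s InputRoot.BLOB.BLOB):

import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.*;

public class ReadKafkaBytes extends MbJavaComputeNode {
    public void evaluate(MbMessageAssembly inAssembly) throws MbException {
        // the BLOB parser exposes the raw Kafka payload as a byte array
        // at Root -> BLOB -> BLOB
        MbElement blob = inAssembly.getMessage().getRootElement()
                                   .getFirstElementByPath("BLOB/BLOB");
        byte[] payload = (byte[]) blob.getValue();

        // ... identify the schema, deserialize the payload, then propagate ...
        getOutputTerminal("out").propagate(inAssembly);
    }
}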

screenshot
Screenshot from IBM Event Streams, but the examples in this post could be used with Avro-serialized messages on any Kafka cluster

Avro Deserialize Java Compute node

This node will:

  • identify the ID of the schema that was used to serialize the message data, whether that ID is carried in a message header or at the start of the message payload
  • retrieve that schema from the schema registry using its REST API
  • use the schema to deserialize the binary message payload into a structured message that the rest of the flow can process

Similar to the way most SerDe clients work, the node improves performance by caching and reusing Avro schemas where possible.
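
One simple way to get that caching behaviour (a sketch of the approach, not the sample’s exact implementation) is a concurrent map keyed by schema ID, populated lazily from the registry:

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;
import org.apache.avro.Schema;

public class SchemaCache {
    private final ConcurrentHashMap<Long, Schema> cache = new ConcurrentHashMap<>();
    private final LongFunction<String> registryLookup;   // e.g. a REST call to the registry

    public SchemaCache(LongFunction<String> registryLookup) {
        this.registryLookup = registryLookup;
    }

    public Schema get(long schemaId) {
        // each schema is fetched and parsed at most once; subsequent
        // messages written with the same schema are served from the cache
        return cache.computeIfAbsent(schemaId,
            id -> new Schema.Parser().parse(registryLookup.apply(id)));
    }
}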

If the Compute node cannot deserialize the message data, it emits the raw Kafka message as-is to the alternate output terminal, so that it can be sent to a dead-letter queue for review or alternative processing (see the sketch after this list). Possible reasons why this might happen include:

  • The schema that was originally used to serialize the message data has since been deleted from the Schema Registry and is no longer available
  • The schema registry is not currently available
  • Schema registry credentials provided to the node were rejected
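
In JavaCompute terms, this dead-letter routing amounts to a try/catch around the deserialization. A sketch of the shape of the node’s evaluate() method, where deserialize is a hypothetical helper standing in for the schema lookup and Avro decoding:

import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.*;

public class AvroDeserializeSketch extends MbJavaComputeNode {
    public void evaluate(MbMessageAssembly inAssembly) throws MbException {
        try {
            MbMessage outMessage = deserialize(inAssembly.getMessage());
            getOutputTerminal("out").propagate(new MbMessageAssembly(inAssembly, outMessage));
        } catch (Exception e) {
            // deserialization failed: emit the raw Kafka message unchanged
            // on the alternate terminal for dead-letter handling
            getOutputTerminal("alternate").propagate(inAssembly);
        }
    }

    // hypothetical helper: schema lookup plus Avro decode, building a new
    // message tree from the deserialized record
    private MbMessage deserialize(MbMessage inMessage) throws Exception {
        throw new UnsupportedOperationException("not shown in this sketch");
    }
}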

Examples of using the Java Compute node

Setting up

To illustrate the way this can be used, I started a variety of Kafka Connect Connectors. Each was configured slightly differently.

screenshot
Screenshot from my OpenShift Console, showing the Connectors I used for this post. The definitions are available in Github, but I’ll copy the relevant config below.

These all produce the same type of randomly generated events, each to a different topic. This gave me a variety of topics, all using slightly different approaches to serializing the message data and different mechanisms for recording the schema that was used.

screenshot
Screenshot from the IBM Event Streams UI showing the topics I used for this post. The definitions are available in Github, but there is nothing specific about the config that is relevant here other than the topic name.

All of the messages used the same type of schema, but a separate schema was registered for each topic, allowing me to illustrate how the Java Compute node can look up and retrieve different schemas when the schema identity is encoded in different ways.

screenshot

The demo data

All of the messages in the following examples have data in this sort of shape:

{
    "id": "f08c2354-4a1d-479b-a1c8-a1166e948393",
    "customer": {
        "id": "18237e95-8e55-4893-9382-7821056e4e49",
        "name": "Britteny Ledner",
        "emails": [
            "britteny.ledner@example.com"
        ]
    },
    "products": [
        "L Classic Low-rise Jeans",
        "M Black Bootcut Jeans"
    ],
    "address": {
        "shippingaddress": {
            "number": 69,
            "street": "Desmond Course",
            "city": "Jasttown",
            "zipcode": "92337",
            "country": {
                "code": "US",
                "name": "United States"
            },
            "phones": null
        },
        "billingaddress": {
            "number": 571,
            "street": "Schumm Loop",
            "city": "Port Hansshire",
            "zipcode": "99742-5419",
            "country": {
                "code": "US",
                "name": "United States"
            },
            "phones": null
        }
    },
    "ordertime": "2024-05-12T14:28:36.344Z"
}

demos/example-message/online-order.json

Obviously, this is JSON and not Avro. The actual messages look more like this, but that’s harder to visualise!

screenshot

The demo processing

To illustrate the way that App Connect Enterprise can process this binary data once it is able to deserialize it, I created this flow:

screenshot

The avro deserialize node is a Java Compute node using the AvroDeserialize.java source in Github.

The redact customer name node is a Compute node using this ESQL:

SET OutputRoot.JSON.Data.customer.name = 'redacted';

demos/ace-setup/redaction.esql#L7

Replacing the customer name is enough to show that the flow has successfully parsed the binary data and applied the Avro schema correctly.

The demo output

screenshot

My flow outputs the parsed and modified messages in the JSON output format to /tmp/ace/output. I’ll include screenshots of this file to show the results of processing.

Configurations

For each of these examples, I’ll start with a different way to configure a Kafka producer, and then show how to configure the Avro Deserialize node to process the resulting messages.


Storing the content ID of the schema as a Long at the start of a binary-encoded message payload

The Converter was configured like this:

value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY

demos/kafka-setup/connectors.yaml#L89-L96

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>apicurio</schema.registry.api>
<schema.registry.use.id>contentId</schema.registry.use.id>
<schema.registry.id.length>8</schema.registry.id.length>

demos/ace-setup/schemaregistry-policies/online-payload-long-content.policy.xml#L5-L7
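
With this framing, the first 8 bytes of the payload carry the content ID as a big-endian long, and everything after them is the Avro-encoded data. A sketch of splitting a payload framed this way (an illustration of the framing, not the sample’s code):

import java.nio.ByteBuffer;
import java.util.Arrays;

public class LongIdFraming {
    // payload layout: [8-byte big-endian schema ID][Avro-encoded data]
    public static long schemaId(byte[] payload) {
        return ByteBuffer.wrap(payload).getLong();
    }

    public static byte[] avroData(byte[] payload) {
        return Arrays.copyOfRange(payload, 8, payload.length);
    }
}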

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


Storing the content ID of the schema as an Integer at the start of a binary-encoded message payload

The Converter was configured like this:

value.converter.apicurio.registry.as-confluent: true
value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY

demos/kafka-setup/connectors.yaml#L40-L48

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>apicurio</schema.registry.api>
<schema.registry.use.id>contentId</schema.registry.use.id>
<schema.registry.id.length>4</schema.registry.id.length>

demos/ace-setup/schemaregistry-policies/online-payload-int-content.policy.xml#L5-L7
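
The as-confluent option tells the Converter to use the Confluent wire format: a single magic byte (0x00) followed by the ID as a 4-byte big-endian integer. A sketch of parsing that framing:

import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentFraming {
    // payload layout: [magic byte 0x00][4-byte big-endian schema ID][Avro-encoded data]
    public static int schemaId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("unrecognised magic byte");
        }
        return buf.getInt();
    }

    public static byte[] avroData(byte[] payload) {
        return Arrays.copyOfRange(payload, 5, payload.length);
    }
}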

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


Storing the global ID of the schema as a Long at the start of a binary-encoded message payload

The Converter was configured like this:

value.converter.apicurio.registry.use-id: globalId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY

demos/kafka-setup/connectors.yaml#L186-L193

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>apicurio</schema.registry.api>
<schema.registry.use.id>globalId</schema.registry.use.id>
<schema.registry.id.length>8</schema.registry.id.length>

demos/ace-setup/schemaregistry-policies/online-payload-long-global.policy.xml#L5-L7
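
The global ID is resolved against the registry’s REST API. A sketch of the lookup (the path shown is from the Apicurio Registry v2 core API; treat it as an assumption to check against the API version your registry exposes, and note that real code would also need authentication and error handling):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ApicurioLookup {
    // fetch schema content from Apicurio Registry by global ID
    public static String fetchByGlobalId(String registryUrl, long globalId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(registryUrl + "/apis/registry/v2/ids/globalIds/" + globalId))
            .GET()
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();   // the response body is the schema definition itself
    }
}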

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


Storing the global ID of the schema as an Integer at the start of a binary-encoded message payload

The Converter was configured like this:

value.converter.apicurio.registry.as-confluent: true
value.converter.apicurio.registry.use-id: globalId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY

demos/kafka-setup/connectors.yaml#L137-L145

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>apicurio</schema.registry.api>
<schema.registry.use.id>globalId</schema.registry.use.id>
<schema.registry.id.length>4</schema.registry.id.length>

demos/ace-setup/schemaregistry-policies/online-payload-int-global.policy.xml#L5-L7

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


Storing the content ID of the schema in the header of a binary-encoded message

The Converter was configured like this:

value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: true
value.converter.apicurio.registry.avro.encoding: BINARY

demos/kafka-setup/connectors.yaml#L234-L241

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>apicurio</schema.registry.api>
<schema.registry.use.id>contentId</schema.registry.use.id>

demos/ace-setup/schemaregistry-policies/online-header-content.policy.xml#L5-L6
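
For comparison, this is how the same header could be read in a plain Java Kafka application. The header key and the 8-byte long encoding shown here are what Apicurio’s own SerDes use by default, but treat both as assumptions to verify against the Apicurio version you are running:

import java.nio.ByteBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class HeaderIdLookup {
    // read the schema ID from a Kafka message header (assumed key and
    // encoding: "apicurio.value.contentId" holding a big-endian long)
    public static long schemaId(ConsumerRecord<?, ?> record) {
        Header header = record.headers().lastHeader("apicurio.value.contentId");
        if (header == null) {
            throw new IllegalArgumentException("no schema ID header found");
        }
        return ByteBuffer.wrap(header.value()).getLong();
    }
}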

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


Storing the global ID of the schema in the header of a binary-encoded message

The Converter was configured like this:

value.converter.apicurio.registry.use-id: globalId
value.converter.apicurio.registry.headers.enabled: true
value.converter.apicurio.registry.avro.encoding: BINARY

demos/kafka-setup/connectors.yaml#L282-L289

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>apicurio</schema.registry.api>
<schema.registry.use.id>globalId</schema.registry.use.id>

demos/ace-setup/schemaregistry-policies/online-header-global.policy.xml#L5-L6

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


Storing the content ID of the schema in the header of a JSON-encoded message

The Converter was configured like this:

value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: true
value.converter.apicurio.registry.avro.encoding: JSON

demos/kafka-setup/connectors.yaml#L331-L338

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>apicurio</schema.registry.api>
<schema.registry.use.id>contentId</schema.registry.use.id>

demos/ace-setup/schemaregistry-policies/online-header-json-content.policy.xml#L5-L6
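
JSON-encoded Avro is easier to browse, but it still needs the writer schema to deserialize reliably, which is why the registry lookup still happens. Only the decoder changes compared to the binary case. A sketch:

import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;

public class AvroJsonDecoding {
    // JSON-encoded Avro uses a JsonDecoder instead of a BinaryDecoder,
    // but the writer schema is still required
    public static GenericRecord decode(byte[] payload, Schema schema) throws Exception {
        JsonDecoder decoder = DecoderFactory.get()
            .jsonDecoder(schema, new String(payload, StandardCharsets.UTF_8));
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}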

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


Storing the global ID of the schema in the header of a JSON-encoded message

The Converter was configured like this:

value.converter.apicurio.registry.use-id: globalId
value.converter.apicurio.registry.headers.enabled: true
value.converter.apicurio.registry.avro.encoding: JSON

demos/kafka-setup/connectors.yaml#L380-L387

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>apicurio</schema.registry.api>
<schema.registry.use.id>globalId</schema.registry.use.id>

demos/ace-setup/schemaregistry-policies/online-header-json-global.policy.xml#L5-L6

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


Storing the schema ID in a schema registry with a Confluent-compatible REST API

Confluent SerDe libraries store the schema ID as a 4-byte Integer, preceded by a single magic byte, at the start of a message payload. I recreated this behaviour by configuring a Converter like this:

value.converter.apicurio.registry.as-confluent: true
value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.avro.encoding: BINARY

demos/kafka-setup/connectors.yaml#L40-L48

This results in Kafka messages like this:

screenshot

I configured the Avro Deserialize node using:

<schema.registry.api>confluent</schema.registry.api>
<schema.registry.id.length>4</schema.registry.id.length>

demos/ace-setup/schemaregistry-policies/online-confluent.policy.xml#L4-L6
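
The lookup endpoint for Confluent-compatible registries differs from Apicurio’s. A sketch of the GET /schemas/ids/{id} call, which returns a JSON object whose schema field holds the schema definition (again, real code would need authentication, error handling, and JSON parsing):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConfluentLookup {
    // fetch a schema by ID from a Confluent-compatible registry;
    // the response looks like {"schema": "..."} with the schema JSON-escaped
    public static String fetchById(String registryUrl, int schemaId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(registryUrl + "/schemas/ids/" + schemaId))
            .GET()
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}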

This enabled the App Connect flow to output the JSON-format results with the customer name redacted by the ESQL processing.


If you’d like to recreate this demo, you can find everything I used to set this up in Github, including how I set up Kafka and App Connect.
