Kafka Connect Single Message Transformations (SMTs) and MirrorMaker can be a simple way of doing stateless transformations on a stream of events.
There are many options available for processing a stream of events – the two I work with most frequently are Flink and Kafka Streams, both of which offer a range of ways to do powerful stateful processing on an event stream. In this post, I’ll share a third, perhaps overlooked, option: Kafka Connect.
There are two pieces that we can bring together here:
Piece 1: MirrorSourceConnector
MirrorSourceConnector
is one of the connectors used when setting up MirrorMaker 2. Looking at it in isolation, a (very simplified!) way of thinking of it is as a pairing of a Kafka consumer and Kafka producer – consuming messages from one topic, and producing them to another.
This is normally used where the two topics are on different Kafka clusters (for a wide variety of use cases that benefit from being able to mirror events from one Kafka cluster to another). This means it has config properties for a “source” Kafka cluster and a “target” Kafka cluster, but there is nothing stopping you providing the same connection details for both.
Piece 2: SMTs
Single Message Transforms (SMT) are small, stateless, components that can be inserted into a Connect pipeline to modify the contents of events being brought in-to/out-from Kafka using connectors. I’ve referred to them before but Robin’s deep-dive is a better place to start if you haven’t used SMTs before.
The key thing to recognise is that transformations that can be done in a stateless way (i.e. without needing to refer to other events in the stream or invoke any external systems) is a good candidate for being implemented as an SMT.
Bringing these pieces together…
If you combine the ability to consume events from one topic and produce to another, with the ability to implement transforms that can modify event payloads – you have everything you need to perform simple stream processing.
This is by no means a replacement for the sorts of stream processing frameworks I normally use, because of the limitations around the nature of processing that is possible (i.e. only what can be done without needing to refer to other events in the stream or invoke any external systems). If those limitations are acceptable, the benefit of this approach is that it removes the need to introduce an additional runtime component (e.g. Apache Flink or Kafka Streams) into your architecture that needs separate monitoring and maintenance.
Try it yourself (on your own computer)
As an example of what can be done, I’ll show how to take a stream of XML events, and convert them into JSON.
First, I’ll show how to run this locally. Assuming that you’re running Kafka as described in the Quick Start in the Kafka documentation, then the config you need is:
connector.properties
name=my-simplified-stream-processor # the connector to use connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector # connection details source.cluster.alias=source source.cluster.bootstrap.servers=localhost:9092 target.cluster.bootstrap.servers=localhost:9092 # copy keys as-is key.converter=org.apache.kafka.connect.converters.ByteArrayConverter key.converter.schemas.enable=false # read value of input topic as XML (without schema) transforms=xmlconvert,route transforms.xmlconvert.type=com.ibm.eventstreams.kafkaconnect.plugins.xml.XmlTransformation transforms.xmlconvert.converter.type=value transforms.xmlconvert.root.element.name=doc # output value as JSON (without schema) value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false # specify input topic topics=TRANSFORM # specify output topic replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter transforms.route.regex=(.*) transforms.route.replacement=$1-JSON # disable functionality for mirror'ing use cases refresh.topics.enabled=false sync.topic.acls.enabled=false sync.topic.configs.enabled=false
(You will need to download the jar from github for the SMT that is doing the XML conversion in this particular example, and put it in your plugins folder. But you could use one of the SMTs that come out-of-the-box with Kafka to do many simple transforms and skip that step.)
To run this, you would need:
$KAFKA_HOME/bin/connect-standalone.sh \ $KAFKA_HOME/config/connect-standalone.properties \ connector.properties
Now you can try it out! Use this to start producing XML events to the TRANSFORM
topic…
$KAFKA_HOME/bin/kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic TRANSFORM > <doc><temperature>18.4</temperature><humidity>37.1</humidity></doc> > <doc><temperature>18.9</temperature><humidity>39.2</humidity></doc>
And consume the JSON version from the TRANSFORM-JSON
topic…
$KAFKA_HOME/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic TRANSFORM-JSON \ --from-beginning { "temperature": 18.4, "humidity": 37.1 } { "temperature": 18.9, "humidity": 39.2 }
Try it yourself (in Kubernetes)
If you’re running Kafka in Kubernetes using the open source Strimzi Operator or IBM Event Streams, then the benefits of using Kafka Connect for this are illustrated even more clearly: those operators have fantastic support for running Connectors from declarative specifications in K8s custom resources.
An equivalent of creating the above example with Event Streams can be found in my kafka-demos repository on Github. To do the same with Strimzi, you just need to update the apiVersion
at the top of the spec, but it is otherwise the same for both.
What do you think?
Does it make sense to use (a bit of) MirrorMaker 2 with only a single Kafka cluster? It definitely feels like a hack, and certainly isn’t what MM2 was created for.
But you get so much “for free” with Connect (a clean and simple transform Java interface; ecosystem with a wide variety of existing converters and transforms; mature and resilient hosting platform with a declarative and REST API configuration interfaces; reliably resilient to restarts; support for emitting metrics to allow monitoring; etc.) that would be a huge effort to reinvent if you were to try and replicate the equivalent functionality in a Kafka Streams application – so I think it’s worth considering.
Tags: apachekafka, kafka