Using Kafka Streams for a Kafka Event Projection

In this post, I’ll walk through a sample implementation of Kafka Streams to maintain an Event Projection. I’ll use this to illustrate when this is a suitable approach to use.

I’ve written similar Event Projection posts about sample implementations that use an in-memory lookup table, and a PostgreSQL database.

The objective for this demo

I introduced the pattern of Event Projections in Comparing approaches to maintaining an Event Projection from Kafka topics.

I also explained the scenario that I’ve been using for each of my Event Projections demos. If you haven’t seen my other posts, it may help to go back and see the scenario detail and motivation first.

In short, I’m showing how to maintain a projection of two Kafka topics (one based on the event key, the other based on an attribute in the event payload). And I’m showing how an application could make an HTTP/REST call to retrieve the data from the most recent event that matches some query.

At a high-level, the goal for this demo is to:

  • use Kafka Streams to maintain a projection of the Kafka topics
  • provide an HTTP/REST API for querying the projection

For demo purposes, my “application” will be curl, so I can illustrate being able to query the projection like this.

After an event like this arrives on the SENSOR.READINGS Kafka topic:

I will query for the latest temperature and humidity for that location:

curl --silent \
  http://projectionsdemo.apps.dale-lane.demo.ibm.com/sensorreadings/G-0-15 | jq

{
    "humidity": 43,
    "sensorId": "G-0-15",
    "sensorTime": "Thu Nov 28 13:47:09 GMT 2024",
    "temperature": 22.3
}

And after an event like this arrives on the DOOR.BADGEIN Kafka topic:

I will query for the latest employee to have gone through that door.

curl --silent \
  http://projectionsdemo.apps.dale-lane.demo.ibm.com/badgeins/DE-1-16 | jq

{
    "badgeTime": "2024-11-28 13:54:48.702",
    "doorId": "DE-1-16",
    "employee": "geralyn.littel",
    "recordId": "0fc23607-4527-4b6a-bb19-9e6aac5c22ba"
}

Demo implementation

You can find the source code for a demo implementation in Github at dalelane/event-projections-demos:

In this post, I’ll be going through the 03-kafka-streams folder, which contains the third of my demos. It includes a complete and deployable demo, pre-configured to run in OpenShift and work with the demo scenario environment I described in my first post.

As with my other Event Projections demos, I used OpenLiberty as the basis for the demo server. This makes it simple to create, but also hints at what a more realistic deployment would look like.

Code structure

The code for this is a little more complex than the earlier event projection approaches (in-memory and database), however I hope this sample will illustrate that it is still simple.

Most of the source code for this demo is identical to the earlier approaches, but I’ll quickly remind you of what is in here. Next I’ll show how the demo maintains the projection. And finally, I’ll show how the demo accesses the projection.

Code structure: common across Event Projection demos

  • api – provides the HTTP/REST API for the application to query
    • exceptions – error handling – such as if the client application requests something that isn’t in the projection
    • health – Kubernetes probe endpoints – as I wrote this to run in OpenShift
  • data – data definitions
    • loosehanger – definitions of the data
    • serdes – serializers / deserializers for parsing the data
  • utils – helper class
    • KafkaConfig.java – connection details for Kafka (uses a properties file when running locally, or from a Secret when running in OpenShift)

There are also some common config files:

Code structure: maintaining a projection

I’m using Kafka Streams to group the events by the value I’ll be using to look them up and retrieve them. groupBy

To keep only the latest event for each key, I’ve written a simple Reducer that returns the newer event. (olderEvent, laterEvent) -> laterEvent

  • streams – the Kafka Streams specific logic
    • LatestSensorReading.java – subscribe to the SENSOR.READINGS topic and store events in a key-value store indexed by key
        /** Key/Value store of sensor readings, keyed by the sensor id. */
      public static final StoreQueryParameters<ReadOnlyKeyValueStore<String, SensorReading>> STORE =
          StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());
       
      public static void create(final StreamsBuilder builder) {
          log.info("Creating latest sensor readings stream");
       
          builder
              // consume the input topic with sensor reading events
              .stream(INPUT_TOPIC, Consumed.with(ProjectionsSerdes.SENSOR_ID_SERDES,
                                                 ProjectionsSerdes.SENSOR_READING_SERDES))
              // group the events by the ID of the sensor that recorded the reading
              .groupByKey()
              // keep the latest event from each sensor
              .reduce((olderEvent, laterEvent) -> laterEvent,
                      Named.as("keep_latest_sensor_readings"),
                      Materialized.<String, SensorReading,
                                    KeyValueStore<Bytes, byte[]>> as (STORE_NAME));
      }
    • LatestDoorBadgeIn.java – subscribe to the DOOR.BADGEIN topic and store events in a key-value store indexed by the door ID property
        /** Key/Value store of door-badge events, keyed by the door id. */
      public static final StoreQueryParameters<ReadOnlyKeyValueStore<String, DoorBadgeIn>> STORE =
          StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());
       
      public static void create(final StreamsBuilder builder) {
          log.info("Creating latest door badge events stream");
       
          builder
              // consume the input topic with door badge events
              //   note that the existing key (unique record ids) are not helpful to us
              //    so we will be ignoring it
              .stream(INPUT_TOPIC, Consumed.with(ProjectionsSerdes.IGNORE,
                                                 ProjectionsSerdes.DOOR_BADGEIN_SERDES))
              // re-key the events, so that they are grouped by the ID of the door
              //   where the event was recorded
              .groupBy((recordId, doorBadgeIn) -> doorBadgeIn.getDoorId())
              // keep the latest event from each door
              .reduce((olderEvent, laterEvent) -> laterEvent,
                      Named.as("keep_latest_door_badgeins"),
                      Materialized.<String, DoorBadgeIn,
                                    KeyValueStore<Bytes, byte[]>> as (STORE_NAME));
      }
  • KafkaProjectionsService.java – starts both of those streams
      log.info("Getting connection details for Kafka");
    final Properties props = KafkaConfig.getKafkaClientConfiguration();
     
    log.info("Starting Kafka Streams jobs");
    final StreamsBuilder builder = new StreamsBuilder();
    LatestSensorReading.create(builder);
    LatestDoorBadgeIn.create(builder);
     
    log.info("Preparing Streams application");
    final Topology topology = builder.build();
    log.info(topology.describe().toString());
     
    log.info("Starting Streams application");
    streams = new KafkaStreams(topology, props);
    streams.start();

Notice that when you start the application, Kafka Streams will create additional topics in your Kafka cluster. The two “changelog” topics provide the persistent storage for the projection, while the “repartition” topic is used by the LatestDoorBadgeIn job to re-group the events by the door ID.

Note that Kafka Streams applications do also use filesystem storage to support the RocksDB database that holds the current state.

For this demo, you can see that I’m using ephemeral storage for this when running in OpenShift. This means that Streams will need to rebuild some state from the Kafka topics on startup. A persistent volume would save this startup time. For more complex Streams applications I would consider this a necessity, but for a job this simple I think I could tolerate the startup cost.

Code structure: accessing a Kafka Streams projection

The key-value store created by the Streams can be queried, so this meant I could easily access values from the projection.

  • KafkaProjectionsService.java – provides access to the stores
      public static ReadOnlyKeyValueStore<String, SensorReading> getLatestSensorReadingsStore() {
        return streams.store(LatestSensorReading.STORE);
    }
    public static ReadOnlyKeyValueStore<String, DoorBadgeIn> getLatestDoorBadgeInStore() {
        return streams.store(LatestDoorBadgeIn.STORE);
    }
  • api – REST API endpoints for retrieving data from the projection stores
    • SensorReadingsApi.java
        @Path("/sensorreadings")
      @Produces(MediaType.APPLICATION_JSON)
      public class SensorReadingsApi {
       
          @GET()
          @Path("/{sensorid}")
          public SensorReading getSensorReading(@PathParam("sensorid") String sensorId) {
              SensorReading latest = KafkaProjectionsService.getLatestSensorReadingsStore().get(sensorId);
              if (latest == null) {
                  throw new NotFoundException();
              }
              return latest;
          }
      }
    • DoorBadgeInsApi.java
        @Path("/badgeins")
      @Produces(MediaType.APPLICATION_JSON)
      public class DoorBadgeInsApi {
       
          @GET()
          @Path("/{doorid}")
          public DoorBadgeIn getDoorBadgeIn(@PathParam("doorid") String doorid) {
              DoorBadgeIn latest = KafkaProjectionsService.getLatestDoorBadgeInStore().get(doorid);
              if (latest == null) {
                  throw new NotFoundException();
              }
              return latest;
          }
      }

Running the demo

03-kafka-streams/README.md contains instructions for how to build and run this demo for yourself. It includes instructions for running it locally on your own computer (pointing at a local database) and deploying to an OpenShift cluster.

When is a Kafka Streams projection a good choice?

It can be complex to support queries that would be comparatively trivial to implement using a database. The examples I’ve been using for these projection demos (maintaining the most recent data with a given key or contents) are great examples of how Kafka Streams is a good choice where the projection store and query requirements are relatively simple.

The store created in the example above is inflexible, as it is difficult to change what you can query from the store without rebuilding the Kafka Streams app (and the store it maintains). This shows how Kafka Streams is a better choice where the projection requirements are unlikely to change after deployment.

Using Kafka topics to provide persistent storage makes this a self-contained approach, and a good choice where there are limitations on introducing additional infrastructure or components. It gives the benefits of safe persistent storage without the overhead of operating a database. (Although I have shown this as a stand-alone deployment accessed via HTTP/REST, this logic could be embedded within any Java application, meaning that there would be no additional runtime components at all.)

As the new Kafka Streams topics provide persistent storage for the projection, this approach is not dependent on the source Kafka topics for long-term storage. This makes it a reliable choice where the source Kafka topics have very short retention limits. Even if the Kafka topic retention isn’t necessarily short, this also helps where critical data is produced infrequently. For example, imagine if sensors only produced temperature events when the temperature was abnormal. This could mean that some sensors emitted events only once a year – perhaps to a topic with a 6-month retention. In general, where it is likely that important historical data has long-since been aged-out of the source Kafka topics, the Kafka Streams topics allow for safe long-term storage in a projection.

When is a Kafka Streams projection NOT a good choice?

Although I’ve been using Java for all of the Event Projection demos, the in-memory and database approaches are language-agnostic and could have been written in any programming language. Kafka Streams is a Java library only, so this may not be a good choice where teams don’t have experience in maintaining Java applications.

More generally, even if Java is not an obstacle, not all Java developers will be comfortable with the Kafka Streams framework (especially compared with a hashmap which any Java developer will know how to create, or a database which most developers will likely be familiar with). Although I hope the example above has shown that it is simple to implement, this may still not be the best choice where teams don’t have experience with Kafka Streams.

That said, to declare a little bias, this approach is probably my favourite. Kafka Streams provides a persistent store for use cases where there are only simple retrieval requirements, and does this without needing to introduce any additional components, or overly complex code.

Tags: ,

Leave a Reply