In this post, I’ll walk through a sample implementation of the simplest way to maintain an Event Projection: an in-memory lookup table. I’ll use this sample to illustrate when this is a suitable approach to use.
The objective for this demo
In “Comparing approaches to maintaining an Event Projection from Kafka topics”, I introduced the pattern of Event Projections.
I also introduced the scenario that I’ll be using in these demos. Please see that post for the detail and motivation, but to recap: I will maintain a projection of the data from two Kafka topics (one based on the event key, the other based on an attribute in the event payload).
In both cases, I want to be able to make an HTTP/REST call to retrieve the data that was in the most recent event to match my query.
At a high level, the goal was to create a single server that will:
- subscribe to my Kafka topics
- maintain an in-memory lookup of the relevant data
- provide an HTTP/REST API for querying the projection
For demo purposes, my “application” will be curl, so I can illustrate being able to query the projection like this.
After an event like this arrives on the SENSOR.READINGS Kafka topic:
I can query for the latest temperature and humidity for that location:
    curl --silent \
      http://projectionsdemo.apps.dale-lane.demo.ibm.com/sensorreadings/G-0-15 | jq
    {
      "humidity": 43,
      "sensorId": "G-0-15",
      "sensorTime": "Thu Nov 28 13:47:09 GMT 2024",
      "temperature": 22.3
    }
And after an event like this arrives on the DOOR.BADGEIN Kafka topic:
I can query for the latest employee to have gone through that door.
    curl --silent \
      http://projectionsdemo.apps.dale-lane.demo.ibm.com/badgeins/DE-1-16 | jq
    {
      "badgeTime": "2024-11-28 13:54:48.702",
      "doorId": "DE-1-16",
      "employee": "geralyn.littel",
      "recordId": "0fc23607-4527-4b6a-bb19-9e6aac5c22ba"
    }
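If the client were a Java application rather than curl, the same query could look something like the following. This is only a sketch: ProjectionClient and its method names are hypothetical, and it assumes that an id with nothing in the projection is reported as an HTTP 404.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client for the projection API; all names here are illustrative. */
class ProjectionClient {

    private final HttpClient http = HttpClient.newHttpClient();
    private final String baseUrl;

    ProjectionClient(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /** Builds a GET request for e.g. /sensorreadings/G-0-15 or /badgeins/DE-1-16. */
    HttpRequest buildRequest(String resource, String id) {
        // Assumes the id needs no URL-encoding, as with the ids shown above
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + resource + "/" + id))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    /** Returns the JSON body of the latest matching event, or null on a 404. */
    String fetchLatest(String resource, String id) throws IOException, InterruptedException {
        HttpResponse<String> response =
                http.send(buildRequest(resource, id), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() == 404) {
            return null; // assumed: nothing in the projection for this id yet
        }
        return response.body();
    }
}
```

Calling fetchLatest("sensorreadings", "G-0-15") would then return the same JSON document that curl printed above.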
Demo implementation
You can find the source code for a demo implementation on GitHub at dalelane/event-projections-demos.
In this post, I’ll be going through the 01-in-memory folder, which contains the first of my demos. It includes a complete and deployable demo, pre-configured to run in OpenShift and to work with the “Loosehanger Jeans” demo scenario environment I described in the previous post.
I used OpenLiberty as the basis for the demo server. This seemed like a reasonable compromise: simple enough for a demo, but realistic enough to give a feel for what it would look like to do this in more than just a “Hello World” way.
Code structure
This first approach is by far the simplest of the ones that I will share.
As most of the source code for this demo is boilerplate that will be identical across the different event projection approaches, this gives me an opportunity to first do a quick walk-through of that.
If you’re only interested in the Event Projection-specifics, you can skip this first bit!
Next I’ll show how the demo maintains the projection. And finally, I’ll show how the demo accesses the projection.
Code structure: common across Event Projection demos
Most of it is in Java:
- api – provides the HTTP/REST API for the application to query
  - exceptions – error handling – such as if the client application requests something that isn’t in the projection
  - health – Kubernetes probe endpoints – as I wrote this to run in OpenShift
- data – data definitions
  - loosehanger – definitions of the data that the service will receive from Kafka topics
  - serdes – deserializers for parsing the data received from Kafka
- utils – helper class
  - KafkaConfig.java – gets the connection details for Kafka (from a properties file when running locally, or from a Secret when running in OpenShift)
There are also some common config files:
- src/main/liberty/config/server.xml – identifies which OpenLiberty features I’m using
- src/main/resources/META-INF/microprofile-config.properties – connection details for Kafka when running locally (overridden by environment variables when running in OpenShift)
- pom.xml – dependencies
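To illustrate the “properties file locally, environment variables in OpenShift” idea, here is a minimal sketch of that lookup order. It is not the demo’s KafkaConfig.java: ConfigLookup is a hypothetical helper and the property name in the usage note is made up. The name mapping follows the rule that MicroProfile Config describes for environment variables (replace non-alphanumeric characters with underscores, then upper-case).

```java
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper showing "environment variable overrides properties file". */
class ConfigLookup {

    /** Maps a property name to its environment variable form, e.g. a.b.c becomes A_B_C. */
    static String toEnvName(String propertyName) {
        return propertyName.replaceAll("[^A-Za-z0-9]", "_").toUpperCase(Locale.ROOT);
    }

    /** Prefers a value from the environment, falling back to the properties file. */
    static String resolve(String propertyName, Map<String, String> env, Properties fileDefaults) {
        String fromEnv = env.get(toEnvName(propertyName));
        return fromEnv != null ? fromEnv : fileDefaults.getProperty(propertyName);
    }
}
```

With a (made-up) property such as kafka.bootstrap.servers=localhost:9092 in the properties file, calling ConfigLookup.resolve("kafka.bootstrap.servers", System.getenv(), defaults) would return localhost:9092 locally, and whatever KAFKA_BOOTSTRAP_SERVERS is set to when that variable is present.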
Code structure: maintaining an in-memory lookup
- memory – the in-memory specific logic
  - LatestSensorReading.java – subscribe to the SENSOR.READINGS topic and store events in a HashMap indexed by key

        /** In-memory map of the latest sensor reading, keyed by sensor id */
        private Map<String, SensorReading> store = new HashMap<>();
        ...
        private void run() {
            try (KafkaConsumer<String, SensorReading> consumer = prepareKafkaConsumer()) {
                isRunning = true;
                while (isRunning) {
                    ConsumerRecords<String, SensorReading> records = consumer.poll(Duration.ofSeconds(10));
                    for (ConsumerRecord<String, SensorReading> record : records) {
                        store.put(record.key(), record.value());
                    }
                }
            }

  - LatestDoorBadgeIn.java – subscribe to the DOOR.BADGEIN topic and store events in a HashMap indexed by the door ID property

        /** In-memory map of the latest badge events, keyed by door id */
        private Map<String, DoorBadgeIn> store = new HashMap<>();
        ...
        private void run() {
            try (KafkaConsumer<byte[], DoorBadgeIn> consumer = prepareKafkaConsumer()) {
                isRunning = true;
                while (isRunning) {
                    ConsumerRecords<byte[], DoorBadgeIn> records = consumer.poll(Duration.ofSeconds(10));
                    for (ConsumerRecord<byte[], DoorBadgeIn> record : records) {
                        DoorBadgeIn badgeEvent = record.value();
                        store.put(badgeEvent.getDoorId(), badgeEvent);
                    }
                }
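The two classes above differ only in how they choose the lookup id: the record key in one case, an attribute of the payload in the other. The sketch below shows that shared “latest event wins” shape without any Kafka dependency. It is not code from the demo: LatestValueProjection and DemoBadgeIn are hypothetical names, and I’ve swapped in a ConcurrentHashMap on the assumption that the consumer thread and the API threads touch the map at the same time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/** Hypothetical stand-in for the demo's DoorBadgeIn payload class. */
class DemoBadgeIn {
    final String doorId;
    final String employee;

    DemoBadgeIn(String doorId, String employee) {
        this.doorId = doorId;
        this.employee = employee;
    }
}

/** Keeps only the most recent value seen for each lookup id. */
class LatestValueProjection<K, V> {

    // Assumption: a concurrent map, so the consumer thread can write while API threads read
    private final Map<String, V> store = new ConcurrentHashMap<>();

    // Chooses the lookup id: the record key, or an attribute of the payload
    private final BiFunction<K, V, String> indexBy;

    LatestValueProjection(BiFunction<K, V, String> indexBy) {
        this.indexBy = indexBy;
    }

    /** Called once per consumed record; a later event overwrites an earlier one. */
    void onEvent(K recordKey, V value) {
        if (value == null) {
            return; // ConcurrentHashMap rejects nulls, so skip records without a payload
        }
        String id = indexBy.apply(recordKey, value);
        if (id != null) {
            store.put(id, value);
        }
    }

    /** Returns the latest value for the id, or null if nothing has been seen yet. */
    V latest(String id) {
        return store.get(id);
    }

    /** Number of distinct ids held, regardless of how many events have arrived. */
    int size() {
        return store.size();
    }
}
```

A key-based projection would be created with (key, value) -> key, and a payload-based one with something like (key, badge) -> badge.doorId. Either way, the map only ever grows with the number of distinct ids, not with the number of events.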
Code structure: accessing an in-memory lookup
- api – provides the HTTP/REST API for the application to query
  - SensorReadingsApi.java – retrieves the latest sensor reading for the requested sensor

        @GET()
        @Path("/{sensorid}")
        public SensorReading getSensorReading(@PathParam("sensorid") String sensorId) {
            SensorReading latest = latestSensorReading.getStore().get(sensorId);
            if (latest == null) {
                throw new NotFoundException();
            }
            return latest;
        }

  - DoorBadgeInsApi.java – retrieves the latest badge event for the requested door

        @GET()
        @Path("/{doorid}")
        public DoorBadgeIn getDoorBadgeIn(@PathParam("doorid") String doorid) {
            DoorBadgeIn latest = latestDoorBadgeIn.getStore().get(doorid);
            if (latest == null) {
                throw new NotFoundException();
            }
            return latest;
        }
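The same “look up the id, or report that it isn’t there” logic can be tried out without OpenLiberty. The sketch below swaps the demo’s JAX-RS resources for the HTTP server that ships with the JDK (com.sun.net.httpserver). LookupEndpoint is a hypothetical name, the store is assumed to hold ready-made JSON strings, and the context path is assumed to end with a slash (for example /sensorreadings/).

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical stand-alone endpoint serving an in-memory lookup. */
class LookupEndpoint {

    /** Extracts the id from a path such as /sensorreadings/G-0-15. */
    static String idFromPath(String contextPath, String requestPath) {
        return requestPath.substring(contextPath.length());
    }

    /** Starts a server that answers 200 with the stored JSON, or 404 if the id is unknown. */
    static HttpServer start(int port, String contextPath, Map<String, String> store)
            throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext(contextPath, exchange -> {
            String id = idFromPath(contextPath, exchange.getRequestURI().getPath());
            String json = store.get(id);
            if (json == null) {
                exchange.sendResponseHeaders(404, -1); // -1 means no response body
            } else {
                byte[] body = json.getBytes(StandardCharsets.UTF_8);
                exchange.getResponseHeaders().set("Content-Type", "application/json");
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            }
            exchange.close();
        });
        server.start();
        return server;
    }
}
```

Starting it with LookupEndpoint.start(9080, "/sensorreadings/", store) would let the curl commands from earlier run against localhost:9080 instead.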
Running the demo
01-in-memory/README.md contains instructions for how to build and run this demo for yourself.
It includes instructions for running it locally on your own computer (pointing at a local Kafka running on localhost:9092) and deploying to an OpenShift cluster.
When is an in-memory projection a good choice?
As I mentioned above, this is a trivially simple approach. Simplicity has value: this approach is a good choice for quick demos and prototypes, or where rapid creation and deployment is a priority.
This is a small and self-contained approach, so it is a good choice where there are limitations on introducing additional infrastructure or components. Although I have shown this as a stand-alone component accessed via HTTP/REST, this logic could easily be embedded within an application, meaning that there could be no additional runtime components at all.
Keeping the projection in memory is better suited to situations where the projection is small and well-bounded. The door access events demo described above is a good example of this – the projection needs to store the most recent employee to access each door. There is a fixed number of doors in Loosehanger Jeans, so it doesn’t matter how long the projection is maintained for, or how many events are emitted. The size of the projection will be consistent, as it is relative to the number of doors that Loosehanger have – which will not keep increasing!
The lack of persistent storage means that the projection is dependent on any important data being available on the Kafka topic. This makes it a good choice where data needed in the projection will be frequently produced. The temperature sensor readings demo described above is a good example of this – if every individual sensor is emitting events every few minutes (and the projection doesn’t need to hold the history), the lack of persistence is less critical. In our fictional scenario, Loosehanger Jeans can assume that within a few minutes of starting up, the projection will always be fully populated with the latest reading for every sensor.
When is an in-memory projection NOT a good choice?
The lack of persistent storage is a problem where critical data is produced infrequently. For example, imagine if sensors only produced temperature events when the temperature was abnormal. This could mean that some sensors emitted events only once a year – perhaps to a topic with a 6-month retention. In such a scenario, an in-memory projection wouldn’t be viable – a lot of important data would not be recoverable on restart.
Even if the important data is likely to be available on the Kafka topic, the need to rebuild the projection from scratch every time there is a restart could be an issue. You can see this in the demo implementation as it re-reads all events from the start of the topic on startup. This means this may not be a good choice where there are Kafka topics with very large retained histories, or where rapid startup time is critical.
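One way to get this “re-read everything on startup” behaviour is through the consumer configuration. The sketch below is an assumption about how it could be done rather than a copy of the demo’s prepareKafkaConsumer() (which might equally seek to the beginning of each partition instead). ReplayConsumerConfig and the group id prefix are hypothetical; the property names are standard Kafka consumer settings.

```java
import java.util.Properties;
import java.util.UUID;

/** Hypothetical helper building consumer properties that replay a topic from the start. */
class ReplayConsumerConfig {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A fresh group id on every startup means there are never committed offsets to resume from
        props.setProperty("group.id", "projection-" + UUID.randomUUID());
        // With no committed offsets, begin at the oldest event still retained on the topic
        props.setProperty("auto.offset.reset", "earliest");
        // Nothing to gain from committing offsets: the in-memory map is lost on restart anyway
        props.setProperty("enable.auto.commit", "false");
        return props;
    }
}
```

The cost is visible here too: every restart starts at the oldest retained offset, so startup time grows with the amount of history kept on the topic.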
In the next post…
In my next post, I’ll show how this demo can be extended by using a database to store the projection – and how this can address many of the limitations described above.