{"id":5409,"date":"2024-11-28T18:22:51","date_gmt":"2024-11-28T18:22:51","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5409"},"modified":"2024-12-01T10:03:11","modified_gmt":"2024-12-01T10:03:11","slug":"using-an-in-memory-lookup-table-for-a-kafka-event-projection","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5409","title":{"rendered":"Using an in-memory lookup table for a Kafka Event Projection"},"content":{"rendered":"<p><strong>In this post, I&#8217;ll walk through a sample implementation of the simplest way to maintain an Event Projection: an in-memory lookup table. I&#8217;ll use this sample to illustrate when this is a suitable approach to use.<\/strong><\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/images\/projection-inmemory.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<h3>The objective for this demo<\/h3>\n<p>In <strong><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5392\">Comparing approaches to maintaining an Event Projection from Kafka topics<\/a><\/strong>, I introduced the pattern of <strong>Event Projections<\/strong>.<\/p>\n<p>I also introduced the scenario that I&#8217;ll be using in these demos. 
Please see that post for the detail and motivation, but to recap: I will maintain a projection of the data from two Kafka topics (one based on the event key, the other based on an attribute in the event payload).<\/p>\n<p>In both cases, I want to be able to make an HTTP\/REST call to retrieve the data that was in the most recent event to match my query.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/images\/projection-inmemory-detail.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>At a high-level, the goal was to create a single server that will:<\/p>\n<ul>\n<li>subscribe to my Kafka topics<\/li>\n<li>maintain an in-memory lookup of the relevant data<\/li>\n<li>provide an HTTP\/REST API for querying the projection<\/li>\n<\/ul>\n<p>For demo purposes, my &#8220;application&#8221; will be <code style=\"color: #770000; font-weight: bold;\">curl<\/code>, so I can illustrate being able to query the projection like this.<\/p>\n<p><!--more-->After an event like this arrives on the SENSOR.READINGS Kafka topic:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/sensor-readings-topic.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>I can <strong>query for the latest temperature and humidity<\/strong> for that location:<\/p>\n<pre style=\"color: white; background-color: black; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">\ncurl --silent \\\n  http:\/\/projectionsdemo.apps.dale-lane.demo.ibm.com\/sensorreadings\/G-0-15 | jq\n\n{\n    \"humidity\": 43,\n    \"sensorId\": \"G-0-15\",\n    \"sensorTime\": \"Thu Nov 28 13:47:09 GMT 2024\",\n    \"temperature\": 22.3\n}\n<\/pre>\n<p>And after an event like this arrives on the DOOR.BADGEIN Kafka topic:<\/p>\n<p><img 
decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/door-badgein-topic.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>I can <strong>query for the latest employee<\/strong> to have gone through that door.<\/p>\n<pre style=\"color: white; background-color: black; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">\ncurl --silent \\\n  http:\/\/projectionsdemo.apps.dale-lane.demo.ibm.com\/badgeins\/DE-1-16 | jq\n\n{\n    \"badgeTime\": \"2024-11-28 13:54:48.702\",\n    \"doorId\": \"DE-1-16\",\n    \"employee\": \"geralyn.littel\",\n    \"recordId\": \"0fc23607-4527-4b6a-bb19-9e6aac5c22ba\"\n}\n<\/pre>\n<h3>Demo implementation<\/h3>\n<p>You can find the source code for a demo implementation on GitHub at <a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\">dalelane\/event-projections-demos<\/a>:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/github-inmemory.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>In this post, I&#8217;ll be going through the <code style=\"color: #770000; font-weight: bold;\">01-in-memory<\/code> folder, which contains the first of my demos. It includes a complete and deployable demo, pre-configured to run in OpenShift and to work with the &#8220;Loosehanger Jeans&#8221; demo scenario environment I <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5392\">described in the previous post<\/a>.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/openshift-inmemory.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>I used <a href=\"https:\/\/openliberty.io\/\">OpenLiberty<\/a> as the basis for the demo server. 
This seemed like a reasonable compromise by being simple enough for a demo, while being realistic enough to give a feel for what it would look like to do this in more than just a &#8220;Hello World&#8221; way.<\/p>\n<h3>Code structure<\/h3>\n<p>This first approach is by far the simplest of the ones that I will share.<\/p>\n<p>As most of the source code for this demo is boilerplate that will be identical across the different event projection approaches, this gives me an opportunity to first do a quick walk-through of that.<\/p>\n<p><em>If you&#8217;re only interested in the Event Projection-specifics, you can skip this first bit!<\/em><\/p>\n<p>Next I&#8217;ll show how the demo maintains the projection. And finally, I&#8217;ll show how the demo accesses the projection.<\/p>\n<h3>Code structure: common across Event Projection demos<\/h3>\n<p>Most of it is in Java:<\/p>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\">api<\/a><\/code> &#8211; provides the HTTP\/REST API for the application to query\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/exceptions\">exceptions<\/a><\/code> &#8211; error handling &#8211; such as if the client application requests something that isn&#8217;t in the projection<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/health\">health<\/a><\/code> &#8211; Kubernetes probe endpoints &#8211; as I wrote this to run in OpenShift<\/li>\n<\/ul>\n<\/li>\n<li><code style=\"color: #770000; font-weight: 
bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\">data<\/a><\/code> &#8211; data definitions\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\/loosehanger\">loosehanger<\/a><\/code> &#8211; definitions of the data that the service will receive from Kafka topics<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\/serdes\">serdes<\/a><\/code> &#8211; deserializers for parsing the data received from Kafka<\/li>\n<\/ul>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/utils\">utils<\/a><\/code> &#8211; helper class\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/utils\/KafkaConfig.java\">KafkaConfig.java<\/a><\/code> &#8211; gets the connection details for Kafka (from a properties file when running locally, or from a Secret when running in OpenShift)<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<p>There are also some common config files:<\/p>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/src\/main\/liberty\/config\/server.xml\">src\/main\/liberty\/config\/server.xml<\/a><\/code> &#8211; identifies which OpenLiberty 
features I&#8217;m using<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/src\/main\/resources\/META-INF\/microprofile-config.properties\">src\/main\/resources\/META-INF\/microprofile-config.properties<\/a><\/code> &#8211; connection details for Kafka when running locally (overridden by environment variables when running in OpenShift)<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/pom.xml\">pom.xml<\/a><\/code> &#8211; dependencies<\/li>\n<\/ul>\n<h3>Code structure: maintaining an in-memory lookup<\/h3>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/memory\">memory<\/a><\/code> &#8211; the in-memory specific logic\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/memory\/LatestSensorReading.java\">LatestSensorReading.java<\/a><\/code> &#8211; subscribe to the SENSOR.READINGS topic and store events in a HashMap indexed by key\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">  \/** In-memory map of the latest sensor reading, keyed by sensor id *\/\nprivate Map&lt;String, SensorReading&gt; store = new HashMap&lt;&gt;();\n&nbsp;\n...\n&nbsp;\nprivate void run() {\n    try (KafkaConsumer&lt;String, SensorReading&gt; consumer = prepareKafkaConsumer()) {\n        isRunning = true;\n        while (isRunning) {\n            
ConsumerRecords&lt;String, SensorReading&gt; records = consumer.poll(Duration.ofSeconds(10));\n            for (ConsumerRecord&lt;String, SensorReading&gt; record : records) {\n                store.put(record.key(), record.value());\n            }\n        }\n    }\n<\/pre>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/memory\/LatestDoorBadgeIn.java\">LatestDoorBadgeIn.java<\/a><\/code> &#8211; subscribe to the DOOR.BADGEIN topic and store events in a HashMap indexed by the <code style=\"color: #770000; font-weight: bold;\">door<\/code> ID property\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">  \/** In-memory map of the latest badge events, keyed by door id *\/\nprivate Map&lt;String, DoorBadgeIn&gt; store = new HashMap&lt;&gt;();\n&nbsp;\n...\n&nbsp;\nprivate void run() {\n    try (KafkaConsumer&lt;byte[], DoorBadgeIn&gt; consumer = prepareKafkaConsumer()) {\n        isRunning = true;\n        while (isRunning) {\n            ConsumerRecords&lt;byte[], DoorBadgeIn&gt; records = consumer.poll(Duration.ofSeconds(10));\n            for (ConsumerRecord&lt;byte[], DoorBadgeIn&gt; record : records) {\n                DoorBadgeIn badgeEvent = record.value();\n                store.put(badgeEvent.getDoorId(), badgeEvent);\n            }\n        }\n<\/pre>\n<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<h3>Code structure: accessing an in-memory lookup<\/h3>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\">api<\/a><\/code> &#8211; provides the HTTP\/REST API for the application to 
query\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/SensorReadingsApi.java\">SensorReadingsApi.java<\/a><\/code> &#8211; retrieves the latest sensor reading for the requested sensor\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">  @GET()\n@Path(\"\/{sensorid}\")\npublic SensorReading getSensorReading(@PathParam(\"sensorid\") String sensorId) {\n    SensorReading latest = latestSensorReading.getStore().get(sensorId);\n    if (latest == null) {\n        throw new NotFoundException();\n    }\n    return latest;\n}<\/pre>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/DoorBadgeInsApi.java\">DoorBadgeInsApi.java<\/a><\/code> &#8211; retrieves the latest badge event for the requested door\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">  @GET()\n@Path(\"\/{doorid}\")\npublic DoorBadgeIn getDoorBadgeIn(@PathParam(\"doorid\") String doorid) {\n    DoorBadgeIn latest = latestDoorBadgeIn.getStore().get(doorid);\n    if (latest == null) {\n        throw new NotFoundException();\n    }\n    return latest;\n}<\/pre>\n<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<h3>Running the demo<\/h3>\n<p><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/README.md\"><code style=\"color: #770000; font-weight: bold;\">01-in-memory\/README.md<\/code><\/a> contains instructions for how to build and run 
this demo for yourself. <\/p>\n<p>It includes instructions for running it locally on your own computer (pointing at a local Kafka running on <code>localhost:9092<\/code>) and deploying to an OpenShift cluster.<\/p>\n<h3>When is an in-memory projection a good choice?<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/images\/projection-inmemory-detail.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>As I mentioned above, this is a trivially simple approach. Simplicity has value: this approach is a good choice for quick demos and prototypes, or <strong>where rapid creation and deployment is a priority<\/strong>.<\/p>\n<p>This is a small and self-contained approach, so it is a good choice <strong>where there are limitations on introducing additional infrastructure<\/strong> or components. Although I have shown this as a stand-alone component accessed via HTTP\/REST, this logic could easily be embedded within an application, meaning that there could be no additional runtime components at all.<\/p>\n<p>Keeping the projection in memory is better suited to situations <strong>where the projection is small and well-bounded<\/strong>. The door access events demo described above is a good example of this &#8211; the projection needs to store the most recent employee to access each door. There is a fixed number of doors in Loosehanger Jeans, so it doesn&#8217;t matter how long the projection is maintained for, or how many events are emitted. The size of the projection will be consistent, as it is relative to the number of doors that Loosehanger have &#8211; which will not keep increasing!<\/p>\n<p>The lack of persistent storage means that the projection is dependent on any important data being available on the Kafka topic. This makes it a good choice <strong>where data needed in the projection will be frequently produced<\/strong>. 
The temperature sensor readings demo described above is a good example of this &#8211; if every individual sensor is emitting events every few minutes (and the projection doesn&#8217;t need to hold the history), the lack of persistence is less critical. In our fictional scenario, Loosehanger Jeans can assume that within a few minutes of starting up, the projection will always be fully populated with the latest reading for every sensor.<\/p>\n<h3>When is an in-memory projection NOT a good choice?<\/h3>\n<p>The lack of persistent storage is a problem <strong>where critical data is produced infrequently<\/strong>. For example, imagine if sensors only produced temperature events when the temperature was abnormal. This could mean that some sensors emitted events only once a year &#8211; perhaps to a topic with a 6-month retention. In such a scenario, an in-memory projection wouldn&#8217;t be viable &#8211; a lot of important data would not be recoverable on restart.<\/p>\n<p>Even if the important data is likely to be available on the Kafka topic, the need to rebuild the projection from scratch every time there is a restart could be an issue. You can see this in the demo implementation, which <a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/01-in-memory\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/memory\/LatestDoorBadgeIn.java#L129-L142\">re-reads all events from the start of the topic<\/a> on startup. 
This means this may not be a good choice <strong>where there are Kafka topics with very large retained histories<\/strong>, or where rapid startup time is critical.<\/p>\n<h3>In the next post&#8230;<\/h3>\n<p>In my next post, I&#8217;ll show how this demo can be extended by using a database to store the projection &#8211; and how this can address many of the limitations described above.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Sample implementation of maintaining an Event Projection of an Apache Kafka topic using an in-memory lookup table.<\/p>\n","protected":false},"author":1,"featured_media":5410,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,584],"class_list":["post-5409","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5409","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5409"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5409\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/5410"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5409"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&pos
t=5409"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5409"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}