{"id":5437,"date":"2024-12-02T00:05:32","date_gmt":"2024-12-02T00:05:32","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5437"},"modified":"2024-12-02T00:05:32","modified_gmt":"2024-12-02T00:05:32","slug":"using-kafka-streams-for-a-kafka-event-projection","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5437","title":{"rendered":"Using Kafka Streams for a Kafka Event Projection"},"content":{"rendered":"<p><strong>In this post, I&#8217;ll walk through a sample implementation of Kafka Streams to maintain an Event Projection. I&#8217;ll use this to illustrate when Kafka Streams is a suitable approach.<\/strong><\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/images\/projection-kafkastreams.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p><em>I&#8217;ve written similar Event Projection posts about sample implementations that use an <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5409\">in-memory lookup table<\/a>, and a <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5422\">PostgreSQL database<\/a>.<\/em><\/p>\n<h3>The objective for this demo<\/h3>\n<p>I introduced the pattern of Event Projections in <strong><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5392\">Comparing approaches to maintaining an Event Projection from Kafka topics<\/a><\/strong>.<\/p>\n<p>I also explained the scenario that I&#8217;ve been using for each of my Event Projections demos. If you haven&#8217;t seen my other posts, it may help to <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5392\">go back and see the scenario detail and motivation<\/a> first.<\/p>\n<p>In short, I&#8217;m showing how to maintain a projection of two Kafka topics (one based on the event key, the other based on an attribute in the event payload). 
And I&#8217;m showing how an application could make an HTTP\/REST call to retrieve the data from the most recent event that matches some query.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/images\/projection-kafkastreams-detail.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>At a high level, the goal for this demo is to:<\/p>\n<ul>\n<li>use Kafka Streams to maintain a projection of the Kafka topics<\/li>\n<li>provide an HTTP\/REST API for querying the projection<\/li>\n<\/ul>\n<p><!--more-->For demo purposes, my &#8220;application&#8221; will be <code style=\"color: #770000; font-weight: bold;\">curl<\/code>, so I can illustrate querying the projection like this.<\/p>\n<p>After an event like this arrives on the SENSOR.READINGS Kafka topic:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/sensor-readings-topic.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>I will <strong>query for the latest temperature and humidity<\/strong> for that location:<\/p>\n<pre style=\"color: white; background-color: black; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">\ncurl --silent \\\n  http:\/\/projectionsdemo.apps.dale-lane.demo.ibm.com\/sensorreadings\/G-0-15 | jq\n\n{\n    \"humidity\": 43,\n    \"sensorId\": \"G-0-15\",\n    \"sensorTime\": \"Thu Nov 28 13:47:09 GMT 2024\",\n    \"temperature\": 22.3\n}\n<\/pre>\n<p>And after an event like this arrives on the DOOR.BADGEIN Kafka topic:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/door-badgein-topic.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>I will <strong>query for the latest 
employee<\/strong> to have gone through that door.<\/p>\n<pre style=\"color: white; background-color: black; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">\ncurl --silent \\\n  http:\/\/projectionsdemo.apps.dale-lane.demo.ibm.com\/badgeins\/DE-1-16 | jq\n\n{\n    \"badgeTime\": \"2024-11-28 13:54:48.702\",\n    \"doorId\": \"DE-1-16\",\n    \"employee\": \"geralyn.littel\",\n    \"recordId\": \"0fc23607-4527-4b6a-bb19-9e6aac5c22ba\"\n}\n<\/pre>\n<h3>Demo implementation<\/h3>\n<p>You can find the source code for a demo implementation on GitHub at <a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\">dalelane\/event-projections-demos<\/a>:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/github-kafkastreams.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>In this post, I&#8217;ll be going through the <code style=\"color: #770000; font-weight: bold;\">03-kafka-streams<\/code> folder, which contains the third of my demos. It includes a complete and deployable demo, pre-configured to run in OpenShift and work with the demo scenario environment I <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5392\">described in my first post<\/a>.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/openshift-kafkastreams.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>As with my other Event Projections demos, I used <a href=\"https:\/\/openliberty.io\/\">OpenLiberty<\/a> as the basis for the demo server. 
This makes it simple to create, but also hints at what a more realistic deployment would look like.<\/p>\n<h3>Code structure<\/h3>\n<p>The code for this is a little more complex than the earlier event projection approaches (<a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5409\">in-memory<\/a> and <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5422\">database<\/a>), but I hope this sample will illustrate that it is still simple.<\/p>\n<p>Most of the source code for this demo is identical to the earlier approaches, so I&#8217;ll quickly remind you of what is in here. Next I&#8217;ll show how the demo maintains the projection. And finally, I&#8217;ll show how the demo accesses the projection.<\/p>\n<h3>Code structure: common across Event Projection demos<\/h3>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\">api<\/a><\/code> \u2013 provides the HTTP\/REST API for the application to query\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/exceptions\">exceptions<\/a><\/code> \u2013 error handling \u2013 such as if the client application requests something that isn\u2019t in the projection<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/health\">health<\/a><\/code> \u2013 Kubernetes probe endpoints \u2013 as I wrote this to run in OpenShift<\/li>\n<\/ul>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a 
href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\">data<\/a><\/code> \u2013 data definitions\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\/loosehanger\">loosehanger<\/a><\/code> \u2013 definitions of the data<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\/serdes\">serdes<\/a><\/code> \u2013 serializers \/ deserializers for parsing the data<\/li>\n<\/ul>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/utils\">utils<\/a><\/code> \u2013 helper class\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/utils\/KafkaConfig.java\">KafkaConfig.java<\/a><\/code> \u2013 connection details for Kafka (read from a properties file when running locally, or from a Secret when running in OpenShift)<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<p>There are also some common config files:<\/p>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/liberty\/config\/server.xml\">liberty\/config\/server.xml<\/a><\/code> \u2013 identifies the OpenLiberty features I\u2019m using<\/li>\n<li><code style=\"color: #770000; font-weight: bold; 
font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/resources\/META-INF\/microprofile-config.properties\">resources\/META-INF\/microprofile-config.properties<\/a><\/code> \u2013 holds the connection details for Kafka when running locally (overridden by environment variables when running in OpenShift)<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/pom.xml\">pom.xml<\/a><\/code> \u2013 dependencies<\/li>\n<\/ul>\n<h3>Code structure: maintaining a projection<\/h3>\n<p>I&#8217;m using the Kafka Streams <code style=\"color: #770000; font-weight: bold;\">groupBy<\/code> operation to group the events by the value I&#8217;ll be using to look them up and retrieve them.<\/p>\n<p>To keep only the latest event for each key, I&#8217;ve written a simple <a href=\"https:\/\/kafka.apache.org\/38\/javadoc\/org\/apache\/kafka\/streams\/kstream\/Reducer.html\">Reducer<\/a> that returns the newer event: 
<code style=\"color: #770000; font-weight: bold;\">(olderEvent, laterEvent) -&gt; laterEvent<\/code><\/p>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/streams\">streams<\/a><\/code> \u2013 the Kafka Streams specific logic\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/streams\/LatestSensorReading.java\">LatestSensorReading.java<\/a><\/code> \u2013 subscribe to the SENSOR.READINGS topic and store events in a key-value store indexed by key\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">  \/** Key\/Value store of sensor readings, keyed by the sensor id. 
*\/\npublic static final StoreQueryParameters&lt;ReadOnlyKeyValueStore&lt;String, SensorReading&gt;&gt; STORE =\n&nbsp; &nbsp; StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());\n&nbsp;\npublic static void create(final StreamsBuilder builder) {\n&nbsp; &nbsp; log.info(\"Creating latest sensor readings stream\");\n&nbsp;\n&nbsp; &nbsp; builder\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/ consume the input topic with sensor reading events\n&nbsp; &nbsp; &nbsp; &nbsp; .stream(INPUT_TOPIC, Consumed.with(ProjectionsSerdes.SENSOR_ID_SERDES,\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;ProjectionsSerdes.SENSOR_READING_SERDES))\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/ group the events by the ID of the sensor that recorded the reading\n&nbsp; &nbsp; &nbsp; &nbsp; .groupByKey()\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/ keep the latest event from each sensor\n&nbsp; &nbsp; &nbsp; &nbsp; .reduce((olderEvent, laterEvent) -&gt; laterEvent,\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Named.as(\"keep_latest_sensor_readings\"),\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Materialized.&lt;String, SensorReading,\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; KeyValueStore&lt;Bytes, byte[]&gt;&gt; as (STORE_NAME));\n}<\/pre>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/streams\/LatestDoorBadgeIn.java\">LatestDoorBadgeIn.java<\/a><\/code> \u2013 subscribe to the DOOR.BADGEIN topic and store events in a key-value store indexed by the <code style=\"color: #770000; font-weight: bold;\">door<\/code> ID property\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 
1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">  \/** Key\/Value store of door-badge events, keyed by the door id. *\/\npublic static final StoreQueryParameters&lt;ReadOnlyKeyValueStore&lt;String, DoorBadgeIn&gt;&gt; STORE =\n&nbsp; &nbsp; StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());\n&nbsp;\npublic static void create(final StreamsBuilder builder) {\n&nbsp; &nbsp; log.info(\"Creating latest door badge events stream\");\n&nbsp;\n&nbsp; &nbsp; builder\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/ consume the input topic with door badge events\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/   note that the existing keys (unique record ids) are not helpful to us\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/&nbsp; &nbsp; so we will be ignoring them\n&nbsp; &nbsp; &nbsp; &nbsp; .stream(INPUT_TOPIC, Consumed.with(ProjectionsSerdes.IGNORE,\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;ProjectionsSerdes.DOOR_BADGEIN_SERDES))\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/ re-key the events, so that they are grouped by the ID of the door\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/   where the event was recorded\n&nbsp; &nbsp; &nbsp; &nbsp; .groupBy((recordId, doorBadgeIn) -&gt; doorBadgeIn.getDoorId())\n&nbsp; &nbsp; &nbsp; &nbsp; \/\/ keep the latest event from each door\n&nbsp; &nbsp; &nbsp; &nbsp; .reduce((olderEvent, laterEvent) -&gt; laterEvent,\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Named.as(\"keep_latest_door_badgeins\"),\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Materialized.&lt;String, DoorBadgeIn,\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; KeyValueStore&lt;Bytes, byte[]&gt;&gt; as (STORE_NAME));\n}<\/pre>\n<\/li>\n<\/ul>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a 
href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/KafkaProjectionsService.java\">KafkaProjectionsService.java<\/a><\/code> &#8211; starts both of those streams\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">  log.info(\"Getting connection details for Kafka\");\nfinal Properties props = KafkaConfig.getKafkaClientConfiguration();\n&nbsp;\nlog.info(\"Starting Kafka Streams jobs\");\nfinal StreamsBuilder builder = new StreamsBuilder();\nLatestSensorReading.create(builder);\nLatestDoorBadgeIn.create(builder);\n&nbsp;\nlog.info(\"Preparing Streams application\");\nfinal Topology topology = builder.build();\nlog.info(topology.describe().toString());\n&nbsp;\nlog.info(\"Starting Streams application\");\nstreams = new KafkaStreams(topology, props);\nstreams.start();<\/pre>\n<\/li>\n<\/ul>\n<p>Notice that when you start the application, Kafka Streams will create additional topics in your Kafka cluster. The two &#8220;changelog&#8221; topics provide the persistent storage for the projection, while the &#8220;repartition&#8221; topic is used by the LatestDoorBadgeIn job to re-group the events by the door ID.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/topics-kafkastreams.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>Note that Kafka Streams applications do also use filesystem storage to support the RocksDB database that holds the current state.<\/p>\n<p>For this demo, you can see that <a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/k8s.yaml#L20-L21\">I&#8217;m using ephemeral storage for this<\/a> when running in OpenShift. 
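To make the state-storage trade-off concrete, here is a minimal sketch of the kind of client configuration involved. It uses plain <code>java.util.Properties</code>; the three key names are the genuine Kafka Streams configuration keys, but the values and the class name are illustrative placeholders, not taken from the demo's <code>KafkaConfig</code>:

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Minimal configuration a Kafka Streams application needs.
    // Key names are real Kafka Streams config keys; the values
    // here are illustrative, not taken from the demo repo.
    static Properties streamsProperties() {
        Properties props = new Properties();

        // identifies the app, and prefixes the names of the
        // changelog / repartition topics that Streams creates
        props.put("application.id", "event-projections-demo");

        // the Kafka cluster to connect to
        props.put("bootstrap.servers", "my-kafka-bootstrap:9092");

        // local directory where RocksDB keeps the state store files
        props.put("state.dir", "/tmp/kafka-streams");

        return props;
    }
}
```

With an ephemeral volume mounted at the <code>state.dir</code> location, the RocksDB files are lost whenever the pod restarts.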
This means that Streams will need to rebuild some state from the Kafka topics on startup. A persistent volume would save this startup time. For more complex Streams applications I would consider this a necessity, but for a job this simple I think I could tolerate the startup cost.<\/p>\n<h3>Code structure: accessing a Kafka Streams projection<\/h3>\n<p>The key-value stores created by Kafka Streams can be queried directly, which makes it easy to access values from the projection.<\/p>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/KafkaProjectionsService.java\">KafkaProjectionsService.java<\/a><\/code> &#8211; provides access to the stores\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">  public static ReadOnlyKeyValueStore&lt;String, SensorReading&gt; getLatestSensorReadingsStore() {\n&nbsp; &nbsp; return streams.store(LatestSensorReading.STORE);\n}\npublic static ReadOnlyKeyValueStore&lt;String, DoorBadgeIn&gt; getLatestDoorBadgeInStore() {\n&nbsp; &nbsp; return streams.store(LatestDoorBadgeIn.STORE);\n}<\/pre>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\">api<\/a><\/code> &#8211; REST API endpoints for retrieving data from the projection stores\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/SensorReadingsApi.java\">SensorReadingsApi.java<\/a><\/code>\n<pre 
style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 420px;\">  @Path(\"\/sensorreadings\")\n@Produces(MediaType.APPLICATION_JSON)\npublic class SensorReadingsApi {\n&nbsp;\n&nbsp; &nbsp; @GET()\n&nbsp; &nbsp; @Path(\"\/{sensorid}\")\n&nbsp; &nbsp; public SensorReading getSensorReading(@PathParam(\"sensorid\") String sensorId) {\n&nbsp; &nbsp; &nbsp; &nbsp; SensorReading latest = KafkaProjectionsService.getLatestSensorReadingsStore().get(sensorId);\n&nbsp; &nbsp; &nbsp; &nbsp; if (latest == null) {\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new NotFoundException();\n&nbsp; &nbsp; &nbsp; &nbsp; }\n&nbsp; &nbsp; &nbsp; &nbsp; return latest;\n&nbsp; &nbsp; }\n}<\/pre>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/DoorBadgeInsApi.java\">DoorBadgeInsApi.java<\/a><\/code>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 420px;\">  @Path(\"\/badgeins\")\n@Produces(MediaType.APPLICATION_JSON)\npublic class DoorBadgeInsApi {\n&nbsp;\n&nbsp; &nbsp; @GET()\n&nbsp; &nbsp; @Path(\"\/{doorid}\")\n&nbsp; &nbsp; public DoorBadgeIn getDoorBadgeIn(@PathParam(\"doorid\") String doorid) {\n&nbsp; &nbsp; &nbsp; &nbsp; DoorBadgeIn latest = KafkaProjectionsService.getLatestDoorBadgeInStore().get(doorid);\n&nbsp; &nbsp; &nbsp; &nbsp; if (latest == null) {\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new NotFoundException();\n&nbsp; &nbsp; &nbsp; &nbsp; }\n&nbsp; &nbsp; &nbsp; &nbsp; return latest;\n&nbsp; &nbsp; }\n}<\/pre>\n<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<h3>Running the demo<\/h3>\n<p><a 
href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/README.md\"><code style=\"color: #770000; font-weight: bold;\">03-kafka-streams\/README.md<\/code><\/a> contains instructions for how to build and run this demo for yourself. It includes instructions for running it locally on your own computer and for deploying to an OpenShift cluster.<\/p>\n<h3>When is a Kafka Streams projection a good choice?<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/03-kafka-streams\/images\/projection-kafkastreams-detail.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>With Kafka Streams, it can be complex to support queries that would be comparatively trivial to implement using a database. The examples I&#8217;ve been using for these projection demos (maintaining the most recent data with a given key or contents) illustrate how Kafka Streams is a good choice <strong>where the projection store and query requirements are relatively simple<\/strong>.<\/p>\n<p>The store created in the example above is inflexible, as it is difficult to change what you can query from the store without rebuilding the Kafka Streams app (and the store it maintains). This shows how Kafka Streams is a better choice <strong>where the projection requirements are unlikely to change after deployment<\/strong>.<\/p>\n<p>Using Kafka topics to provide persistent storage makes this a self-contained approach, and a good choice <strong>where there are limitations on introducing additional infrastructure<\/strong> or components. It gives the benefits of safe persistent storage without the overhead of <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5422\">operating a database<\/a>. 
(Although I have shown this as a stand-alone deployment accessed via HTTP\/REST, this logic could be embedded within any Java application, meaning that there would be no additional runtime components at all.)<\/p>\n<p>As the new Kafka Streams topics provide persistent storage for the projection, this approach is not dependent on the source Kafka topics for long-term storage. This makes it a reliable choice <strong>where the source Kafka topics have very short retention limits<\/strong>. Even if the Kafka topic retention isn\u2019t necessarily short, this also helps <strong>where critical data is produced infrequently<\/strong>. For example, imagine if sensors only produced temperature events when the temperature was abnormal. This could mean that some sensors emitted events only once a year \u2013 perhaps to a topic with a 6-month retention. In general, where it is likely that important historical data has long since been aged out of the source Kafka topics, the Kafka Streams topics allow for safe long-term storage in a projection.<\/p>\n<h3>When is a Kafka Streams projection NOT a good choice?<\/h3>\n<p>Although I&#8217;ve been using Java for all of the Event Projection demos, the <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5409\">in-memory<\/a> and <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5422\">database<\/a> approaches are language-agnostic and could have been written in any programming language. Kafka Streams is a Java library only, so this may not be a good choice <strong>where teams don&#8217;t have experience in maintaining Java applications<\/strong>.<\/p>\n<p>More generally, even if Java is not an obstacle, not all Java developers will be comfortable with the Kafka Streams framework (especially compared with a hashmap which any Java developer will know how to create, or a database which most developers will likely be familiar with). 
Although I hope the example above has shown that it is simple to implement, this may still not be the best choice <strong>where teams don&#8217;t have experience with Kafka Streams<\/strong>.<\/p>\n<p>That said, to declare a little bias, this approach is probably my favourite. Kafka Streams provides a persistent store for use cases where there are only simple retrieval requirements, and does this without needing to introduce any additional components, or overly complex code.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Sample implementation of maintaining an Event Projection of an Apache Kafka topic using Kafka Streams.<\/p>\n","protected":false},"author":1,"featured_media":5438,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,584],"class_list":["post-5437","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5437","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5437"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5437\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/5438"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5437"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"ht
tps:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5437"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5437"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}