{"id":5422,"date":"2024-11-29T14:31:11","date_gmt":"2024-11-29T14:31:11","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5422"},"modified":"2024-11-29T14:31:12","modified_gmt":"2024-11-29T14:31:12","slug":"using-a-database-for-a-kafka-event-projection","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5422","title":{"rendered":"Using a database for a Kafka Event Projection"},"content":{"rendered":"<p><strong>In this post, I&#8217;ll walk through a sample implementation of using a database to maintain an Event Projection. I&#8217;ll use this to illustrate when this is a suitable approach to use.<\/strong><\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/images\/projection-database.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<h3>The objective for this demo<\/h3>\n<p>In <strong><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5392\">Comparing approaches to maintaining an Event Projection from Kafka topics<\/a><\/strong>, I introduced the pattern of Event Projections.<\/p>\n<p>I also introduced the scenario that I&#8217;ll be using in these demos. 
Please see that post for the detail and motivation, but to recap, I want to maintain a projection of two Kafka topics (one based on the event key, the other based on an attribute in the event payload).<\/p>\n<p>In both cases, I want to be able to make an HTTP\/REST call to retrieve the data from the most recent event that matches my query.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/images\/projection-database-detail.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>At a high level, the goal was to:<\/p>\n<ul>\n<li>use Kafka Connect JDBC sink connectors to maintain a database projection of the Kafka topics<\/li>\n<li>provide an HTTP\/REST API for querying the projection<\/li>\n<\/ul>\n<p><!--more-->For demo purposes, my &#8220;application&#8221; will be <code style=\"color: #770000; font-weight: bold;\">curl<\/code>, so I can illustrate querying the projection like this.<\/p>\n<p>After an event like this arrives on the SENSOR.READINGS Kafka topic:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/sensor-readings-topic.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>I can <strong>query for the latest temperature and humidity<\/strong> for that location:<\/p>\n<pre style=\"color: white; background-color: black; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">\ncurl --silent \\\n  http:\/\/projectionsdemo.apps.dale-lane.demo.ibm.com\/sensorreadings\/G-0-15 | jq\n\n{\n    \"humidity\": 43,\n    \"sensorId\": \"G-0-15\",\n    \"sensorTime\": \"Thu Nov 28 13:47:09 GMT 2024\",\n    \"temperature\": 22.3\n}\n<\/pre>\n<p>And after an event like this arrives on the DOOR.BADGEIN Kafka topic:<\/p>\n<p><img decoding=\"async\" 
src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/door-badgein-topic.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>I can <strong>query for the latest employee<\/strong> to have gone through that door:<\/p>\n<pre style=\"color: white; background-color: black; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">\ncurl --silent \\\n  http:\/\/projectionsdemo.apps.dale-lane.demo.ibm.com\/badgeins\/DE-1-16 | jq\n\n{\n    \"badgeTime\": \"2024-11-28 13:54:48.702\",\n    \"doorId\": \"DE-1-16\",\n    \"employee\": \"geralyn.littel\",\n    \"recordId\": \"0fc23607-4527-4b6a-bb19-9e6aac5c22ba\"\n}\n<\/pre>\n<h3>Demo implementation<\/h3>\n<p>You can find the source code for a demo implementation on GitHub at <a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/02-database\">dalelane\/event-projections-demos<\/a>:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/github-database.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>In this post, I&#8217;ll be going through the <code style=\"color: #770000; font-weight: bold;\">02-database<\/code> folder, which contains the second of my demos. 
It includes a complete and deployable demo, pre-configured to run in OpenShift and work with the demo scenario environment I <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5392\">described in my first post<\/a>.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/openshift-database.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>As with <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5409\">the in-memory demo<\/a>, I used the same <a href=\"https:\/\/openliberty.io\/\">OpenLiberty<\/a> configuration as the basis for the demo server. This made it quick and simple to create, but also is a more realistic approach than a &#8220;Hello World&#8221; toy demo.<\/p>\n<h3>Code structure<\/h3>\n<p>Most of the source code for this demo is boilerplate that will be identical across the different event projection approaches. I&#8217;ll quickly remind you of what is in here.<\/p>\n<p>Next I&#8217;ll show how the demo maintains the projection. 
And finally, I&#8217;ll show how the demo accesses the projection.<\/p>\n<h3>Code structure: common across Event Projection demos<\/h3>\n<p>Most of it is in Java:<\/p>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\">api<\/a><\/code> &#8211; provides the HTTP\/REST API for the application to query\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/exceptions\">exceptions<\/a><\/code> &#8211; error handling &#8211; such as if the client application requests something that isn&#8217;t in the projection<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/health\">health<\/a><\/code> &#8211; Kubernetes probe endpoints &#8211; as I wrote this to run in OpenShift<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<p>There are also some common config files:<\/p>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/02-database\/src\/main\/liberty\/config\">liberty\/config<\/a><\/code> &#8211; configuration\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/liberty\/config\/server.xml\">server.xml<\/a><\/code> &#8211; identifies which OpenLiberty features I&#8217;m using<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a 
href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/liberty\/config\/bootstrap.properties\">bootstrap.properties<\/a><\/code> &#8211; connection details for the PostgreSQL database when running locally (overridden by environment variables when running in OpenShift)<\/li>\n<\/ul>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/pom.xml\">pom.xml<\/a><\/code> &#8211; dependencies<\/li>\n<\/ul>\n<h3>Code structure: maintaining a database<\/h3>\n<p>I didn&#8217;t write any code for this. I used a Kafka Connect JDBC Sink Connector and configured it to maintain a projection in a PostgreSQL database.<\/p>\n<p>To <strong>create a database<\/strong>, I <a href=\"https:\/\/github.com\/IBM\/event-automation-demo\/blob\/main\/INSTALL-OPTIONS.md#postgresql\">re-used the Ansible playbook from the Loosehanger Jeans demo scenario<\/a>, which creates a simple secured <a href=\"https:\/\/github.com\/IBM\/event-automation-demo\/blob\/main\/install\/supporting-demo-resources\/pgsql\/templates\/03-db.yaml\">PostgreSQL database<\/a> in OpenShift, and stores credentials for accessing it in a Secret.<\/p>\n<p>To <strong>create a Kafka Connect runtime with support for JDBC Sink Connectors<\/strong>, I <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/tree\/master?tab=readme-ov-file#jdbc-sink\">re-used the instructions from my Kafka demos repository<\/a>, which builds a Kafka Connect container image with the jars necessary to support running <a href=\"https:\/\/github.com\/Aiven-Open\/jdbc-connector-for-apache-kafka\/blob\/master\/docs\/sink-connector.md\">JDBC Sink Connectors<\/a>.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/github-jdbcsink.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 
750px;\"\/><\/p>\n<p>By using the &#8220;upsert&#8221; mode in the Connector, I could achieve the goal of maintaining a projection that stores the most recent event for each sensor \/ door. For example, for each new sensor reading, the data from the new event replaces the row with the previous reading from that sensor.<\/p>\n<p>As the two separate demos needed slightly different behaviour for how to store the events in the database, I created a separate connector for each.<\/p>\n<p><strong>Storing sensor readings keyed by message key<\/strong><\/p>\n<p><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/k8s.yaml#L85-L97\"><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\">projection-sensorreadings-connector<\/code><\/a><\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">\n# source\ntopics: SENSOR.READINGS\n# connection info\nconnection.url: jdbc:postgresql:\/\/${file:\/mnt\/connect-creds-postgresql:host}:${file:\/mnt\/connect-creds-postgresql:pgbouncer-port}\/${file:\/mnt\/connect-creds-postgresql:dbname}\nconnection.user: ${file:\/mnt\/connect-creds-postgresql:user}\nconnection.password: ${file:\/mnt\/connect-creds-postgresql:password}\ntable.name.format: sensorreadings\n# behaviour\ninsert.mode: upsert\nauto.create: true\n# primary key\npk.mode: record_key\npk.fields: key\n<\/pre>\n<p>This configuration created a connector that:<\/p>\n<ul>\n<li>subscribes to the <code  style=\"color: #770000; font-weight: bold;\">SENSOR.READINGS<\/code> topic<\/li>\n<li>stores events in the database table <code  style=\"color: #770000; font-weight: bold;\">sensorreadings<\/code>, using the Kafka message key as the database primary key<\/li>\n<\/ul>\n<p><img decoding=\"async\" 
src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/postgresql-sensors.png?raw=true\"  style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p><strong>Storing door events keyed by message payload attribute<\/strong><\/p>\n<p><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/k8s.yaml#L121-L133\"><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\">projection-doorbadgeins-connector<\/code><\/a><\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">\n# source\ntopics: DOOR.BADGEIN\n# connection info\nconnection.url: jdbc:postgresql:\/\/${file:\/mnt\/connect-creds-postgresql:host}:${file:\/mnt\/connect-creds-postgresql:pgbouncer-port}\/${file:\/mnt\/connect-creds-postgresql:dbname}\nconnection.user: ${file:\/mnt\/connect-creds-postgresql:user}\nconnection.password: ${file:\/mnt\/connect-creds-postgresql:password}\ntable.name.format: doorbadgeins\n# behaviour\ninsert.mode: upsert\nauto.create: true\n# primary key\npk.mode: record_value\npk.fields: door\n<\/pre>\n<p>This configuration created a connector that:<\/p>\n<ul>\n<li>subscribes to the <code  style=\"color: #770000; font-weight: bold;\">DOOR.BADGEIN<\/code> topic<\/li>\n<li>stores events in the database table <code  style=\"color: #770000; font-weight: bold;\">doorbadgeins<\/code>, using the &#8220;door&#8221; property from the Kafka message payload as the database primary key<\/li>\n<\/ul>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/screenshots\/postgresql-badgeins.png?raw=true\"  style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<h3>Code structure: accessing a database projection<\/h3>\n<p>By using <a 
href=\"https:\/\/openliberty.io\/docs\/latest\/data-persistence.html\">OpenLiberty&#8217;s JPA support<\/a>, there was very little code to write to be able to retrieve data from the database.<\/p>\n<p>This is all I needed:<\/p>\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\/loosehanger\">data\/loosehanger<\/a><\/code> &#8211; defines the data that I retrieve from the database <br \/>(note that these classes need to be listed in <a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/resources\/META-INF\/persistence.xml\"><code style=\"color: #770000; font-weight: bold; font-size: 0.9em;\">resources\/META-INF\/persistence.xml<\/code><\/a>)\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\/loosehanger\/SensorReading.java\">SensorReading.java<\/a><\/code>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 420px;\">  @Entity\n@Table(name=\"sensorreadings\")\n@NamedQuery(\n    name = \"SensorReading.findSensorReading\",\n    query = \"SELECT e FROM SensorReading e WHERE e.sensorid = :sensorid\"\n)\npublic class SensorReading implements Serializable {\n&nbsp;\n&nbsp; &nbsp; @NotNull\n&nbsp; &nbsp; @Id\n&nbsp; &nbsp; @Column(name = \"key\")\n&nbsp; &nbsp; private String sensorid;\n&nbsp;\n&nbsp; &nbsp; @NotNull\n&nbsp; &nbsp; @Column(name = \"sensortime\")\n&nbsp; &nbsp; private String sensortime;\n&nbsp;\n&nbsp; &nbsp; @NotNull\n&nbsp; &nbsp; @Column(name = \"temperature\")\n&nbsp; &nbsp; private Double 
temperature;\n&nbsp;\n&nbsp; &nbsp; @NotNull\n&nbsp; &nbsp; @Column(name = \"humidity\")\n&nbsp; &nbsp; private Integer humidity;\n&nbsp;\n&nbsp; &nbsp; ...\n<\/pre>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/data\/loosehanger\/DoorBadgeIn.java\">DoorBadgeIn.java<\/a><\/code>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 420px;\">  @Entity\n@Table(name=\"doorbadgeins\")\n@NamedQuery(\n    name = \"DoorBadgeIn.findBadgeIn\",\n    query = \"SELECT e FROM DoorBadgeIn e WHERE e.door = :door\"\n)\npublic class DoorBadgeIn implements Serializable {\n&nbsp;\n&nbsp; &nbsp; @NotNull\n&nbsp; &nbsp; @Id\n&nbsp; &nbsp; @Column(name = \"door\")\n&nbsp; &nbsp; String door;\n&nbsp;\n&nbsp; &nbsp; @NotNull\n&nbsp; &nbsp; @Column(name = \"recordid\")\n&nbsp; &nbsp; String recordid;\n&nbsp;\n&nbsp; &nbsp; @NotNull\n&nbsp; &nbsp; @Column(name = \"employee\")\n&nbsp; &nbsp; String employee;\n&nbsp;\n&nbsp; &nbsp; @NotNull\n&nbsp; &nbsp; @Column(name = \"badgetime\")\n&nbsp; &nbsp; String badgetime;\n&nbsp;\n&nbsp; &nbsp; ...<\/pre>\n<\/li>\n<\/ul>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/database\">database<\/a><\/code> &#8211; defines the queries for accessing the database\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/database\/SensorReadingsDao.java\">SensorReadingsDao.java<\/a><\/code>\n<pre 
style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 420px;\">  @RequestScoped\npublic class SensorReadingsDao {\n&nbsp;\n&nbsp; &nbsp; @PersistenceContext\n&nbsp; &nbsp; private EntityManager em;\n&nbsp;\n&nbsp; &nbsp; public SensorReading findDoorBadgeIn(String sensorid) {\n&nbsp; &nbsp; &nbsp; &nbsp; List&lt;SensorReading&gt; sensorReadings = em.createNamedQuery(\"SensorReading.findSensorReading\", SensorReading.class)\n            .setParameter(\"sensorid\", sensorid)\n            .getResultList();\n        return sensorReadings == null || sensorReadings.isEmpty() ? null : sensorReadings.get(0);\n&nbsp; &nbsp; }\n}<\/pre>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/database\/DoorBadgeInDao.java\">DoorBadgeInDao.java<\/a><\/code>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 420px;\">  @RequestScoped\npublic class DoorBadgeInDao {\n&nbsp;\n&nbsp; &nbsp; @PersistenceContext\n&nbsp; &nbsp; private EntityManager em;\n&nbsp;\n&nbsp; &nbsp; public DoorBadgeIn findDoorBadgeIn(String doorid) {\n&nbsp; &nbsp; &nbsp; &nbsp; List&lt;DoorBadgeIn&gt; badgeIns = em.createNamedQuery(\"DoorBadgeIn.findBadgeIn\", DoorBadgeIn.class)\n            .setParameter(\"door\", doorid)\n            .getResultList();\n        return badgeIns == null || badgeIns.isEmpty() ? 
null : badgeIns.get(0);\n&nbsp; &nbsp; }\n}<\/pre>\n<\/li>\n<\/ul>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.2em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/tree\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\">api<\/a><\/code> &#8211; REST API endpoints for retrieving data from the projection\n<ul>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/SensorReadingsApi.java\">SensorReadingsApi.java<\/a><\/code>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 420px;\">  @Path(\"\/sensorreadings\")\n@Produces(MediaType.APPLICATION_JSON)\npublic class SensorReadingsApi {\n&nbsp;\n&nbsp; &nbsp; @Inject\n&nbsp; &nbsp; private SensorReadingsDao sensorReadingsDao;\n&nbsp;\n&nbsp; &nbsp; @GET()\n&nbsp; &nbsp; @Path(\"\/{sensorid}\")\n&nbsp; &nbsp; public SensorReading getSensorReading(@PathParam(\"sensorid\") String sensorId) {\n&nbsp; &nbsp; &nbsp; &nbsp; SensorReading latest = sensorReadingsDao.findDoorBadgeIn(sensorId);\n&nbsp; &nbsp; &nbsp; &nbsp; if (latest == null) {\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new NotFoundException();\n&nbsp; &nbsp; &nbsp; &nbsp; }\n&nbsp; &nbsp; &nbsp; &nbsp; return latest;\n&nbsp; &nbsp; }\n}<\/pre>\n<\/li>\n<li><code style=\"color: #770000; font-weight: bold; font-size: 1.0em;\"><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/src\/main\/java\/uk\/co\/dalelane\/kafkaprojections\/api\/DoorBadgeInsApi.java\">DoorBadgeInsApi.java<\/a><\/code>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 
0.9em; white-space: pre; max-height: 420px;\">  @Path(\"\/badgeins\")\n@Produces(MediaType.APPLICATION_JSON)\npublic class DoorBadgeInsApi {\n&nbsp;\n&nbsp; &nbsp; @Inject\n&nbsp; &nbsp; private DoorBadgeInDao doorBadgeInDao;\n&nbsp;\n&nbsp; &nbsp; @GET()\n&nbsp; &nbsp; @Path(\"\/{doorid}\")\n&nbsp; &nbsp; public DoorBadgeIn getDoorBadgeIn(@PathParam(\"doorid\") String doorid) {\n&nbsp; &nbsp; &nbsp; &nbsp; DoorBadgeIn latest = doorBadgeInDao.findDoorBadgeIn(doorid);\n&nbsp; &nbsp; &nbsp; &nbsp; if (latest == null) {\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new NotFoundException();\n&nbsp; &nbsp; &nbsp; &nbsp; }\n&nbsp; &nbsp; &nbsp; &nbsp; return latest;\n&nbsp; &nbsp; }\n}<\/pre>\n<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<h3>Running the demo<\/h3>\n<p><a href=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/README.md\"><code style=\"color: #770000; font-weight: bold;\">02-database\/README.md<\/code><\/a> contains instructions for how to build and run this demo for yourself. It includes instructions for running it locally on your own computer (pointing at a local database) and deploying to an OpenShift cluster.<\/p>\n<h3>When is a database projection a good choice?<\/h3>\n<p><img decoding=\"async\" src=\"https:\/\/github.com\/dalelane\/event-projections-demos\/blob\/master\/02-database\/images\/projection-database-detail.png?raw=true\" style=\"border: thin black solid; width: 100%; max-width: 750px;\"\/><\/p>\n<p>Using a database to store the projection is a good choice <strong>where the projection is likely to grow very large<\/strong> as it is well-suited to scaling storage.<\/p>\n<p>As the database provides persistent storage, this approach is not dependent on the Kafka topic for long-term storage &#8211; making it a reliable choice <strong>where Kafka topics have very short retention limits<\/strong>. 
Even if the Kafka topic retention isn&#8217;t necessarily short, this can help <strong>where critical data is produced infrequently<\/strong>. For example, imagine if sensors only produced temperature events when the temperature was abnormal. This could mean that some sensors emitted events only once a year \u2013 perhaps to a topic with a 6-month retention. In general, where it is likely that important historical data has long since been aged out of the Kafka topic, it can still be safely maintained in a database-backed projection.<\/p>\n<p>The demo shows that this is a common enough pattern that an event projection can be maintained in a database without writing any new code, simply by configuring existing off-the-shelf Connectors. This makes it a good approach <strong>where developing a lot of new code is not a viable option<\/strong>. Even accessing the projection required minimal coding, as technologies like JPA can accelerate a lot of this.<\/p>\n<p>A structured relational database provides a lot of powerful querying options, making it a good choice <strong>where data needs to be retrieved from the projection in various ways<\/strong>. 
Although for this demo I only showed retrieving data from the projection based on the primary key, this could be extended to include indexing, searching, filtering, and sorting.<br \/>\nIt would be trivial, for example, to extend the demo to add an API that queries the <code style=\"color: #770000;\">sensorreadings<\/code> table for all the latest sensor reading events with a temperature above 23 degrees<br \/>\n(<code style=\"color: #770000; font-weight: 500;\">SELECT * FROM sensorreadings WHERE temperature &gt; 23;<\/code>).<br \/>\nOr to return the ten sensors with the highest humidity<br \/>\n(<code style=\"color: #770000; font-weight: 500;\">SELECT * FROM sensorreadings ORDER BY humidity DESC LIMIT 10;<\/code>).<br \/>\nThis flexibility also makes it an effective approach <strong>where applications need to support future changes in data access<\/strong>, as new queries can be added incrementally over time.<\/p>\n<h3>When is a database projection NOT a good choice?<\/h3>\n<p>This approach introduced two new runtime components: Kafka Connect to host the connectors that maintain the projection, and the database that stores the projection. These didn&#8217;t require development of any new code, but that doesn&#8217;t remove the ops burden. 
These components need to be deployed, managed, monitored and maintained, which means this approach may not be a good choice <strong>where there are restrictions on introducing new architectural components<\/strong>.<\/p>\n<h3>In the next post&#8230;<\/h3>\n<p>In my next post, I&#8217;ll show how you can use Kafka Streams to do something similar to this, with Kafka topics providing the persistent storage instead of a database.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Sample implementation of maintaining an Event Projection of an Apache Kafka topic using a JDBC database.<\/p>\n","protected":false},"author":1,"featured_media":5423,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,584],"class_list":["post-5422","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5422","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5422"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5422\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/5423"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5422"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_rou
te=%2Fwp%2Fv2%2Fcategories&post=5422"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5422"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}