In this post, I’ll walk through a sample implementation of using a database to maintain an Event Projection. I’ll use it to illustrate when this approach is a suitable choice.
The objective for this demo
In Comparing approaches to maintaining an Event Projection from Kafka topics, I introduced the pattern of Event Projections.
I also introduced the scenario that I’ll be using in these demos. Please see that post for the detail and motivation, but to recap, I want to maintain a projection of two Kafka topics (one based on the event key, the other based on an attribute in the event payload).
In both cases, I want to be able to make an HTTP/REST call to retrieve the data from the most recent event that matches my query.
At a high level, the goal was to:
- use Kafka Connect JDBC sink connectors to maintain a database projection of the Kafka topics
- provide an HTTP/REST API for querying the projection
For demo purposes, my “application” will be curl, so I can illustrate being able to query the projection like this.
After an event like this arrives on the SENSOR.READINGS Kafka topic:
I can query for the latest temperature and humidity for that location:
curl --silent \
  http://projectionsdemo.apps.dale-lane.demo.ibm.com/sensorreadings/G-0-15 | jq
{
  "humidity": 43,
  "sensorId": "G-0-15",
  "sensorTime": "Thu Nov 28 13:47:09 GMT 2024",
  "temperature": 22.3
}
And after an event like this arrives on the DOOR.BADGEIN Kafka topic:
I can query for the latest employee to have gone through that door.
curl --silent \
  http://projectionsdemo.apps.dale-lane.demo.ibm.com/badgeins/DE-1-16 | jq
{
  "badgeTime": "2024-11-28 13:54:48.702",
  "doorId": "DE-1-16",
  "employee": "geralyn.littel",
  "recordId": "0fc23607-4527-4b6a-bb19-9e6aac5c22ba"
}
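A real application would make the same calls from code rather than curl. As a rough sketch only (this is not part of the demo repository), a client using the JDK's built-in `java.net.http.HttpClient` might look like the following. The class name `ProjectionClient`, the default base URL `http://localhost:9080`, and the assumption that a missing entry comes back as HTTP 404 are all mine, not taken from the demo.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the projection API; not part of the demo repository.
class ProjectionClient {
    private final String baseUrl;
    private final HttpClient http = HttpClient.newHttpClient();

    ProjectionClient(String baseUrl) {
        // Strip a trailing slash so that paths join cleanly.
        this.baseUrl = baseUrl.endsWith("/")
            ? baseUrl.substring(0, baseUrl.length() - 1)
            : baseUrl;
    }

    // Builds the GET request for e.g. /sensorreadings/G-0-15 or /badgeins/DE-1-16.
    // The id is used as-is, so it is assumed to need no URL encoding.
    HttpRequest latestRequest(String resource, String id) {
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/" + resource + "/" + id))
            .header("Accept", "application/json")
            .GET()
            .build();
    }

    // Returns the JSON body of the latest event, or null if nothing is in the projection.
    String fetchLatest(String resource, String id) throws IOException, InterruptedException {
        HttpResponse<String> response =
            http.send(latestRequest(resource, id), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() == 404) {
            return null; // assumption: a missing entry is reported as 404
        }
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected status " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Assumed default for a local run; pass the real base URL as the first argument.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:9080";
        ProjectionClient client = new ProjectionClient(baseUrl);
        System.out.println(client.fetchLatest("sensorreadings", "G-0-15"));
    }
}
```

Keeping the request-building separate from the network call makes the URL construction easy to check without a running server.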
Demo implementation
You can find the source code for a demo implementation on GitHub at dalelane/event-projections-demos.
In this post, I’ll be going through the 02-database folder, which contains the second of my demos. It includes a complete and deployable demo, pre-configured to run in OpenShift and work with the demo scenario environment I described in my first post.
I used the same OpenLiberty configuration as in the in-memory demo as the basis for the demo server. This made it quick and simple to create, and is also a more realistic approach than a “Hello World” toy demo.
Code structure
Most of the source code for this demo is boilerplate that will be identical across the different event projection approaches. I’ll quickly remind you of what is in here.
Next I’ll show how the demo maintains the projection. And finally, I’ll show how the demo accesses the projection.
Code structure: common across Event Projection demos
Most of it is in Java:
- api: provides the HTTP/REST API for the application to query
- exceptions: error handling, such as if the client application requests something that isn’t in the projection
- health: Kubernetes probe endpoints, as I wrote this to run in OpenShift
There are also some common config files:
- liberty/config: configuration
  - server.xml: identifies which OpenLiberty features I’m using
  - bootstrap.properties: connection details for the PostgreSQL database when running locally (overridden by environment variables when running in OpenShift)
- pom.xml: dependencies
Code structure: maintaining a database projection
I didn’t write any code for this. I used a Kafka Connect JDBC Sink Connector and configured it to maintain a projection in a PostgreSQL database.
To create a database, I re-used the Ansible playbook from the Loosehanger Jeans demo scenario which creates a simple secured PostgreSQL database in OpenShift, and stores credentials for accessing it in a Secret.
To create a Kafka Connect runtime with support for JDBC Sink Connectors, I re-used the instructions from my Kafka demos repository, which builds a Kafka Connect container image with the jars necessary to support running JDBC Sink Connectors.
By using the “upsert” mode in the Connector, I could achieve the goal of maintaining a projection that stores the most recent event for each sensor / door. For example, for each new sensor reading, the data from the new event replaces the row with the previous reading from that sensor.
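To make the upsert behaviour concrete, here is a minimal in-memory model of it in Java. This is only an illustration of the semantics (insert the row if the key is new, otherwise overwrite it); it is not how the connector or PostgreSQL implement it. The class name `UpsertProjection` and the first set of readings are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model of "upsert" semantics; not the connector's implementation.
class UpsertProjection {
    // One "row" per primary key, like a table with a primary key column.
    private final Map<String, Map<String, Object>> rows = new LinkedHashMap<>();

    // Insert the row if the key is new, otherwise replace the existing row.
    void upsert(String primaryKey, Map<String, Object> columns) {
        rows.put(primaryKey, columns);
    }

    Map<String, Object> get(String primaryKey) {
        return rows.get(primaryKey);
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        UpsertProjection projection = new UpsertProjection();
        // An earlier reading (values invented for illustration)...
        projection.upsert("G-0-15", Map.of("temperature", 21.9, "humidity", 44));
        // ...is replaced by the next reading from the same sensor.
        projection.upsert("G-0-15", Map.of("temperature", 22.3, "humidity", 43));
        System.out.println(projection.size() + " row(s), latest: " + projection.get("G-0-15"));
    }
}
```

However many events arrive for a sensor, the projection holds a single row for it, which is why the table size is bounded by the number of distinct keys rather than the number of events.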
As the two projections needed slightly different behaviour for how to store the events in the database, I created a separate connector for each.
Storing sensor readings keyed by message key
projection-sensorreadings-connector
# source
topics: SENSOR.READINGS
# connection info
connection.url: jdbc:postgresql://${file:/mnt/connect-creds-postgresql:host}:${file:/mnt/connect-creds-postgresql:pgbouncer-port}/${file:/mnt/connect-creds-postgresql:dbname}
connection.user: ${file:/mnt/connect-creds-postgresql:user}
connection.password: ${file:/mnt/connect-creds-postgresql:password}
table.name.format: sensorreadings
# behaviour
insert.mode: upsert
auto.create: true
# primary key
pk.mode: record_key
pk.fields: key
This configuration created a connector that:
- subscribes to the SENSOR.READINGS topic
- stores events in the database table sensorreadings, using the Kafka message key as the database primary key
Storing door events keyed by message payload attribute
projection-doorbadgeins-connector
# source
topics: DOOR.BADGEIN
# connection info
connection.url: jdbc:postgresql://${file:/mnt/connect-creds-postgresql:host}:${file:/mnt/connect-creds-postgresql:pgbouncer-port}/${file:/mnt/connect-creds-postgresql:dbname}
connection.user: ${file:/mnt/connect-creds-postgresql:user}
connection.password: ${file:/mnt/connect-creds-postgresql:password}
table.name.format: doorbadgeins
# behaviour
insert.mode: upsert
auto.create: true
# primary key
pk.mode: record_value
pk.fields: door
This configuration created a connector that:
- subscribes to the DOOR.BADGEIN topic
- stores events in the database table doorbadgeins, using the “door” property from the Kafka message payload as the database primary key
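The only real difference between the two connectors is where the primary key comes from. The sketch below models that choice in plain Java. It is a simplified illustration under my own assumptions, not the connector's code: the names `PrimaryKeyChooser` and `PkMode` are hypothetical, and the message key used for the door event is invented.

```java
import java.util.Map;

// Simplified, hypothetical model of the two pk.mode settings used in this demo.
class PrimaryKeyChooser {
    enum PkMode { RECORD_KEY, RECORD_VALUE }

    // RECORD_KEY: the Kafka message key is the primary key
    //             (pkField then only names the database column, e.g. "key").
    // RECORD_VALUE: the primary key is read from the named payload field.
    static String primaryKey(PkMode mode, String pkField,
                             String messageKey, Map<String, Object> payload) {
        if (mode == PkMode.RECORD_KEY) {
            return messageKey;
        }
        Object value = payload.get(pkField);
        if (value == null) {
            throw new IllegalArgumentException("payload has no field '" + pkField + "'");
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        // Sensor readings: keyed by the message key.
        System.out.println(primaryKey(PkMode.RECORD_KEY, "key",
            "G-0-15", Map.of("temperature", 22.3, "humidity", 43)));

        // Door badge events: keyed by the "door" attribute in the payload.
        // The message key here is made up and is ignored in this mode.
        System.out.println(primaryKey(PkMode.RECORD_VALUE, "door",
            "some-message-key", Map.of("door", "DE-1-16", "employee", "geralyn.littel")));
    }
}
```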
Code structure: accessing a database projection
By using OpenLiberty’s JPA support, there was very little code to write to be able to retrieve data from the database.
This is all I needed:
data/loosehanger: defines the data that I retrieve from the database (note that these classes need to be listed in resources/META-INF/persistence.xml)
SensorReading.java
@Entity
@Table(name="sensorreadings")
@NamedQuery(
    name = "SensorReading.findSensorReading",
    query = "SELECT e FROM SensorReading e WHERE e.sensorid = :sensorid"
)
public class SensorReading implements Serializable {

    @NotNull
    @Id
    @Column(name = "key")
    private String sensorid;

    @NotNull
    @Column(name = "sensortime")
    private String sensortime;

    @NotNull
    @Column(name = "temperature")
    private Double temperature;

    @NotNull
    @Column(name = "humidity")
    private Integer humidity;

    ...
DoorBadgeIn.java
@Entity
@Table(name="doorbadgeins")
@NamedQuery(
    name = "DoorBadgeIn.findBadgeIn",
    query = "SELECT e FROM DoorBadgeIn e WHERE e.door = :door"
)
public class DoorBadgeIn implements Serializable {

    @NotNull
    @Id
    @Column(name = "door")
    String door;

    @NotNull
    @Column(name = "recordid")
    String recordid;

    @NotNull
    @Column(name = "employee")
    String employee;

    @NotNull
    @Column(name = "badgetime")
    String badgetime;

    ...
database: defines the queries for accessing the database
SensorReadingsDao.java
@RequestScoped
public class SensorReadingsDao {

    @PersistenceContext
    private EntityManager em;

    public SensorReading findDoorBadgeIn(String sensorid) {
        List<SensorReading> sensorReadings = em.createNamedQuery("SensorReading.findSensorReading", SensorReading.class)
            .setParameter("sensorid", sensorid)
            .getResultList();
        return sensorReadings == null || sensorReadings.isEmpty() ? null : sensorReadings.get(0);
    }
}
DoorBadgeInDao.java
@RequestScoped
public class DoorBadgeInDao {

    @PersistenceContext
    private EntityManager em;

    public DoorBadgeIn findDoorBadgeIn(String doorid) {
        List<DoorBadgeIn> badgeIns = em.createNamedQuery("DoorBadgeIn.findBadgeIn", DoorBadgeIn.class)
            .setParameter("door", doorid)
            .getResultList();
        return badgeIns == null || badgeIns.isEmpty() ? null : badgeIns.get(0);
    }
}
api: REST API endpoints for retrieving data from the projection
SensorReadingsApi.java
@Path("/sensorreadings")
@Produces(MediaType.APPLICATION_JSON)
public class SensorReadingsApi {

    @Inject
    private SensorReadingsDao sensorReadingsDao;

    @GET()
    @Path("/{sensorid}")
    public SensorReading getSensorReading(@PathParam("sensorid") String sensorId) {
        SensorReading latest = sensorReadingsDao.findDoorBadgeIn(sensorId);
        if (latest == null) {
            throw new NotFoundException();
        }
        return latest;
    }
}
DoorBadgeInsApi.java
@Path("/badgeins")
@Produces(MediaType.APPLICATION_JSON)
public class DoorBadgeInsApi {

    @Inject
    private DoorBadgeInDao doorBadgeInDao;

    @GET()
    @Path("/{doorid}")
    public DoorBadgeIn getDoorBadgeIn(@PathParam("doorid") String doorid) {
        DoorBadgeIn latest = doorBadgeInDao.findDoorBadgeIn(doorid);
        if (latest == null) {
            throw new NotFoundException();
        }
        return latest;
    }
}
Running the demo
02-database/README.md contains instructions for how to build and run this demo for yourself. It includes instructions for running it locally on your own computer (pointing at a local database) and deploying to an OpenShift cluster.
When is a database projection a good choice?
Using a database to store the projection is a good choice where the projection is likely to grow very large, as a database is well suited to scaling storage.
As the database provides persistent storage, this approach is not dependent on the Kafka topic for long-term storage – making it a reliable choice where Kafka topics have very short retention limits. Even if the Kafka topic retention isn’t necessarily short, this can help where critical data is produced infrequently. For example, imagine if sensors only produced temperature events when the temperature was abnormal. This could mean that some sensors emitted events only once a year – perhaps to a topic with a 6-month retention. In general, where it is likely that important historical data has long-since been aged-out of the Kafka topic, it can still be safely maintained in a database-backed projection.
The demo shows that this is a common-enough pattern that an event projection can be maintained in a database without writing any new code, simply by configuring existing off-the-shelf Connectors. This illustrates that this is a good approach where developing a lot of new code is not a viable option. Even accessing the projection required minimal coding, as technologies like JPA can accelerate a lot of this.
A structured relational database provides a lot of powerful querying options, making it a good choice where data needs to be retrieved from the projection in various ways. Although for this demo I only showed retrieving data from the projection based on the primary key, I obviously could extend this to include indexing, searching, filtering, and sorting.
It would be trivial, for example, to extend the demo to add an API that queries the sensorreadings table for all the latest sensor reading events with a temperature above 23 degrees (SELECT * FROM sensorreadings WHERE temperature > 23;).
Or to return the ten sensors with the highest humidity (SELECT * FROM sensorreadings ORDER BY humidity DESC LIMIT 10;).
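As a hedged sketch of what such an extension could look like, the class below runs parameterised versions of those two queries. Note that it uses plain JDBC (`java.sql`) rather than the JPA approach the demo actually uses, purely to keep the example self-contained. The class and method names are hypothetical, and the column names are taken from the `@Column` mappings in SensorReading.java.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical extra queries against the sensorreadings projection table.
// Uses plain JDBC for brevity; the demo itself uses JPA.
class SensorReadingQueries {
    static final String HOTTER_THAN_SQL =
        "SELECT key FROM sensorreadings WHERE temperature > ?";
    static final String MOST_HUMID_SQL =
        "SELECT key FROM sensorreadings ORDER BY humidity DESC LIMIT ?";

    // Ids of sensors whose latest reading is above the given temperature.
    static List<String> sensorsHotterThan(Connection db, double threshold) throws SQLException {
        try (PreparedStatement stmt = db.prepareStatement(HOTTER_THAN_SQL)) {
            stmt.setDouble(1, threshold);
            return sensorIds(stmt);
        }
    }

    // Ids of the sensors with the highest latest humidity, most humid first.
    static List<String> mostHumidSensors(Connection db, int limit) throws SQLException {
        try (PreparedStatement stmt = db.prepareStatement(MOST_HUMID_SQL)) {
            stmt.setInt(1, limit);
            return sensorIds(stmt);
        }
    }

    // Runs the prepared query and collects the "key" column (the sensor id).
    private static List<String> sensorIds(PreparedStatement stmt) throws SQLException {
        List<String> ids = new ArrayList<>();
        try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                ids.add(rs.getString("key"));
            }
        }
        return ids;
    }
}
```

With an open `Connection` to the projection database, `SensorReadingQueries.sensorsHotterThan(db, 23.0)` and `SensorReadingQueries.mostHumidSensors(db, 10)` correspond to the two SQL examples above. In the demo itself, the more natural route would be to add further named queries to the existing JPA entities.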
This flexibility also makes it an effective approach where applications need to support future changes in data access as new queries can be added incrementally over time.
When is a database projection NOT a good choice?
This approach introduced two new runtime components: Kafka Connect to host the connectors that maintain the projection, and the database that stored the projection. These didn’t require development of any new code, but that doesn’t remove the ops burden. These components need to be deployed, managed, monitored and maintained, which means this approach may not be a good choice where there are restrictions on introducing new architectural components.
In the next post…
In my next post, I’ll show how you can use Kafka Streams to do something similar to this, with Kafka topics providing the persistent storage instead of a database.