{"id":5920,"date":"2026-03-29T01:00:19","date_gmt":"2026-03-29T01:00:19","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5920"},"modified":"2026-03-29T01:00:20","modified_gmt":"2026-03-29T01:00:20","slug":"extending-flink-sql","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5920","title":{"rendered":"Extending Flink SQL"},"content":{"rendered":"<p><strong>In this post, I\u2019ll share examples of how writing user-defined functions (UDFs) extends what is possible using built-in Flink SQL functions alone.<\/strong><\/p>\n<p>I&#8217;ll share examples of how UDFs can:<\/p>\n<ul>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#readability\">make complex SQL readable and maintainable<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#complexity\">add algorithmic logic that SQL cannot express<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#exploding\">reshape complex events into simpler streams<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#summarising\">summarise events in domain-specific ways<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#stateful\">enrich events using custom state<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#timers\">decide when to output results<\/a><\/li>\n<\/ul>\n<p><!--more--><\/p>\n<h3 style=\"background-color: #f3c851; padding-left: 0.8em; padding-top: 0.8em; margin-top: 3em; padding-right: 0.8em;\"><a name=\"readability\"><span style=\"font-size: 0.8em;\">Extending Flink SQL to&#8230;<\/span><\/a><br \/> make complex SQL readable and maintainable<\/h3>\n<p>Simplicity and readability is a strength of using SQL for event stream processing. 
That can feel true while you\u2019re writing your SQL, but maybe a little less so when you\u2019re trying to read someone else&#8217;s more complex SQL.<\/p>\n<p>Remember the stream of e-bike location update events <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5612\">I was working with last year<\/a>? They included the current location of each e-bike. If I add <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">LAG<\/code>, I can get the current location together with the previous location of each bike.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/bike-locations.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/p>\n<p>I could use these two locations like this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n  bikeid, `last location`, location,\n  6371 * 2 *\n    ASIN(\n      SQRT(\n        POWER(\n          SIN(RADIANS(location.latitude - `last location`.latitude) \/ 2),\n          2\n        )\n          +\n        COS(RADIANS(`last location`.latitude))\n          *\n        COS(RADIANS(location.latitude))\n          *\n        POWER(\n          SIN(RADIANS(location.longitude - `last location`.longitude) \/ 2),\n          2\n        )\n      )\n    ) AS dist\nFROM\n  bike_locations;\n<\/pre>\n<p>My SQL above is estimating the geospatial distance between the two locations. To my (<em>math-phobic?<\/em>) eyes, the intent of that SQL statement is buried in a series of mathematical functions.<\/p>\n<p>Maybe you\u2019re a trigonometry expert and easily recognise the Haversine formula. Even so, I suspect you\u2019d agree that this SQL is fragile. I could easily imagine bugs being introduced (and difficult to spot!) 
when you start copy\/pasting this a few times across multiple queries for use with different locations.<\/p>\n<p>Compare that with this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n  bikeid, `last location`, location,\n  GEOSPATIAL_DISTANCE(`last location`.latitude, `last location`.longitude,\n                      location.latitude, location.longitude) AS dist\nFROM\n  bike_locations;\n<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/demos\/flink-sql\/bike-locations.sql\">full SQL<\/a><\/small><\/p>\n<p>This is doing the same thing, but is immediately clear and readable. The intent behind this query is explicit, rather than needing to be inferred. And I can reuse the logic in multiple streaming queries without risking hard-to-spot copy\/paste bugs.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/bike-distances.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/p>\n<p>How did I do this?<\/p>\n<p>I moved the Haversine formula into a reusable Java class:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fflink-udfs-blogpost%2Fblob%2Fmain%2Fsrc%2Fmain%2Fjava%2Fcom%2Fibm%2Feventautomation%2Feventprocessing%2Fudfdemos%2Fscalar%2FGeospatialDistance.java%23L31-L64&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;fetchFromJsDelivr=on&#038;maxHeight=450\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/src\/main\/java\/com\/ibm\/eventautomation\/eventprocessing\/udfdemos\/scalar\/GeospatialDistance.java#L31-L64\"><code>GeospatialDistance.java<\/code> on github<\/a><\/small><\/p>\n<p>Then imported that into my SQL as a function:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; 
overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nCREATE FUNCTION GEOSPATIAL_DISTANCE\n  AS 'com.ibm.eventautomation.eventprocessing.udfdemos.scalar.GeospatialDistance'\n  USING JAR '\/opt\/ibm\/sp-backend\/udfs\/udfs.jar';\n<\/pre>\n<p>When you&#8217;ve got bits of long, dense, complex SQL that are likely to be difficult to read or maintain (especially if you need to use similar variations of it in multiple places), those are good candidates for extracting into functions.<\/p>\n<h3 style=\"background-color: #f3c851; padding-left: 0.8em; padding-top: 0.8em; margin-top: 3em; padding-right: 0.8em;\"><a name=\"complexity\"><span style=\"font-size: 0.8em;\">Extending Flink SQL to&#8230;<\/span><\/a><br \/> add algorithmic logic that SQL cannot express<\/h3>\n<p>Some projects require processing that isn\u2019t feasible to express in SQL. If you need to calculate something that needs an algorithm to describe it (rather than a query expression), built-in SQL alone becomes a constraint.<\/p>\n<p>To give a simple example, you probably know that the last digit of a credit card number is a checksum.<\/p>\n<p><a href=\"https:\/\/stripe.com\/gb\/resources\/more\/how-to-use-the-luhn-algorithm-a-guide-in-applications-for-businesses\"><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/luhn.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/a><br \/>\n<small>Image source: <a href=\"https:\/\/stripe.com\/gb\/resources\/more\/how-to-use-the-luhn-algorithm-a-guide-in-applications-for-businesses\">stripe.com<\/a><\/small><\/p>\n<p>Computing a checksum is a valid thing to want to do as part of data validation in a stream processing pipeline, but can\u2019t reasonably be expressed using built-in SQL. SQL isn\u2019t a procedural language. 
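<\/p>
<p>The Luhn check is a good illustration: it walks the digits of the number while keeping running state, which is exactly the kind of loop SQL can\u2019t express. Here is a rough sketch of the algorithm in plain Java (logic only &#8211; the real UDF wraps logic like this in a Flink <code>ScalarFunction<\/code>, and the class and method names here are just mine for illustration):<\/p>

```java
// Sketch of the Luhn checksum: working from the rightmost digit, double
// every second digit (subtracting 9 if the result exceeds 9), sum all
// the digits, and check the total is a multiple of 10.
public class LuhnSketch {
    public static boolean luhnValid(long number) {
        int sum = 0;
        boolean doubleIt = false;   // toggles on every second digit from the right
        while (number > 0) {
            int digit = (int) (number % 10);
            if (doubleIt) {
                digit *= 2;
                if (digit > 9) {
                    digit -= 9;     // same as summing the two digits of the product
                }
            }
            sum += digit;
            doubleIt = !doubleIt;
            number /= 10;
        }
        return sum % 10 == 0;
    }
}
```

<p>For example, <code>luhnValid(79927398713L)<\/code> returns true for that well-known Luhn test number, while changing the final digit makes the check fail.<\/p>
<p>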
It lacks even simple loops.<\/p>\n<p>This checksum algorithm is well suited to putting in a reusable Java function:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fflink-udfs-blogpost%2Fblob%2Fmain%2Fsrc%2Fmain%2Fjava%2Fcom%2Fibm%2Feventautomation%2Feventprocessing%2Fudfdemos%2Fscalar%2FCreditCardChecksum.java%23L27-L55&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;fetchFromJsDelivr=on&#038;maxHeight=450\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/src\/main\/java\/com\/ibm\/eventautomation\/eventprocessing\/udfdemos\/scalar\/CreditCardChecksum.java#L27-L55\"><code>CreditCardChecksum.java<\/code> on github<\/a><\/small><\/p>\n<p>This can be imported like this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nCREATE FUNCTION CREDIT_CARD_CHECKSUM\n  AS 'com.ibm.eventautomation.eventprocessing.udfdemos.scalar.CreditCardChecksum'\n  USING JAR '\/opt\/ibm\/sp-backend\/udfs\/udfs.jar';\n<\/pre>\n<p>That lets you do something like:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n    accountnum,\n    CREDIT_CARD_CHECKSUM(accountnum) AS is_valid,\n    event_time\nFROM\n    transactions;\n<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/demos\/flink-sql\/creditcard-checks.sql\">full SQL<\/a><\/small><\/p>\n<p>As in my previous example, a benefit here is clear and readable SQL that describes the business intent of the processing.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/checksums.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/p>\n<p>More importantly, in this 
case, writing a user-defined function lets me do data validation using a type of complex processing that I wouldn\u2019t have been able to do otherwise.<\/p>\n<h3 style=\"background-color: #f3c851; padding-left: 0.8em; padding-top: 0.8em; margin-top: 3em; padding-right: 0.8em;\"><a name=\"exploding\"><span style=\"font-size: 0.8em;\">Extending Flink SQL to&#8230;<\/span><\/a><br \/> reshape complex events into simpler streams<\/h3>\n<p>Sometimes our Kafka events are nested, complex, structured data objects that contain multiple items that each need to be responded to.<\/p>\n<p>A simple example that is possible with built-in SQL is to explode an array into multiple separate events. Imagine an order event that contains an array of products:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\n{\n     ...\n     \"products\": [\n         \"M Organic-Cotton Relaxed Jeans\",\n         \"L Stretch-Denim Slim Jeans\",\n         \"L Chambray Carpenter Jeans\"\n     ],\n     ...\n}\n<\/pre>\n<p>Even though this is just one event to respond to, a logistics pipeline might want to trigger a stock replenishment process for each of these products. 
You can do this using built-in SQL by unpacking the array like this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n    ...\n    unpacked.product\n    ...\nFROM\n    orders\nCROSS JOIN\n    UNNEST(orders.products) AS unpacked(product);\n<\/pre>\n<p>But what if your event needs more complex logic to be exploded than just unpacking an array of items?<\/p>\n<p>For example, imagine a hotel booking event:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\n{\n   ...\n   \"start\": \"2026-01-15 15:00:00\",\n   \"end\":   \"2026-01-19 11:00:00\",\n   ...\n}\n<\/pre>\n<p>Maybe you want to emit a separate event for each day, in order to trigger some process for each day that falls within the span described there.<\/p>\n<ul>\n<li><strong>2026-01-15 <\/strong> 3pm &#8211; end of day<\/li>\n<li><strong>2026-01-16 <\/strong> all day<\/li>\n<li><strong>2026-01-17 <\/strong> all day<\/li>\n<li><strong>2026-01-18 <\/strong> all day<\/li>\n<li><strong>2026-01-19 <\/strong> until 11am<\/li>\n<\/ul>\n<p>Sounds simple, but there\u2019s no practical way to describe the logic needed to do that with built-in SQL alone.<\/p>\n<p>It\u2019s easy to implement in a function:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fflink-udfs-blogpost%2Fblob%2Fmain%2Fsrc%2Fmain%2Fjava%2Fcom%2Fibm%2Feventautomation%2Feventprocessing%2Fudfdemos%2Ftable%2FDailyIntervals.java%23L63-L95&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;fetchFromJsDelivr=on&#038;maxHeight=450\"><\/script><small><a 
href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/src\/main\/java\/com\/ibm\/eventautomation\/eventprocessing\/udfdemos\/table\/DailyIntervals.java#L63-L95\"><code>DailyIntervals.java<\/code> on github<\/a><\/small><\/p>\n<p>This can be imported similar to before like this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nCREATE FUNCTION DAILY_INTERVALS\n  AS 'com.ibm.eventautomation.eventprocessing.udfdemos.table.DailyIntervals'\n  USING JAR '\/opt\/ibm\/sp-backend\/udfs\/udfs.jar';\n<\/pre>\n<p>I can now unpack a timespan into daily interval events as simply as I can unpack an array in built-in SQL:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n    bookingstart, bookingend,\n    start_time, end_time\nFROM\n    booking_requests\nCROSS JOIN\n    LATERAL TABLE (\n        DAILY_INTERVALS (bookingstart, bookingend)\n    );\n<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/demos\/flink-sql\/booking-intervals.sql\">full SQL<\/a><\/small><\/p>\n<p>These separate day-span events can now be used in additional processing or to trigger downstream workflows.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/booking-intervals.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/p>\n<p>User defined functions can make it easy to normalize complex payloads. 
Writing a function that contains the logic to extract the business meaning behind the payload lets you turn a single input event into many simple, uniform downstream records.<\/p>\n<h3 style=\"background-color: #f3c851; padding-left: 0.8em; padding-top: 0.8em; margin-top: 3em; padding-right: 0.8em;\"><a name=\"summarising\"><span style=\"font-size: 0.8em;\">Extending Flink SQL to&#8230;<\/span><\/a><br \/> summarize events in domain-specific ways<\/h3>\n<p>Flink SQL has a strong set of aggregate functions. <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">COUNT<\/code> counts how many times something is found in your events, <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">AVG<\/code> returns the arithmetic mean for values in your events, <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">SUM<\/code> adds up values in your events, <code style=\"background-color: #FFFFC0; color: #770000; padding: 4px; font-weight: 600;\">MAX<\/code> returns the largest value in your events, and so on. 
There are <a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-master\/docs\/sql\/functions\/built-in-functions\/#aggregate-functions\">lots of functions available<\/a>, supporting the aggregates you\u2019ll need for most projects.<\/p>\n<p>But what if you need something a bit different?<\/p>\n<p>For example, imagine you\u2019ve got a sensor that emits temperature and humidity readings:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/sensors-normal.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/p>\n<p>If you want to compute hourly averages, AVG will be fine for a lot of use cases.<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n    window_start, window_end,\n    AVG(temperature) AS `average temp`,\n    AVG(humidity)    AS `average humidity`\nFROM\n    TABLE (\n        TUMBLE (\n            TABLE normal_sensor,\n            DESCRIPTOR(event_time),\n            INTERVAL '1' HOUR\n        )\n    )\nGROUP BY\n    window_start, window_end, window_time;\n<\/pre>\n<p>Imagine this is an unreliable IoT sensor that periodically glitches and emits a false reading.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/sensors-glitch.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/p>\n<p>Using the mean to get an average temperature and humidity will be sensitive to those outlier readings. What would be a better way to summarise the sensor readings, one that is less affected by those outliers?<\/p>\n<p>There are ways to compute an average when the mean isn\u2019t suitable. You\u2019ve probably heard of things like median and mode, but for this example I\u2019m going to use the L1 Medoid. This is where you select an item from a collection that is the most representative of all of the items. 
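<\/p>
<p>For a window of one-dimensional readings, picking that most-representative item can be sketched in plain Java. The names here are mine for illustration; the real <code>L1Medoid<\/code> class implements this as a Flink <code>AggregateFunction<\/code> that accumulates the window\u2019s values:<\/p>

```java
// Sketch of the L1 medoid for a window of scalar readings: return the
// reading whose total absolute distance to all the other readings is
// smallest. An outlier is a poor representative of the collection, so
// it accumulates a large total distance and is never selected.
public class L1MedoidSketch {
    public static double medoid(double[] values) {
        double best = values[0];
        double bestCost = Double.MAX_VALUE;
        for (double candidate : values) {
            double cost = 0;
            for (double other : values) {
                cost += Math.abs(candidate - other);
            }
            if (cost < bestCost) {
                bestCost = cost;
                best = candidate;
            }
        }
        return best;
    }
}
```

<p>For readings like <code>{1, 2, 3, 4, 100}<\/code> this selects 3, where the mean (22) would be dragged upwards by the glitch value.<\/p>
<p>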
(That is, the one with the lowest total sum of absolute distances to other items).<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fflink-udfs-blogpost%2Fblob%2Fmain%2Fsrc%2Fmain%2Fjava%2Fcom%2Fibm%2Feventautomation%2Feventprocessing%2Fudfdemos%2Faggregate%2FL1Medoid.java%23L39-L112&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;fetchFromJsDelivr=on&#038;maxHeight=450\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/src\/main\/java\/com\/ibm\/eventautomation\/eventprocessing\/udfdemos\/aggregate\/L1Medoid.java#L39-L112\"><code>L1Medoid.java<\/code> on github<\/a><\/small><\/p>\n<p>I can import it like this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nCREATE FUNCTION L1_MEDOID\n  AS 'com.ibm.eventautomation.eventprocessing.udfdemos.aggregate.L1Medoid'\n  USING JAR '\/opt\/ibm\/sp-backend\/udfs\/udfs.jar';\n<\/pre>\n<p>And use it like this, (leaving the AVG mean in there for comparison purposes):<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n    window_start, window_end,\n    AVG(temperature)       AS `average temp`,\n    L1_MEDOID(temperature) AS `l1medoid temp`,\n    AVG(humidity)          AS `average humidity`,\n    L1_MEDOID(humidity)    AS `l1medoid humidity`\nFROM\n    TABLE (\n        TUMBLE (\n            TABLE glitchy_sensor,\n            DESCRIPTOR(event_time),\n            INTERVAL '1' HOUR\n        )\n    )\nGROUP BY\n    window_start, window_end, window_time;\n<\/pre>\n<p><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/demos\/flink-sql\/sensors-average.sql\">full SQL<\/a><\/small><\/p>\n<p>This alternative approach to 
averages could be more useful in some situations.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/sensors-average.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/p>\n<p>Alternative averages to the mean that you get with the built-in AVG function are one way to get started, but you\u2019re not limited to that. User-defined functions are a helpful option any time you have specific requirements for metrics, KPIs, and roll-ups. More generally, they are useful any time you want custom reasoning over a time window or group of events.<\/p>\n<p>You can write custom functions to reason at the level of trends, rather than just individual events.<\/p>\n<h3 style=\"background-color: #f3c851; padding-left: 0.8em; padding-top: 0.8em; margin-top: 3em; padding-right: 0.8em;\"><a name=\"stateful\"><span style=\"font-size: 0.8em;\">Extending Flink SQL to&#8230;<\/span><\/a><br \/> enrich events using custom state<\/h3>\n<p>You can take this further with process table functions, which let you implement stateful stream processing such as remembering previous events and correlating events across time.<\/p>\n<p>For example, I shared some examples of <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5806\">using Flink SQL to process click stream events<\/a> earlier this year.<\/p>\n<p>Many of the examples I wrote using only built-in SQL used the \u201csessionid\u201d property in each click event. The sessionid let me group together click events caused by the same user as part of the same session as they clicked around a retail site. My examples using the built-in session window functionality all had to wait until a session was over (i.e. 
until no click events were observed with the session id for a pre-determined amount of time) before they could emit an output for that session.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-01-12-clicktracking\/4-aggregate-session\/3-running.png\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><\/p>\n<p>If you want to emit output events about a session while the session is still active, you can write your own custom function for this.<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fflink-udfs-blogpost%2Fblob%2Fmain%2Fsrc%2Fmain%2Fjava%2Fcom%2Fibm%2Feventautomation%2Feventprocessing%2Fudfdemos%2Fptf%2FSessionEnricher.java%23L93-L166&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;fetchFromJsDelivr=on&#038;maxHeight=450\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/src\/main\/java\/com\/ibm\/eventautomation\/eventprocessing\/udfdemos\/ptf\/SessionEnricher.java#L93-L166\"><code>SessionEnricher.java<\/code> on github<\/a><\/small><\/p>\n<p>And embed it in your SQL like this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nCREATE FUNCTION SESSION_ENRICHER\n  AS 'com.ibm.eventautomation.eventprocessing.udfdemos.ptf.SessionEnricher'\n  USING JAR '\/opt\/ibm\/sp-backend\/udfs\/udfs.jar';\n<\/pre>\n<p>which you could use with<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n    *\nFROM\n    SESSION_ENRICHER (\n        input => TABLE clicks PARTITION BY sessionid,\n        on_time => DESCRIPTOR(event_time),\n        event_type_field => 'type'\n    );\n<\/pre>\n<p><small><a 
href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/demos\/flink-sql\/session-enricher.sql\">full SQL<\/a><\/small><\/p>\n<p>Every click event is being enriched with additional properties saying how long the session has been going so far, and how many of each type of click event has been observed from this user since the session started.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/sessions-enriched.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><br \/>\n<small>input click event on the left, enriched event with session info on the right<\/small><\/p>\n<p>The key here is that writing my own custom function gave me the flexibility to define my own state that maintains the current values I\u2019m interested in for each session. (And it let me emit this immediately without having to wait for a session window to close.)<\/p>\n<p>Process table functions are a hugely powerful and flexible option when extending Flink SQL, as they give you the ability to maintain any custom state you need and (I used just a couple of counts to keep this simple, but you can get more complex) and use this to enrich the events that you process.<\/p>\n<h3 style=\"background-color: #f3c851; padding-left: 0.8em; padding-top: 0.8em; margin-top: 3em; padding-right: 0.8em;\"><a name=\"timers\"><span style=\"font-size: 0.8em;\">Extending Flink SQL to&#8230;<\/span><\/a><br \/> decide when to output results<\/h3>\n<p>Some stream processing problems shouldn&#8217;t be triggered immediately by an incoming event. Perhaps they should be triggered by nothing happening, such as a sensor going quiet or a user that stops clicking. Or perhaps they should be triggered after a delay, such as no update arriving within a deadline.<\/p>\n<p>Built-in Flink SQL only lets you output immediately when an event is received, or when a pre-defined window closes (e.g. 
an hourly aggregate outputs results at the end of an hour).<\/p>\n<p>Writing your own function lets you define your own notion of timing. You can delay output until a specific point in time, or trigger output only after a custom condition is met.<\/p>\n<p>Your function is able to define when it should output something, not just what it should output.<\/p>\n<p>Debouncing is an example of this kind of logic: I might want to filter out rapid bursts of events, keeping only the latest event in a burst for further processing. In this way, the decision about when to emit an output event is not based on the content of any single input event, but on whether more events arrive shortly afterwards. The output is produced only once the stream has been quiet for long enough.<\/p>\n<p>For this example, when an event is received, I start a ten-second timer. If another event with the same ID arrives before the timer expires, the timer is reset. Only once the timer expires is the event emitted.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/debounce-all.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><br \/>\n<small>all sensor readings on the left, debounced filtered view on the right<\/small><\/p>\n<p>I implemented the function to do that like this:<\/p>\n<p><script src=\"https:\/\/emgithub.com\/embed-v2.js?target=https%3A%2F%2Fgithub.com%2Fdalelane%2Fflink-udfs-blogpost%2Fblob%2Fmain%2Fsrc%2Fmain%2Fjava%2Fcom%2Fibm%2Feventautomation%2Feventprocessing%2Fudfdemos%2Fptf%2FDebounce.java%23L48-L106&#038;style=default&#038;type=code&#038;showBorder=on&#038;showLineNumbers=on&#038;fetchFromJsDelivr=on&#038;maxHeight=450\"><\/script><small><a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/src\/main\/java\/com\/ibm\/eventautomation\/eventprocessing\/udfdemos\/ptf\/Debounce.java#L48-L106\"><code>Debounce.java<\/code> on github<\/a><\/small><\/p>\n<p>I import it like 
this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nCREATE FUNCTION DEBOUNCE\n  AS 'com.ibm.eventautomation.eventprocessing.udfdemos.ptf.Debounce'\n  USING JAR '\/opt\/ibm\/sp-backend\/udfs\/udfs.jar';\n<\/pre>\n<p>And could use it like this:<\/p>\n<pre style=\"font-size: 1.1em; white-space: pre !important; overflow-x: scroll; background-color: #FFFFC0; color: #770000; padding: 1em; border: thin #222222 solid;\">\nSELECT\n    *\nFROM\n    DEBOUNCE (\n        input => TABLE sensorreadings\n        PARTITION BY sensorid,\n        on_time => DESCRIPTOR(event_time)\n    );\n<\/pre>\n<p>Now a noisy stream of events can be filtered down to remove the bursts.<\/p>\n<p>Looking at the impact of the function on a single sensor ID:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2026-03-29-flinkudfs\/debounce-one.jpg\" style=\"width: 100%; max-width: 750px; border: thin black solid;\"\/><br \/>\n<small>filtering &#8220;sensor-429&#8221;, all readings on the left, debounced view on the right<\/small><\/p>\n<p>My examples earlier in the post react directly to incoming events. Outputs were either immediate or tied to the closing of a window.<\/p>\n<p>You can extend Flink&#8217;s time-based logic beyond the windows that you have in built-in SQL (which specify how events are grouped) to custom timers that let you write logic to decide when results are emitted (as well as whether results are emitted at all).<\/p>\n<p>They make it possible to express concepts like timeouts, inactivity, debouncing, and deadlines directly in SQL. 
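<\/p>
<p>The debounce decision itself can be sketched in plain Java over a time-ordered batch of events. This is only an illustrative sketch with names of my own choosing &#8211; the real <code>Debounce<\/code> UDF makes the same decision incrementally, using process table function state and timers:<\/p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of debouncing a time-ordered batch of events: an event is kept
// only if the next event from the same sensor arrives more than
// debounceMillis later (or never arrives). The "timer" here is simulated
// by comparing event timestamps.
public class DebounceSketch {
    public record Event(int sensorId, long timestampMillis) {}

    public static List<Event> debounce(List<Event> ordered, long debounceMillis) {
        Map<Integer, Event> pending = new HashMap<>();   // latest unemitted event per sensor
        List<Event> emitted = new ArrayList<>();
        for (Event e : ordered) {
            Event prev = pending.get(e.sensorId());
            if (prev != null && e.timestampMillis() - prev.timestampMillis() > debounceMillis) {
                emitted.add(prev);   // the quiet period elapsed before this event arrived
            }
            pending.put(e.sensorId(), e);   // a new event resets the sensor's timer
        }
        emitted.addAll(pending.values());   // end of input: the remaining timers expire
        return emitted;
    }
}
```

<p>With a ten-second debounce, a burst of readings at 0s, 2s and 4s followed by a reading at 20s collapses to just the 4s and 20s readings.<\/p>
<p>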
This lets you control when to emit, not just what to emit.<\/p>\n<h2>Summary<\/h2>\n<p>I hope this post has helped show that the built-in functions in Flink SQL are a starting point, not a limitation.<\/p>\n<p>I came up with six simple examples to show that Flink UDFs:<\/p>\n<ul>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#readability\">make complex SQL readable and maintainable<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#complexity\">add algorithmic logic that SQL cannot express<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#exploding\">reshape complex events into simpler streams<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#summarising\">summarise events in domain-specific ways<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#stateful\">enrich events using custom state<\/a><\/li>\n<li><a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5920#timers\">decide when to output results<\/a><\/li>\n<\/ul>\n<p>But there is much more that is possible to do than I could cover in such a short post.<\/p>\n<p>If you\u2019d like to see some more ideas, the github repository <a href=\"https:\/\/github.com\/MartijnVisser\/flink-ptf-examples\">MartijnVisser\/flink-ptf-examples<\/a> has nice examples of process table functions that are worth a look, and cover capabilities of the APIs that I haven\u2019t described here.<\/p>\n<hr \/>\n<h3>Trying it out<\/h3>\n<p>The source code for the user defined functions I wrote for this post can be found <a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/tree\/main\/src\/main\/java\/com\/ibm\/eventautomation\/eventprocessing\/udfdemos\">in Github<\/a>, along with the <a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/tree\/main\/demos\/flink-sql\">full SQL<\/a> that I&#8217;ve linked to above.<\/p>\n<p>That repository also has <a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/demos\/README.md\">notes on how I set up 
the Kafka topics<\/a> that I used to try out the functions.<\/p>\n<p>I used <a href=\"https:\/\/www.ibm.com\/products\/event-automation\/event-processing\">Event Processing<\/a> to give me a more visual illustration of the functions in action. If you want to reuse any of that, you can use <a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/blob\/main\/demos\/add-udf-to-eventprocessing.sh\">this script<\/a> to add the UDFs to your instance of Event Processing, and then import <a href=\"https:\/\/github.com\/dalelane\/flink-udfs-blogpost\/tree\/main\/demos\/ep-flows\">the flows that I created<\/a>.<\/p>\n<p>That isn&#8217;t required &#8211; any of this can be done with standard open-source Apache Flink.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this post, I\u2019ll share examples of how writing user-defined functions (UDFs) extends what is possible using built-in Flink SQL functions alone. I&#8217;ll share examples of how UDFs can: make complex SQL readable and maintainable add algorithmic logic that SQL cannot express reshape complex events into simpler streams summarise events in domain-specific ways enrich events 
[&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[615,610],"class_list":["post-5920","post","type-post","status-publish","format-standard","hentry","category-code","tag-apacheflink","tag-flink"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5920","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5920"}],"version-history":[{"count":12,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5920\/revisions"}],"predecessor-version":[{"id":5932,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5920\/revisions\/5932"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5920"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5920"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5920"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}