Posts Tagged ‘flink’

Flink SQL aggregate functions

Monday, November 3rd, 2025

In this post, I want to share a couple of very quick and simple examples of how to use LISTAGG and ARRAY_AGG in Flink SQL.

This started as an answer I gave to a colleague asking about how to output collections of events from Flink SQL. I’ve removed the details and used this post to share a more general version of my answer, so I can point others to it in future.

Windowed aggregations

One of the great things in Flink is that it makes it easy to do time-based aggregations on a stream of events.

Using this is one of the first things that I see people try when they start playing with Flink. The third tutorial we give to new Event Processing users is to take a stream of order events and count the number of orders per hour.

In Flink SQL, that looks like:

SELECT
    COUNT (*) AS `number of orders`,
    window_start,
    window_end,
    window_time
FROM
    TABLE (
        TUMBLE (
            TABLE orders,
            DESCRIPTOR (ordertime),
            INTERVAL '1' HOUR
        )
    )
GROUP BY
    window_start,
    window_end,
    window_time

In our low-code UI, it looks like this:

However you do it, the result is a flow that emits a result at the end of every hour, with a count of how many order events were observed during the last hour.

But what if you don’t just want a count of the orders?

What if you want the collection of the actual order events emitted at the end of every hour?

To dream up a scenario using this stream of order events:

At the end of each hour, emit a list of all products ordered during the last hour, so the warehouse pickers can prepare those items for delivery.

This is where some of the other aggregate functions in Flink SQL can help.

LISTAGG

If you just want a single property (e.g. the name / description of the product that was ordered) from all of the events that you collect within each hourly window, then LISTAGG can help.

For example:

SELECT
    LISTAGG (description) AS `products to pick`,
    window_start,
    window_end,
    window_time
FROM
    TABLE (
        TUMBLE (
            TABLE orders,
            DESCRIPTOR (ordertime),
            INTERVAL '1' HOUR
        )
    )
GROUP BY
    window_start,
    window_end,
    window_time

That gives you a concatenated string, with a comma-separated list of all of the product descriptions from each of the events within each hour.

You can use a different separator, but it’s a comma by default.
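As a minimal sketch of using a different separator, assuming the same orders table as above: the separator is passed to LISTAGG as an optional second argument. The ' | ' value here is only an illustrative choice.

```sql
SELECT
    -- the optional second argument replaces the default comma separator
    LISTAGG (description, ' | ') AS `products to pick`,
    window_start,
    window_end,
    window_time
FROM
    TABLE (
        TUMBLE (
            TABLE orders,
            DESCRIPTOR (ordertime),
            INTERVAL '1' HOUR
        )
    )
GROUP BY
    window_start,
    window_end,
    window_time
```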

ARRAY_AGG

If you want to output an object, with some or even all of the properties (e.g. the name and quantity of the products that were ordered) from all of the events that you collect within each hourly window, then ARRAY_AGG can help.

For example:

SELECT
    ARRAY_AGG (
        CAST (
            ROW (description, quantity)
                AS
            ROW <description STRING, quantity INT>
        )
    ) AS `products to pick`,
    window_start,
    window_end,
    window_time
FROM
    TABLE (
        TUMBLE (
            TABLE orders,
            DESCRIPTOR (ordertime),
            INTERVAL '1' HOUR
        )
    )
GROUP BY
    window_start,
    window_end,
    window_time

The CAST isn’t necessary, but it lets you give names to the properties instead of the default names you get such as EXPR$0, so downstream processing is easier.

In each hour, an event is emitted that contains an array of objects, made up of properties from the events in that hour.

And naturally you could add an additional GROUP BY column, for example, if you wanted a separate pick list event for each region:

SELECT
    region,
    ARRAY_AGG (
        CAST (
            ROW (description, quantity)
                AS
            ROW <description STRING, quantity INT>
        )
    ) AS `products to pick`,
    window_start,
    window_end,
    window_time
FROM
    TABLE (
        TUMBLE (
            TABLE orders,
            DESCRIPTOR (ordertime),
            INTERVAL '1' HOUR
        )
    )
GROUP BY
    window_start,
    window_end,
    window_time,
    region

This outputs five events at the end of every hour, one for each of the NA, SA, EMEA, APAC, and ANZ regions, each with a list of the orders for that region.

Try it yourself

I’ve used the “Loosehanger” orders data generator in these examples, so if you want to try it for yourself, you can find it at github.com/IBM/kafka-connect-loosehangerjeans-source.

If you want to try it in Event Processing, the instructions for setting up the tutorial environment can be found at ibm.github.io/event-automation/tutorials.

Using time series models with IBM Event Automation

Tuesday, July 22nd, 2025

Intro

graphic of an e-bike hire park

Imagine you run a city e-bike hire scheme.

Let’s say that you’ve instrumented your bikes so you can track their location and battery level.

When a bike is on the move, it emits periodic updates to a Kafka topic, and you use these events for a range of maintenance, logistics, and operations reasons.

You also have other Kafka topics, such as a stream of events with weather sensor readings covering the area of your bike scheme.

Do you know how to use predictive models to forecast the likely demand for bikes in the next few hours?

Could you compare these forecasts with the actual usage that follows, and use this to identify unusual demand?

Time series models

A time series is how a machine learning engineer or data scientist would describe a dataset that consists of data values, ordered sequentially over time, and labelled with timestamps.

A time series model is a specific type of machine learning model that can analyze this type of sequential time series data. These models are used to predict future values and to identify anomalies.

For those of us used to working with Kafka topics, the machine learning definition of a “time series” sounds exactly like our definition of a Kafka topic. A Kafka topic is a sequentially ordered set of data values, each labelled with a timestamp.

(more…)

Understanding event processing behaviour with OpenTelemetry

Friday, January 31st, 2025

When using Apache Kafka, timely processing of events is an important consideration.

Understanding the throughput of your event processing solution is typically straightforward: you count how many events you can process per second.

Understanding latency (how long it takes from when an event is first emitted, to when it has finished being processed and an action has been taken in response) requires more coordination to be able to measure.

OpenTelemetry helps with this, by collecting and correlating information from the different components that are producing and consuming events.

From opentelemetry.io:

OpenTelemetry is a collection of APIs, SDKs, and tools. Use it to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) to help you analyze your software’s performance and behavior.

A distributed event processing solution

To understand what is possible, I’ll use a simple (if contrived!) example of an event processing architecture.

(more…)

Turning noise into actionable alerts using Flink

Tuesday, January 28th, 2025

In this post, I want to share two examples of how you can use pattern recognition in Apache Flink to turn a noisy stream of output into something useful and actionable.

Imagine that you have a Kafka topic with a stream of events that you sometimes need to respond to.

You can think of this conceptually as being a stream of values that could be plotted against time.

To make this less abstract, we can use the events from the sensor readings topic that the “Loosehanger” data generator produces to. Those events are a stream of temperature and humidity readings.

Imagine that these events represent something that you might need to respond to when the event for a sensor exceeds some given threshold.

You can think of it visually like this:

(more…)

API Enrichment in Event Processing

Monday, December 2nd, 2024

Creating an Apache Flink job using IBM Event Processing that identifies customer retention opportunities

This is a demo of IBM Event Processing that I gave today. The focus was meant to be how API Enrichment (enriching a stream of events with reference data from external sources) can inform the processing of your Flink job, but it ended up being a useful example of filtering and transforming, too.


narrated demo at youtu.be/mzlQZBVg6HA

(more…)

Social media updates with Kafka Connect

Tuesday, November 19th, 2024

In this post, I’ll show how to bring posts from open social media networks (Bluesky and Mastodon) into Kafka using Kafka Connect source connectors.

My goal is to be able to populate a Kafka topic with status updates posted to social media.

Rather than trying to do this with the full firehose of all status updates, I do this with status updates that match a search term or hashtag.

For example, the screenshot above is a Kafka topic with posts from Bluesky that mention the term “xbox”.

(more…)

Using IBM Event Processing with rules engines

Tuesday, November 12th, 2024

In this post, I’ll demonstrate how Event Processing can use parameters from an external source (such as a rules engine) in event processing flows.

A simple flow to demonstrate the idea

To illustrate the idea, I created a simple demo event processing flow. The flow takes a stream of order events, filters it to keep only orders for high value items, and then modifies the description property in some of the events:

The filter node is comparing the price with “40”, so only order events for items with a value above $40 are kept.

The transform node is modifying the description property of order events – any description that contains the string “Cargo Jeans” is replaced with “Combat Trousers”.
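In Flink SQL terms, a flow like this could be sketched roughly as follows. This is an illustrative sketch, not the SQL that Event Processing generates: the table name orders and the choice of output columns are assumptions, and it assumes the transform replaces each occurrence of the matched string.

```sql
SELECT
    -- transform node: replace occurrences of "Cargo Jeans" in the description
    REGEXP_REPLACE (description, 'Cargo Jeans', 'Combat Trousers') AS description,
    price
FROM
    orders
WHERE
    -- filter node: keep only order events for items priced above 40
    price > 40
```

The threshold (40) and both strings appear as literals in the query, which is what makes them awkward to change while the job is running.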

Hard-coded parameters

What if you wanted to modify the threshold for the filter, to change that $40 minimum value for an order to be considered “high value”?

Or what if you wanted to modify the transformation, so that different strings would be used in the regular expression replacement?

With the values hard-coded in the flow as shown above, you would need to:

  • create a savepoint for the job
  • stop the job
  • modify the parameters in the job
  • resume the job from the savepoint

This is a workable approach, although it does require a little downtime and some administrative effort.

The aim for this post is to highlight an alternative approach.

(more…)

Flink can recognize when you’re cheating

Friday, September 13th, 2024

aka An unnecessarily complex and silly demo of MATCH_RECOGNIZE

I play a lot of video games. That includes a lot of modern games, but I also still love going back to the retro games of my childhood. There are a lot of fun things from that era of video games that I love.

For example, cheat codes. You’d press a specific sequence of buttons on the game controller at a specific time to unlock some “secret” bit of content – like special abilities, special resources, or levels.

Some of these are so ingrained in me now that my fingers just know how to enter them without thinking. The level select cheat for Sonic the Hedgehog is the best example of this: press UP, DOWN, LEFT, RIGHT, START + A during the title screen to access a level select mode that would let you jump immediately to any part of the game.


level select cheat code for Sonic the Hedgehog

With this in the back of my head, it’s perhaps no surprise that when I needed to explain pattern recognition in Apache Flink, the metaphor I thought of first was how games of yesteryear could recognize certain button press sequences.

If you think of each button press on the game controller as an event, then recognizing a cheat code is just a pattern of events to recognize.

And once I thought of the metaphor – I had to build it. 🙂
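To give a flavour of the idea, here is a minimal sketch of a MATCH_RECOGNIZE query for the level select code. It is not the query from the demo: the button_presses table, its controller_id, button and press_time columns, and the button values are all hypothetical, and press_time is assumed to be a time attribute.

```sql
SELECT
    controller_id,
    cheat_entered
FROM
    button_presses
    MATCH_RECOGNIZE (
        -- look for the sequence separately for each controller (hypothetical column)
        PARTITION BY controller_id
        -- press_time is assumed to be a time attribute
        ORDER BY press_time
        MEASURES
            -- time of the final button press in the matched sequence
            PRESS_START_A.press_time AS cheat_entered
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        -- each press must immediately follow the previous one
        PATTERN (PRESS_UP PRESS_DOWN PRESS_LEFT PRESS_RIGHT PRESS_START_A)
        DEFINE
            PRESS_UP      AS PRESS_UP.button = 'UP',
            PRESS_DOWN    AS PRESS_DOWN.button = 'DOWN',
            PRESS_LEFT    AS PRESS_LEFT.button = 'LEFT',
            PRESS_RIGHT   AS PRESS_RIGHT.button = 'RIGHT',
            PRESS_START_A AS PRESS_START_A.button = 'START+A'
    )
```

Each button press is an event, and the PATTERN clause describes the order in which those events must arrive for the cheat code to be recognized.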

Version 1 (virtual controllers)

architecture diagram for the demo

There is more detail on how I built this in the git repository, but this is the overall idea for what I’ve made.

(more…)