Flink can recognize when you’re cheating

aka An unnecessarily complex and silly demo of MATCH_RECOGNIZE

I play a lot of video games. That includes a lot of modern games, but I also still love going back to the retro games of my childhood. There are a lot of fun things from that era of video games that I love.

For example, cheat codes. You’d press a specific sequence of buttons on the game controller at a specific time to unlock some “secret” bit of content – like special abilities, special resources, or levels.

Some of these are so ingrained in me now that my fingers just know how to enter them without thinking. The level select cheat for Sonic the Hedgehog is the best example of this: press UP, DOWN, LEFT, RIGHT, START + A during the title screen to access a level select mode that would let you jump immediately to any part of the game.


level select cheat code for Sonic the Hedgehog

With this in the back of my head, it’s perhaps no surprise that when I needed to explain pattern recognition in Apache Flink, the metaphor I thought of first was how games of yesteryear could recognize certain button press sequences.

If you think of each button press on the game controller as an event, then recognizing a cheat code is just a pattern of events to recognize.

And once I thought of the metaphor – I had to build it. 🙂

Version 1 (virtual controllers)

architecture diagram for the demo

There is more detail on how I built this in the git repository, but this is the overall idea for what I’ve made.

(1) – Virtual on-screen controllers emit events to a Kafka topic every time a button is pressed.

(2) – A Flink job consumes the events from the BUTTONPRESSES topic and looks for a sequence of button presses from any user that matches one of predefined patterns. When it recognizes a pattern, it emits an event to the CHEATS topic.

(3) – Each user subscribes to receive a notification when their cheat was recognized.

If you don’t recognise the “Konami” references in the topic names, then… well, you probably stopped reading several paragraphs ago, but just in case any non-gamers have stuck with me, it’s a reference to the iconic “Konami Code”.

This is what it looks like in action:


screen recording of the demo

How can Flink do this?

I’m using Flink SQL as it’s a simple way to do pattern recognition that is easy to write (and perhaps more importantly, easy to read!).

Getting the button presses

The input for the Flink job is a Kafka topic with very simple JSON messages. Each button press, from any user, is a separate message, all going onto the same topic.

The user is identified both in the record key, and a property in the payload.

screenshot of the input topic

The message payloads look like:

{
    "user": "dale",
    "buttons": "B"
}

If someone presses multiple buttons at once, the message looks like:

{
    "user": "nic",
    "buttons": "START+A"
}

These are consumed into Flink like this:

CREATE TABLE `buttonpresses`
(
    `user`        STRING,
    `buttons`     STRING,
    `event_time`  TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `event_time` AS `event_time`
)
WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'KONAMI.BUTTONPRESSES',
    ...
);

There are more properties relating to the connection config for the Kafka cluster, but that was the interesting bit.

Next, I defined the patterns that the Flink job should recognize – one for each cheat code.

Sonic the Hedgehog

First, the Sonic the Hedgehog level select cheat that inspired this ridiculous demo.

I defined the different button codes to watch out for, and then define the pattern as UP, DOWN, LEFT, RIGHT, START+A

CREATE
    TEMPORARY VIEW `sonic` AS
SELECT
    `cheat_time`,
    `user`,
    CAST('sonic-level-select' AS STRING) AS `cheat`,
    CONCAT('/konami/cheats/', `user`) AS `target`
FROM buttonpresses
    MATCH_RECOGNIZE (
        PARTITION BY `user`
        ORDER BY event_time
        MEASURES
            PRESS_STARTANDA.event_time AS cheat_time
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (
            PRESS_UP PRESS_DOWN PRESS_LEFT PRESS_RIGHT
            PRESS_STARTANDA
        )
        DEFINE
            PRESS_UP AS buttons = 'UP',
            PRESS_DOWN AS buttons = 'DOWN',
            PRESS_LEFT AS buttons = 'LEFT',
            PRESS_RIGHT AS buttons = 'RIGHT',
            PRESS_STARTANDA AS buttons = 'START+A'
    );

Mortal Kombat

Next, the cheat code that inspired so much media hysteria when I was a kid – the Mortal Kombat blood code.

I defined the different button codes to watch out for, and then define the pattern as A, B, A, C, A, B, B

Notice that I can’t use a defined event more than once in the pattern, so I worked around this by defining B press events as “PRESS_B1”, “PRESS_B2”, and “PRESS_B3”.

CREATE
    TEMPORARY VIEW `mortalkombat` AS
SELECT
    `cheat_time`,
    `user`,
    CAST('mortalkombat-blood-code' AS STRING) AS `cheat`,
    CONCAT('/konami/cheats/', `user`) AS `target`
FROM buttonpresses
    MATCH_RECOGNIZE (
        PARTITION BY `user`
        ORDER BY event_time
        MEASURES
            PRESS_B3.event_time AS cheat_time
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (
            PRESS_A1 PRESS_B1 PRESS_A2 PRESS_C PRESS_A3 PRESS_B2 PRESS_B3
        )
        DEFINE
            PRESS_A1 AS buttons = 'A',
            PRESS_A2 AS buttons = 'A',
            PRESS_A3 AS buttons = 'A',
            PRESS_B1 AS buttons = 'B',
            PRESS_B2 AS buttons = 'B',
            PRESS_B3 AS buttons = 'B',
            PRESS_C  AS buttons = 'C'
    );

Contra

Finally, the iconic Konami code used to get an extra 30 lives in Contra.

Notice that I can recognize repeated events, such as ↑ ↑, by using a quantifier PRESS_UP{2}.

Notice the way that all of these include PARTITION BY `user` so that with button press events coming from multiple controllers (“users”) at the same time, Flink will recognize a cheat code entered by a particular controller – where different users are entering different codes concurrently.

CREATE
    TEMPORARY VIEW `contra` AS
SELECT
    `cheat_time`,
    `user`,
    CAST('contra-30-lives' AS STRING) AS `cheat`,
    CONCAT('/konami/cheats/', `user`) AS `target`
FROM buttonpresses
    MATCH_RECOGNIZE (
        PARTITION BY `user`
        ORDER BY event_time
        MEASURES
            PRESS_A.event_time AS cheat_time
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (
            PRESS_UP{2}
            PRESS_DOWN{2}
            PRESS_LEFT1 PRESS_RIGHT1
            PRESS_LEFT2 PRESS_RIGHT2
            PRESS_B PRESS_A
        )
        DEFINE
            PRESS_UP AS buttons = 'UP',
            PRESS_DOWN AS buttons = 'DOWN',
            PRESS_LEFT1 AS buttons = 'LEFT',
            PRESS_LEFT2 AS buttons = 'LEFT',
            PRESS_RIGHT1 AS buttons = 'RIGHT',
            PRESS_RIGHT2 AS buttons = 'RIGHT',
            PRESS_A AS buttons = 'A',
            PRESS_B AS buttons = 'B'
    );

Emitting the results

Finally, the last thing to do is to get Flink to output events from any of the MATCH_RECOGNIZE expressions to the destination topic.

CREATE TABLE `output`
(
    `cheat_time`  TIMESTAMP(3) METADATA FROM 'timestamp',
    `user`        STRING,
    `cheat`       STRING,
    `target`      STRING
)
WITH (
    'connector' = 'kafka',
    'key.format' = 'raw',
    'key.fields' = 'target',
    'value.format' = 'json',
    'value.fields-include' = 'EXCEPT_KEY',
    'topic' = 'KONAMI.CHEATS',
    ...
);

INSERT INTO `output`
    SELECT * FROM `sonic`
        UNION ALL
    SELECT * FROM `mortalkombat`
        UNION ALL
    SELECT * FROM `contra`;

This gets the recognized cheat codes to the output Kafka topic.

screenshot of the output topic

The output messages are simple JSON payloads that look like this:

{
    "user": "dale",
    "cheat": "contra-30-lives"
}

The serious(ish) point

One of the key values that Flink brings is that it allows us to process events without needing to look at each individual event in isolation, but rather to look at events in the context of other events that happen before and after it.

Naturally, I’m not proposing sending button press events to a remote Kafka topic for analysis in Flink. I’m not saying this is a sensible way to recognize cheat codes. But it is a fun and simple way to illustrate and explain MATCH_RECOGNIZE.

In this demo, I’m looking for a certain sequence of button presses, but the general principle of wanting to recognize when a pattern of events happens has a variety of real world uses, such as recognizing customer behaviours in retail, problematic sensor readings in manufacturing, or fraudulent financial transaction events in banking.

Version 2 (real controllers)

Obviously, the next step is to do this using real game controllers 😉

Tags: , ,

Leave a Reply