Using TensorFlow to make predictions from Kafka events

This post is a simple example of how to use a machine learning model to make predictions on a stream of events on a Kafka topic.

It’s more a quick hack than a polished project, with most of this code hacked together from samples and starter code in a single evening. But it’s a fun demo, and could be a jumping-off point for starting a more serious project.

For the purposes of a demo, I wanted to make a simple example of how to implement this pattern, using:

  • sensors that are easily and readily available, and
  • predictions that are easy to understand (and easy to generate labelled training data for)

With that goal in mind, I went with:

  • for the sensors providing the source of events, I used the accelerometer and gyroscope on my iPhone
  • to set up the Kafka broker, I used the Strimzi Kafka Operator
  • for the machine learning model, I used TensorFlow to make a simple bidirectional LSTM
  • the predictions I’m making are a description of what I’m doing with the phone (e.g. is it in my hand, is it in my pocket, etc.)

I’ve got my phone publishing a live stream of raw sensor readings, and passing that stream through an ML model to give me a live stream of events like “phone has been put on a table”, “phone has been picked up and is in my hand”, or “phone has been put in a pocket while I’m sat down”, etc.

Here it is in action. It’s a bit fiddly to demo, and a little awkward to film putting something in your pocket without filming your lap, so bear with me!

The source code is all at github.com/dalelane/machine-learning-kafka-events.

And the steps to deploy all of it are:

$ git clone git@github.com:dalelane/machine-learning-kafka-events.git
$ cd machine-learning-kafka-events

$ kubectl create ns demo-kafka-ml
namespace/demo-kafka-ml created

$ kubectl apply -n demo-kafka-ml -f ./kafka-cluster
kafka.kafka.strimzi.io/dale created
kafkatopic.kafka.strimzi.io/sensor-topic created
kafkatopic.kafka.strimzi.io/activity-topic created

$ kubectl apply -n demo-kafka-ml -f ./kafka-ml/k8s
deployment.apps/kafka-ml created

$ kubectl apply -n demo-kafka-ml -f ./ios-sensor-data/k8s
deployment.apps/ios-sensor-data-bridge created
service/ios-sensor-data-bridge created
route.route.openshift.io/ios-sensor-data-bridge created

The steps involved in making this demo are:

  1. get sensor data from my iPhone
  2. collect labelled sensor readings for training an ML model
  3. set up a Kafka cluster and topics
  4. prepare to send sensor readings to Kafka
  5. train a machine learning model
  6. enable predictions on the stream of sensor events
  7. start the stream of sensor events
  8. consume the stream of predictions

Step 1 – getting sensor data from an iPhone

I cheated at this bit.

I found a great app called “Sensor Logger” in the iOS App Store that collects sensor data (accelerometer, gyroscope, magnetometer) and sends readings to a server.

Even better, the developer, Joe Crozier, provides the source code for a server to collect these, including an example of how to get the readings in CSV format.

So step 1 was just installing “Sensor Logger” from the App Store.

Step 2 – collect labelled sensor readings for training an ML model

I chose to collect examples of the following actions I can do with my phone:

  • idle
    • the phone is on a table. It could be face up or face down, but the point is that it’s not being held, touched or used
  • in hand
    • the phone is in my hand and is in use – it could be that I’m just looking at it, or I might be actively tapping on it
  • pocket moving
    • the phone is in my pocket, and I’m standing or walking
  • pocket sitting
    • the phone is in my pocket, and I’m sat down
  • running
    • the phone is in my hand and I’m running! (It doesn’t happen often, but it can happen sometimes.)

I started with Joe Crozier’s code, and tweaked it to make a Node.js script that collects 60 seconds of sensor readings and writes them to a CSV file.

Each line of the CSV file contains 6 numbers, three values from the accelerometer followed by three from the gyroscope:

accelerometer-x,accelerometer-y,accelerometer-z,gyroscope-x,gyroscope-y,gyroscope-z

Sensor readings for each type of activity are stored in separate files:
trainingdata/train-idle.csv
trainingdata/train-inhand.csv
trainingdata/train-pocketmoving.csv
trainingdata/train-pocketsitting.csv
trainingdata/train-running.csv

To set it up, I did this:

cd ios-sensor-data
npm install

Then, on my phone, I:

  • opened the Sensor Logger app
  • put the IP address of my laptop into the app
  • put the port number 3000 into the app
  • changed the poll period to “Minimum”

To collect training examples for “idle”, I had to:

  1. run node train.js idle
  2. press Connect on the app
  3. press Begin Polling on the app
  4. put the phone on the table

Sensor readings are written to trainingdata/train-idle.csv. And after 60 seconds, the script terminates and the phone app disconnects.

I repeated this a few times, putting the phone in a different position each time, sometimes with the phone face up, sometimes with the phone face down. The aim was to quickly collect a variety of examples of sensor streams that represent “idle”.

I used the same approach for each of the other actions, too.

e.g.

  1. run node train.js pocketsitting
  2. press Connect on the app
  3. press Begin Polling on the app
  4. put the phone in my pocket

The train.js script ignores the first 50 sensor readings before starting to write readings to the CSV file. This gave me time to put my phone in the right place after tapping “Begin Polling”, so the sensor readings for the motion of putting my phone in the right place (e.g. my pocket) weren’t included in the training data.
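The skip-then-record behaviour is easier to see in code. This is a minimal sketch in Python rather than the Node.js of train.js, and it is not the code from the repository: the function name collect_training_rows, its parameters, and the (timestamp, values) shape of the input are all assumptions made for illustration.

```python
import csv
import io

SKIP_READINGS = 50    # readings ignored while the phone is moved into place
RECORD_SECONDS = 60   # length of each training capture


def collect_training_rows(readings, out,
                          skip=SKIP_READINGS, duration=RECORD_SECONDS):
    """Write readings to `out` as CSV rows.

    readings - iterable of (timestamp_seconds, [ax, ay, az, gx, gy, gz])
    out      - writable text file object
    returns the number of rows written
    """
    writer = csv.writer(out)
    started_at = None
    written = 0
    for index, (timestamp, values) in enumerate(readings):
        # ignore the readings captured while the phone is being put away
        if index < skip:
            continue
        # the clock for the capture starts at the first reading we keep
        if started_at is None:
            started_at = timestamp
        # stop once a full capture period has been recorded
        if timestamp - started_at >= duration:
            break
        writer.writerow(values)
        written += 1
    return written


# simulated stream: one reading per second for 200 seconds
simulated = [(t, [0.0, 0.0, 1.0, 0.0, 0.0, 0.0]) for t in range(200)]
buffer = io.StringIO()
rows = collect_training_rows(simulated, buffer)
```

With the simulated one-reading-per-second stream, the first 50 readings are dropped and the next 60 seconds of readings are kept.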

And I collected several batches of 60-second readings for each activity, so I could train a model with a wide variety of examples: for instance, with the phone in my pocket facing up and facing down, in my left pocket and in my right pocket, etc.

You can see the training examples I collected in ios-sensor-data/trainingdata.

Step 3 – set up a Kafka cluster and topics

I used the Strimzi Kafka Operator to make this simple.

The Kafka cluster I created is:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: dale
spec:
  kafka:
    listeners:
      # internal service that the kafka-ml app will use
      plain: {}
      # external route for retrieving the predictions topic
      external:
        type: route
    config:
      log.message.format.version: '2.5'
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    replicas: 3
    storage:
      type: ephemeral
    version: 2.5.0
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

And I created the two Kafka topics I need with:

#
# topic for the raw sensor readings
#
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: sensor-topic
  labels:
    strimzi.io/cluster: dale
spec:
  config:
    # keep the last 3 hours of raw sensor readings
    retention.ms: 10800000
    segment.bytes: 1073741824
  partitions: 1
  replicas: 3
  topicName: IPHONE.SENSORS
---
#
# topic for the machine learning predictions
#
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: activity-topic
  labels:
    strimzi.io/cluster: dale
spec:
  config:
    # keep the last 30 days of activity predictions
    retention.ms: 2592000000
    segment.bytes: 1073741824
  partitions: 1
  replicas: 3
  topicName: IPHONE.ACTIVITY
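
The retention.ms values are in milliseconds, which makes them hard to read at a glance. A quick sketch for checking what period a value corresponds to (the helper name retention_ms is just for illustration):

```python
MS_PER_HOUR = 60 * 60 * 1000
MS_PER_DAY = 24 * MS_PER_HOUR


def retention_ms(hours=0, days=0):
    """Convert a retention period into the milliseconds Kafka expects."""
    return hours * MS_PER_HOUR + days * MS_PER_DAY


print(retention_ms(hours=3))   # 10800000, the value on IPHONE.SENSORS
print(retention_ms(days=30))   # 2592000000, the value on IPHONE.ACTIVITY
```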

Step 4 – prepare to send sensor readings to Kafka

I started with Joe Crozier’s code again, and this time modified it to write to Kafka.

The source code has some more comments and explanations.

It needs two environment variables: the bootstrap address for my Kafka cluster and the name of the topic where it will produce sensor readings.

I decided to run all of this in Kubernetes, so I wrote a Deployment and put the environment variables in there. To make it easy for my phone to send events to it, I also had to write a Service.

It does run just as well on a laptop if you set the environment variables first.

$ export KAFKA_BOOTSTRAP="your-kafka-cluster:9092"
$ export RAW_EVENTS_TOPIC="IPHONE.SENSORS"
$ cd ios-sensor-data/
$ node run.js
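
To give a feel for what ends up on the topic, here is a rough Python sketch of turning one sensor reading into a Kafka message. It is not the Node.js code from the repository. The value is a CSV string of the six numbers and the key carries a timestamp, as the consumer in step 6 expects, but the exact timestamp format (epoch milliseconds here), the dict shape of the reading, the default values for the environment variables, and the function name to_kafka_message are all assumptions.

```python
import os
import time

# the two settings the bridge needs; the defaults are placeholders only
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "localhost:9092")
RAW_EVENTS_TOPIC = os.environ.get("RAW_EVENTS_TOPIC", "IPHONE.SENSORS")

COLUMNS = [
    "accelerometer-x", "accelerometer-y", "accelerometer-z",
    "gyroscope-x", "gyroscope-y", "gyroscope-z",
]


def to_kafka_message(reading, timestamp_ms=None):
    """Turn one sensor reading into (key, value) bytes.

    reading      - dict with one number for each name in COLUMNS
    timestamp_ms - epoch milliseconds (assumed format), defaults to now
    """
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    value = ",".join(str(float(reading[name])) for name in COLUMNS)
    return str(timestamp_ms).encode("utf-8"), value.encode("utf-8")


example = dict(zip(COLUMNS, [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]))
key, value = to_kafka_message(example, timestamp_ms=1600000000000)
```

The resulting key and value would then be handed to whichever Kafka producer client you prefer, using KAFKA_BOOTSTRAP and RAW_EVENTS_TOPIC.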

Step 5 – train a machine learning model

I’m using the CSV files from step 2 to train a model.

The comments in kafka-ml.py explain more, but the interesting bit is:

model = Sequential([
    Bidirectional(LSTM(units, input_shape=input_shape)),

    # protect against overfitting with the relatively small
    #  training data by adding dropout
    Dropout(rate=0.5),

    # generic layer
    Dense(units, activation="relu"),

    # output layer - using the number of possible
    #  activities as the number of units
    Dense(len(labels), activation="softmax")
])
model.compile(loss="categorical_crossentropy",
              metrics=["accuracy"],
              optimizer="adam")
model.fit(
    training_x, training_y,
    epochs=25,
    # keep the stream of events in sequence
    shuffle=False,
    # don't fill the log with progress bars
    verbose=2
)

I’m using defaults and standard sample/tutorial values for almost everything here. And I didn’t do any real testing or experimentation to optimize this. It worked well enough as-is, but I’m sure there would be scope to improve it if I was to take it further than a quick hack.
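
An LSTM is trained on sequences rather than single rows, so the CSV rows have to be grouped into windows before they reach model.fit. The sketch below shows one way that could be done in plain Python. It is an assumption about the approach, not the code in kafka-ml.py: the non-overlapping windows, the tiny WINDOW_SIZE placeholder, and the make_windows helper are all hypothetical.

```python
LABELS = ["idle", "inhand", "pocketmoving", "pocketsitting", "running"]
WINDOW_SIZE = 4   # placeholder value, the real one is set in kafka-ml.py


def make_windows(readings, label, labels=LABELS, window_size=WINDOW_SIZE):
    """Split one activity's readings into non-overlapping windows.

    readings - list of rows, each a list of 6 floats, in time order
    label    - the activity that all of these readings represent
    returns (windows_x, windows_y): a list of windows, and a matching
            list of one-hot label vectors
    """
    one_hot = [1.0 if name == label else 0.0 for name in labels]
    windows_x, windows_y = [], []
    # step through the readings a full window at a time, dropping
    #  any readings left over at the end
    for start in range(0, len(readings) - window_size + 1, window_size):
        windows_x.append(readings[start:start + window_size])
        windows_y.append(list(one_hot))
    return windows_x, windows_y


# ten fake readings recorded while the phone was in my hand
fake_rows = [[float(i)] * 6 for i in range(10)]
training_x, training_y = make_windows(fake_rows, "inhand")
```

Converted with np.array, training_x would have the shape (windows, WINDOW_SIZE, 6) and training_y the shape (windows, number of labels), which is what a softmax output layer with categorical cross-entropy expects.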

Step 6 – enable predictions on the stream of sensor events

I’m using the KafkaDataset module from TensorFlow I/O. This makes it easy to feed a stream of events into the model. I’m getting them in batches, so I can use the ML model to make predictions against a series of events, rather than any one sensor reading by itself.

# take a single sensor reading (as a CSV string) and return it as
#  an array of floats
#
# value - Kafka message payload (for 1 message)
# key - Kafka message key (for 1 message)
#
# returns (value, key) where the value has been decoded
def decode_kafka_message(value, key):
    return (decode_csv(value, [[0.0] for i in range(len(columns))]), key)

# take a batch of sensor readings, and use the model to return
#  the label that it is recognized as
#
# decoded_sensor_data_batch - list (where the size is WINDOW_SIZE)
#                              of decoded messages
#
# returns string with the recognized label for the data
def get_model_inference(decoded_sensor_data_batch):
    predictions = model.predict(np.array([ decoded_sensor_data_batch ]))
    label = enc.inverse_transform(predictions)[0][0]
    return label

test_stream = KafkaDataset(topics=[RAW_EVENTS_TOPIC],
                           servers=KAFKA_BOOTSTRAP,
                           group="tensorflow-kafka",
                           # run forever, even if we stop
                           #  getting events for a while
                           eof=False,
                           # fetch message keys because they
                           #  contain timestamps
                           message_key=True,
                           config_global=[
                               "api.version.request=true"
                           ],
                           # classify new events only, ignore
                           #  historical events
                           config_topic=["auto.offset.reset=latest"])

# translate each of the string messages to an array of 6 numbers
test_stream = test_stream.map(decode_kafka_message)

# read messages into the ML model a batch at a time, using the
# same window size the ML model was trained with
test_stream = test_stream.batch(WINDOW_SIZE, drop_remainder=True)

print("Sending predictions from sensor events")
lastprediction = None

for decoded_batch, keys in test_stream:
    # get prediction
    label = get_model_inference(decoded_batch)

The rest of the source code is in kafka-ml.py.

It also needs a few environment variables: the bootstrap address for the Kafka cluster, and the names of the topics to consume from and produce to.

Again, I used a Deployment to run it in Kubernetes, but you can run it from a laptop if you set the right environment variables.

$ export KAFKA_BOOTSTRAP="your-kafka-cluster:9092"
$ export RAW_EVENTS_TOPIC="IPHONE.SENSORS"
$ export PROCESSED_EVENTS_TOPIC="IPHONE.ACTIVITY"
$ cd kafka-ml/
$ pip3 install -r requirements.txt
$ python3 kafka-ml.py
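
To make the flow in this step a little more concrete, here is a plain Python sketch of the three ideas involved: grouping a stream into fixed-size batches, picking the most likely label from the model output, and only reporting a prediction when it changes. None of this is the code from kafka-ml.py. The first function imitates what batch(WINDOW_SIZE, drop_remainder=True) does, the second is my understanding of the job enc.inverse_transform is doing, and the third is a guess at what the lastprediction variable is for. All three function names are hypothetical.

```python
def batches(stream, size):
    """Group a stream into lists of `size` items, dropping any
    incomplete batch left at the end of the stream."""
    batch = []
    for item in stream:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []


def most_likely_label(probabilities, labels):
    """Return the label with the highest softmax probability."""
    best = max(range(len(labels)), key=lambda i: probabilities[i])
    return labels[best]


def changes_only(predictions):
    """Yield a prediction only when it differs from the previous one."""
    last = None
    for label in predictions:
        if label != last:
            yield label
            last = label


grouped = list(batches(range(10), 4))
label = most_likely_label([0.1, 0.7, 0.2], ["idle", "inhand", "running"])
events = list(changes_only(["idle", "idle", "inhand", "inhand", "idle"]))
```

Reporting only the changes turns a prediction for every window into a much quieter stream of events like “phone has been picked up”.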

Step 7 – start the stream of sensor events

Back to the Sensor Logger app.

This time, I pointed it at the app I started in step 4.

Because I’m running it in Kubernetes, I entered the hostname for my route as the IP address, and 80 as the port number.

If you’re running it on your laptop, you can just use the IP address for your laptop and port number 3000 instead.

Step 8 – consume the stream of predictions

That’s it. It’s running.

The only thing left to do is start consuming events from the predictions topic and check that it’s working.

I needed the external bootstrap address for my Kafka cluster:

$ kubectl get kafka dale -o jsonpath='{.status.listeners[1].bootstrapServers}'
dale-kafka-bootstrap-demo-kafka-ml.apps.dalelane.cp.fyre.ibm.com

And the truststore for my Kafka cluster:

$ kubectl get secret dale-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
$ kubectl get secret dale-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password

I used that to populate a consumer.properties file:

security.protocol=SSL
ssl.truststore.type=PKCS12
ssl.truststore.location=ca.p12
ssl.truststore.password=WfFoGuN2bwIy
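
If you’d rather not copy the password across by hand, a small script can build the file from the ca.password file saved by the previous command. This is a sketch only; the function name and its default file names are assumptions based on the commands above.

```python
from pathlib import Path


def write_consumer_properties(password_file="ca.password",
                              truststore="ca.p12",
                              output="consumer.properties"):
    """Write a consumer.properties file for an SSL connection that
    trusts the cluster CA certificate in `truststore`."""
    password = Path(password_file).read_text().strip()
    lines = [
        "security.protocol=SSL",
        "ssl.truststore.type=PKCS12",
        f"ssl.truststore.location={truststore}",
        f"ssl.truststore.password={password}",
    ]
    Path(output).write_text("\n".join(lines) + "\n")
    return lines
```

Calling write_consumer_properties() from the directory that contains ca.p12 and ca.password should produce a file equivalent to the one shown above.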

That was enough to start receiving the prediction events:

$ kafka-console-consumer.sh \
   --bootstrap-server dale-kafka-bootstrap-demo-kafka-ml.apps.dalelane.cp.fyre.ibm.com:443 \
   --topic IPHONE.ACTIVITY \
   --consumer.config consumer.properties

Alternatively, if you use the IBM Event Streams Kafka operator, which is based on Strimzi, you can use the Event Streams admin UI to view the stream of predictions on the IPHONE.ACTIVITY topic.

Is this a good idea?

One final comment. This is obviously not an app you’d really want to run. For many reasons.

Even if you think of a non-creepy reason why it’d be useful to predict what someone is doing, based on the motion of their phone, I think you’d do it natively on the device. Intuitively, it seems like a simple task: if the phone is flat and horizontal, it’s on a table; if it’s on its top or bottom edge, it’s probably in a pocket while someone is standing; if it’s on a long edge and vertical, it’s probably in a pocket while someone is sitting; and so on. And it feels like this is confirmed by the fact that even a default ML model spec did this so easily with no tuning or tweaking, and only a tiny amount of training data. So I’m sure you could do this natively on the device with something like TensorFlow Lite. It would have fewer privacy concerns, would avoid the need to drain the phone battery by streaming a constant series of sensor readings to a server, and would probably come up with quicker results.
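
As a rough illustration of how simple that intuition is, here is a sketch of a rule-based guess that needs no model at all. It assumes the accelerometer reports gravity in the usual iOS device frame (x across the short edge of the phone, y along the long edge, z through the screen), and it only covers the three stationary cases. Telling “in hand” from “running” needs motion over time, which is where a model earns its keep. The function name and the mapping to labels are my own.

```python
def guess_activity(ax, ay, az):
    """Guess what the phone is doing from a single accelerometer
    reading, by finding the axis that gravity mostly lies along."""
    magnitudes = {"x": abs(ax), "y": abs(ay), "z": abs(az)}
    dominant = max(magnitudes, key=magnitudes.get)
    if dominant == "z":
        # lying flat, face up or face down
        return "idle"
    if dominant == "y":
        # standing on its top or bottom edge
        return "pocketmoving"
    # lying on one of its long edges
    return "pocketsitting"


flat_on_table = guess_activity(0.0, 0.0, -1.0)
upright_in_pocket = guess_activity(0.05, -0.98, 0.1)
sideways_in_pocket = guess_activity(0.97, 0.1, 0.05)
```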

So no, I don’t think anyone would (or should!) really run this app, in this form, in the real world.

But the potential of combining machine learning with streaming data from sensors is absolutely a real and useful thing, and something that customers I’m talking to are increasingly starting to explore. Simulated enterprise sensors are too dry for the purposes of a demo that can start a conversation. It’s nice to have an actual physical sensor you can pick up, and to be able to see a system respond to it in real time, so an iPhone is just a convenient stand-in for a stream of device events.
