{"id":4124,"date":"2020-09-06T10:43:52","date_gmt":"2020-09-06T10:43:52","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=4124"},"modified":"2020-09-07T10:02:05","modified_gmt":"2020-09-07T10:02:05","slug":"using-tensorflow-to-make-predictions-from-kafka-events","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=4124","title":{"rendered":"Using TensorFlow to make predictions from Kafka events"},"content":{"rendered":"<p><strong>This post is a simple example of how to use a machine learning model to make predictions on a stream of events on a Kafka topic.<\/strong><\/p>\n<p>It&#8217;s more a quick hack than a polished project, with most of this code hacked together from samples and starter code in a single evening. But it&#8217;s a fun demo, and could be a jumping-off point for starting a more serious project.<\/p>\n<p><img decoding=\"async\" src=\"\/\/dalelane.co.uk\/blog\/post-images\/200905-kafka-ml\/200905-concept.png\"\/><\/p>\n<p>For the purposes of a demo, I wanted to make a simple example of how to implement this pattern, using:<\/p>\n<ul>\n<li>sensors that are easily and readily available, and<\/li>\n<li>predictions that are easy to understand (and easy to generate labelled training data for)<\/li>\n<\/ul>\n<p>With that goal in mind, I went with:<br \/>\n<img decoding=\"async\" src=\"\/\/dalelane.co.uk\/blog\/post-images\/200905-kafka-ml\/200905-example.png\"\/><\/p>\n<ul>\n<li>for the sensors providing the source of events, I used the accelerometer and gyroscope on my iPhone<\/li>\n<li>to set up the Kafka broker, I used the <a href=\"https:\/\/strimzi.io\/\">Strimzi Kafka Operator<\/a><\/li>\n<li>for the machine learning model, I used <a href=\"https:\/\/www.tensorflow.org\/\">TensorFlow<\/a> to make a simple bidirectional LSTM<\/li>\n<li>the predictions I&#8217;m making are a description of what I&#8217;m doing with the phone (e.g. 
is it in my hand, is it in my pocket, etc.)<\/li>\n<\/ul>\n<p>I&#8217;ve got my phone publishing a live stream of raw sensor readings, and passing that stream through an ML model to give me a live stream of events like &#8220;phone has been put on a table&#8221;, &#8220;phone has been picked up and is in my hand&#8221;, or &#8220;phone has been put in a pocket while I&#8217;m sat down&#8221;, etc.<\/p>\n<p><img decoding=\"async\" src=\"\/\/dalelane.co.uk\/blog\/post-images\/200905-kafka-ml\/200905-demo.png\"\/><\/p>\n<p>Here it is in action. It&#8217;s a bit fiddly to demo, and a little awkward to film putting something in your pocket without filming your lap, so bear with me!<\/p>\n<p><iframe loading=\"lazy\" width=\"450\" height=\"253\" src=\"https:\/\/www.youtube.com\/embed\/U8KgcQoKri8\" frameborder=\"0\" allow=\"accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture\" allowfullscreen=\"\"><\/iframe><\/p>\n<p>The source code is all at<br \/>\n<a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\">github.com\/dalelane\/machine-learning-kafka-events<\/a>.<\/p>\n<p><!--more-->And the <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/HOW-TO-RUN.md\">steps to deploy all of it<\/a> are:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">$ git clone git@github.com:dalelane\/machine-learning-kafka-events.git\n$ cd machine-learning-kafka-events\n\n$ kubectl create ns demo-kafka-ml\nnamespace\/demo-kafka-ml created\n\n$ kubectl apply -n demo-kafka-ml -f .\/kafka-cluster\nkafka.kafka.strimzi.io\/dale created\nkafkatopic.kafka.strimzi.io\/sensor-topic created\nkafkatopic.kafka.strimzi.io\/activity-topic created\n\n$ kubectl apply -n demo-kafka-ml -f .\/kafka-ml\/k8s\ndeployment.apps\/kafka-ml created\n\n$ kubectl apply -n demo-kafka-ml -f .\/ios-sensor-data\/k8s\ndeployment.apps\/ios-sensor-data-bridge 
created\nservice\/ios-sensor-data-bridge created\nroute.route.openshift.io\/ios-sensor-data-bridge created<\/pre>\n<p>The steps involved in making this demo are:<\/p>\n<ol>\n<li><a href=\"#step1\">get sensor data from my iPhone<\/a><\/li>\n<li><a href=\"#step2\">collect labelled sensor readings for training an ML model<\/a><\/li>\n<li><a href=\"#step3\">set up a Kafka cluster and topics<\/a><\/li>\n<li><a href=\"#step4\">prepare to send sensor readings to Kafka<\/a><\/li>\n<li><a href=\"#step5\">train a machine learning model<\/a><\/li>\n<li><a href=\"#step6\">enable predictions on the stream of sensor events<\/a><\/li>\n<li><a href=\"#step7\">start the stream of sensor events<\/a><\/li>\n<li><a href=\"#step8\">consume the stream of predictions<\/a><\/li>\n<\/ol>\n<p><a name=\"step1\"><\/a><\/p>\n<h3>Step 1 &#8211; getting sensor data from an iPhone<\/h3>\n<p>I cheated at this bit.<\/p>\n<p>I found a great app called &#8220;<a href=\"https:\/\/apps.apple.com\/ca\/app\/sensor-logger-stream-sensor-data-to-your-server\/id1200504481\">Sensor Logger<\/a>&#8221; in the iOS App Store that collects sensor data (accelerometer, gyroscope, magnetometer) and sends readings to a server.<\/p>\n<p>Even better, the developer, <a href=\"https:\/\/joecrozier.ca\/\">Joe Crozier<\/a>, provides the <a href=\"https:\/\/github.com\/joenot443\/sensor-logger-server\">source code for a server to collect these<\/a>, including an example of how to get the readings in CSV format.<\/p>\n<p><img decoding=\"async\" src=\"\/\/dalelane.co.uk\/blog\/post-images\/200905-kafka-ml\/200905-app.png\"\/><\/p>\n<p>So step 1 was just installing &#8220;Sensor Logger&#8221; from the App Store.<\/p>\n<p><a name=\"step2\"><\/a><\/p>\n<h3>Step 2 &#8211; collect labelled sensor readings for training an ML model<\/h3>\n<p>I chose to collect examples of the following actions I can do with my phone:<\/p>\n<ul>\n<li>&#8220;<strong>idle<\/strong>&#8221;\n<ul>\n<li>the phone is on a table. 
It could be face up or face down, but the point is that it&#8217;s not being held, touched or used<\/li>\n<\/ul>\n<\/li>\n<li>&#8220;<strong>in hand<\/strong>&#8221;\n<ul>\n<li>the phone is in my hand and is in use &#8211; it could be that I&#8217;m just looking at it, or I might be actively tapping on it<\/li>\n<\/ul>\n<\/li>\n<li>&#8220;<strong>pocket moving<\/strong>&#8221;\n<ul>\n<li>the phone is in my pocket, and I&#8217;m standing or walking<\/li>\n<\/ul>\n<\/li>\n<li>&#8220;<strong>pocket sitting<\/strong>&#8221;\n<ul>\n<li>the phone is in my pocket, and I&#8217;m sat down<\/li>\n<\/ul>\n<\/li>\n<li>&#8220;<strong>running<\/strong>&#8221;\n<ul>\n<li>the phone is in my hand and I&#8217;m running! (<em>It doesn&#8217;t happen often, but it can happen sometimes.<\/em>)<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<p>I started with <a href=\"https:\/\/github.com\/joenot443\/sensor-logger-server\">Joe Crozier&#8217;s code<\/a>, and tweaked it to make <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/train.js\">a Node.js script<\/a> that collects 60 seconds of sensor readings and writes them to a CSV file.<\/p>\n<p>The CSV file contains 6 numbers on each line, three values from the accelerometer, and three from the gyroscope:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">accelerometer-x,accelerometer-y,accelerometer-z,gyroscope-x,gyroscope-y,gyroscope-z<\/pre>\n<p>Sensor readings for each type of activity are stored in separate files:<br \/>\n<code><a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/trainingdata\/train-idle.csv\">trainingdata\/train-idle.csv<\/a><\/code><br \/>\n<code><a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/trainingdata\/train-inhand.csv\">trainingdata\/train-inhand.csv<\/a><\/code><br \/>\n<code><a 
href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/trainingdata\/train-pocketmoving.csv\">trainingdata\/train-pocketmoving.csv<\/a><\/code><br \/>\n<code><a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/trainingdata\/train-pocketsitting.csv\">trainingdata\/train-pocketsitting.csv<\/a><\/code><br \/>\n<code><a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/trainingdata\/train-running.csv\">trainingdata\/train-running.csv<\/a><\/code><\/p>\n<p>To set it up, I did this:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">cd ios-sensor-data\nnpm install<\/pre>\n<p>Then I did this:<\/p>\n<ul>\n<li>open the Sensor Logger app on my phone<\/li>\n<li>put the IP address of my laptop into the app<\/li>\n<li>put the port number <code>3000<\/code> into the app<\/li>\n<li>changed the poll period to &#8220;Minimum&#8221;<\/li>\n<\/ul>\n<p>To collect training examples for &#8220;<strong>idle<\/strong>&#8220;, I had to:<\/p>\n<ol>\n<li>run <code>node train.js idle<\/code><\/li>\n<li>press <strong>Connect<\/strong> on the app<\/li>\n<li>press <strong>Begin Polling<\/strong> on the app<\/li>\n<li>put the phone on the table<\/li>\n<\/ol>\n<p>Sensor readings are written to <code>trainingdata\/train-idle.csv<\/code>. And after 60 seconds, the script terminates and the phone app disconnects.<\/p>\n<p>I repeated this a few times, putting the phone in a different position each time, sometimes with the phone face up, sometimes with the phone face down. 
The aim was to quickly collect a variety of examples of sensor streams that represent &#8220;idle&#8221;.<\/p>\n<p>I used the same approach for each of the other actions, too.<\/p>\n<p>e.g.<\/p>\n<ol>\n<li>run <code>node train.js pocketsitting<\/code><\/li>\n<li>press <strong>Connect<\/strong> on the app<\/li>\n<li>press <strong>Begin Polling<\/strong> on the app<\/li>\n<li>put the phone in my pocket<\/li>\n<\/ol>\n<p>The <code><a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/train.js\">train.js<\/a><\/code> script ignores the first 50 sensor readings before starting to write readings to the CSV file. This gave me time to put my phone in the right place after tapping &#8220;Begin Polling&#8221;, so the sensor readings for the motion of putting my phone in the right place (e.g. my pocket) weren&#8217;t included in the training data.<\/p>\n<p>And I collected several batches of 60-second readings for each activity, so I could train a model with a wide variety of examples. 
For example, examples of the phone in my pocket that were facing up and facing down, examples in my left pocket and my right pocket, etc.<\/p>\n<p>You can see the training examples I collected in<br \/>\n<a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/tree\/master\/ios-sensor-data\/trainingdata\">ios-sensor-data\/trainingdata<\/a>.<\/p>\n<p><a name=\"step3\"><\/a><\/p>\n<h3>Step 3 &#8211; set up a Kafka cluster and topics<\/h3>\n<p>I used the <a href=\"https:\/\/strimzi.io\/\">Strimzi Kafka Operator<\/a> to make this simple.<\/p>\n<p>The <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/kafka-cluster\/kafka.yaml\">Kafka cluster I created<\/a> is:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">apiVersion: kafka.strimzi.io\/v1beta1\nkind: Kafka\nmetadata:\n  name: dale\nspec:\n  kafka:\n    listeners:\n      # internal service that the kafka-ml app will use\n      plain: {}\n      # external route for retrieving the predictions topic\n      external:\n        type: route\n    config:\n      log.message.format.version: '2.5'\n      offsets.topic.replication.factor: 3\n      transaction.state.log.min.isr: 2\n      transaction.state.log.replication.factor: 3\n    replicas: 3\n    storage:\n      type: ephemeral\n    version: 2.5.0\n  zookeeper:\n    replicas: 3\n    storage:\n      type: ephemeral\n  entityOperator:\n    topicOperator: {}\n    userOperator: {}<\/pre>\n<p>And I created <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/kafka-cluster\/topics.yaml\">the two Kafka topics I need<\/a> with:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">#\n# topic for the raw sensor readings\n#\napiVersion: kafka.strimzi.io\/v1beta1\nkind: KafkaTopic\nmetadata:\n  name: sensor-topic\n  labels:\n    strimzi.io\/cluster: 
dale\nspec:\n  config:\n    # keep the last 3 hours of raw sensor readings\n    retention.ms: 10800000\n    segment.bytes: 1073741824\n  partitions: 1\n  replicas: 3\n  topicName: IPHONE.SENSORS\n---\n#\n# topic for the machine learning predictions\n#\napiVersion: kafka.strimzi.io\/v1beta1\nkind: KafkaTopic\nmetadata:\n  name: activity-topic\n  labels:\n    strimzi.io\/cluster: dale\nspec:\n  config:\n    # keep the last 10 days of activity predictions\n    retention.ms: 2592000000\n    segment.bytes: 1073741824\n  partitions: 1\n  replicas: 3\n  topicName: IPHONE.ACTIVITY<\/pre>\n<p><a name=\"step4\"><\/a><\/p>\n<h3>Step 4 &#8211; prepare to send sensor readings to Kafka<\/h3>\n<p>I started with <a href=\"https:\/\/github.com\/joenot443\/sensor-logger-server\">Joe Crozier&#8217;s code<\/a> again, and this time <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/run.js\">modified it to write to Kafka<\/a>.<\/p>\n<p>The <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/run.js\">source code<\/a> has some more comments and explanations.<\/p>\n<p>It needs two environment variables: the bootstrap address for my Kafka cluster and the name of the topic where it will produce sensor readings.<\/p>\n<p>I decided to <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/HOW-TO-RUN.md\">run all of this in Kubernetes<\/a>, so <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/k8s\/deployment.yaml\">I wrote a Deployment<\/a> and put the environment variables in there. 
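The bridge itself boils down to flattening each JSON sensor reading from the app into the 6-value CSV line that gets produced to the raw sensor readings topic. A rough sketch of that flattening step in Python (the real bridge is run.js in Node.js, and the payload field names below are my assumptions about the app's JSON, not its documented schema):

```python
# Illustrative sketch only: the actual bridge is run.js (Node.js), and the
# "accelerometer"/"gyroscope" field names are assumptions, not the Sensor
# Logger app's documented payload schema.
def sensor_reading_to_csv(payload):
    """Flatten one sensor reading into the 6-value CSV line that the
    bridge produces to the raw sensor readings Kafka topic."""
    fields = [
        payload["accelerometer"]["x"],
        payload["accelerometer"]["y"],
        payload["accelerometer"]["z"],
        payload["gyroscope"]["x"],
        payload["gyroscope"]["y"],
        payload["gyroscope"]["z"],
    ]
    return ",".join(str(v) for v in fields)

reading = {
    "accelerometer": {"x": 0.01, "y": -0.02, "z": -0.98},
    "gyroscope": {"x": 0.001, "y": 0.0, "z": 0.002},
}
print(sensor_reading_to_csv(reading))  # 0.01,-0.02,-0.98,0.001,0.0,0.002
```

In the real bridge, this string becomes the value of a message produced to the topic named by the RAW_EVENTS_TOPIC environment variable.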
To make it easy for my phone to send events to it, I also had to <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/k8s\/service.yaml\">write a Service<\/a>.<\/p>\n<p>It does run just as well on a laptop if you set the environment variables first.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">$ export KAFKA_BOOTSTRAP=\"your-kafka-cluster:9092\"\n$ export RAW_EVENTS_TOPIC=\"IPHONE.SENSORS\"\n$ cd sensor-logger-server\/\n$ node run.js<\/pre>\n<p><a name=\"step5\"><\/a><\/p>\n<h3>Step 5 &#8211; train a machine learning model<\/h3>\n<p>I&#8217;m using the CSV files from <a href=\"#step2\">step 2<\/a> to train a model.<\/p>\n<p>The comments in <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/kafka-ml\/kafka-ml.py\">kafka-ml.py<\/a> explain more, but the interesting bit is:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">model = Sequential([\n    Bidirectional(LSTM(units, input_shape=input_shape)),\n\n    # protect against overfitting with the relatively small\n    #  training data by adding dropout\n    Dropout(rate=0.5),\n\n    # generic layer\n    Dense(units, activation=\"relu\"),\n\n    # output layer - using the number of possible\n    #  activities as the number of units\n    Dense(len(labels), activation=\"softmax\")\n])\nmodel.compile(loss=\"categorical_crossentropy\",\n              metrics=[\"accuracy\"],\n              optimizer=\"adam\")\nmodel.fit(\n    training_x, training_y,\n    epochs=25,\n    # keep the stream of events in sequence\n    shuffle=False,\n    # don't fill the log with progress bars\n    verbose=2\n)<\/pre>\n<p>I&#8217;m using defaults and standard sample\/tutorial values for almost everything here. And I didn&#8217;t do any real testing or experimentation to optimize this. 
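For a sense of what the model actually trains on: each training CSV is sliced into fixed-length windows of consecutive readings, with every window labelled with the activity of the file it came from. A simplified sketch of that windowing (the window size and the exact slicing logic in kafka-ml.py may differ):

```python
# Simplified sketch of turning one training CSV into labelled windows.
# WINDOW_SIZE here is an assumption - the real value is defined in kafka-ml.py.
WINDOW_SIZE = 20

def make_windows(rows, label):
    """Slice consecutive sensor readings (each a list of 6 floats) into
    fixed-size windows, pairing every window with the activity label."""
    windows = []
    labels = []
    for start in range(0, len(rows) - WINDOW_SIZE + 1, WINDOW_SIZE):
        windows.append(rows[start:start + WINDOW_SIZE])
        labels.append(label)
    return windows, labels

# 50 fake readings -> 2 complete windows of 20 readings, remainder dropped
rows = [[0.0] * 6 for _ in range(50)]
x, y = make_windows(rows, "idle")
print(len(x), len(x[0]), len(x[0][0]))  # 2 20 6
print(y)  # ['idle', 'idle']
```

Windowing is what lets the LSTM see a short sequence of motion rather than a single reading, which is why the same window size is reused when classifying live events in step 6.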
It worked well enough as-is, but I&#8217;m sure there would be scope to improve it if I was to take it further than a quick hack.<\/p>\n<p><a name=\"step6\"><\/a><\/p>\n<h3>Step 6 &#8211; enable predictions on the stream of sensor events<\/h3>\n<p>I&#8217;m using the <a href=\"https:\/\/www.tensorflow.org\/io\/api_docs\/python\/tfio\/kafka\/KafkaDataset\">KafkaDataset<\/a> module from <a href=\"https:\/\/www.tensorflow.org\/io\">TensorFlow I\/O<\/a>. This makes it easy to feed a stream of events into the model. I&#8217;m getting them in batches, so I can use the ML model to make predictions against a series of events, rather than any one sensor reading by itself.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\"># take a single sensor reading (as a CSV string) and return it as\n#  an array of floats\n#\n# value - Kafka message payload (for 1 message)\n# key - Kafka message key (for 1 message)\n#\n# returns (value, key) where the value has been decoded\ndef decode_kafka_message(value, key):\n    return (decode_csv(value, [[0.0] for i in range(len(columns))]), key)\n\n# take a batch of sensor readings, and use the model to return\n#  the label that it is recognized as\n#\n# decoded_sensor_data_batch - list (where the size is WINDOW_SIZE)\n#                              of decoded messages\n#\n# returns string with the recognized label for the data\ndef get_model_inference(decoded_sensor_data_batch):\n    predictions = model.predict(np.array([ decoded_sensor_data_batch ]))\n    label = enc.inverse_transform(predictions)[0][0]\n    return label\n\ntest_stream = KafkaDataset(topics=[RAW_EVENTS_TOPIC],\n                           servers=KAFKA_BOOTSTRAP,\n                           group=\"tensorflow-kafka\",\n                           # run forever, even if we stop\n                           #  getting events for a while\n                           eof=False,\n                           # 
fetch message keys because they\n                           #  contain timestamps\n                           message_key=True,\n                           config_global=[\n                               \"api.version.request=true\"\n                           ],\n                           # classify new events only, ignore\n                           #  historical events\n                           config_topic=[\"auto.offset.reset=latest\"])\n\n# translate each of the string messages to an array of 6 numbers\ntest_stream = test_stream.map(decode_kafka_message)\n\n# read messages into the ML model a batch at a time, using the\n# same window size the ML model was trained with\ntest_stream = test_stream.batch(WINDOW_SIZE, drop_remainder=True)\n\nprint(\"Sending predictions from sensor events\")\nlastprediction = None\n\nfor decoded_batch, keys in test_stream:\n    # get prediction\n    label = get_model_inference(decoded_batch)<\/pre>\n<p>The rest of the source code is in <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/kafka-ml\/kafka-ml.py\">kafka-ml.py<\/a>.<\/p>\n<p>It also needs a few environment variables: the bootstrap address for the Kafka cluster, and the names of the topics to consume from and produce to.<\/p>\n<p>Again, I <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/kafka-ml\/k8s\/deployment.yaml\">used a Deployment<\/a> to <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/HOW-TO-RUN.md\">run it in Kubernetes<\/a>, but you can run it from a laptop if you set the right environment variables.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">$ export KAFKA_BOOTSTRAP=\"your-kafka-cluster:9092\"\n$ export RAW_EVENTS_TOPIC=\"IPHONE.SENSORS\"\n$ export PROCESSED_EVENTS_TOPIC=\"IPHONE.ACTIVITY\"\n$ cd kafka-ml\/\n$ pip3 install -r requirements.txt\n$ python3 
kafka-ml.py<\/pre>\n<p><a name=\"step7\"><\/a><\/p>\n<h3>Step 7 &#8211; start the stream of sensor events<\/h3>\n<p>Back to the <a href=\"https:\/\/apps.apple.com\/ca\/app\/sensor-logger-stream-sensor-data-to-your-server\/id1200504481\">Sensor Logger<\/a> app.<\/p>\n<p>This time, I pointed it at the app I started in <a href=\"#step4\">step 4<\/a>.<\/p>\n<p>Because I&#8217;m running it in Kubernetes, I entered the hostname for <a href=\"https:\/\/github.com\/dalelane\/machine-learning-kafka-events\/blob\/master\/ios-sensor-data\/k8s\/service.yaml#L14\">my route<\/a> as the IP address, and <code>80<\/code> as the port number.<\/p>\n<p>If you&#8217;re running it on your laptop, you can just use the IP address for your laptop and port number <code>3000<\/code> instead.<\/p>\n<p><a name=\"step8\"><\/a><\/p>\n<h3>Step 8 &#8211; consume the stream of predictions<\/h3>\n<p>That&#8217;s it. It&#8217;s running.<\/p>\n<p><img decoding=\"async\" src=\"\/\/dalelane.co.uk\/blog\/post-images\/200905-kafka-ml\/200905-demo.png\"\/><\/p>\n<p>The only thing left to do is start consuming events from the predictions topic and check that it&#8217;s working.<\/p>\n<p>I needed the external bootstrap address for my Kafka cluster:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">$ kubectl get kafka dale -o jsonpath='{.status.listeners[1].bootstrapServers}'\ndale-kafka-bootstrap-demo-kafka-ml.apps.dalelane.cp.fyre.ibm.com<\/pre>\n<p>And the truststore for my Kafka cluster:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">$ kubectl get secret dale-cluster-ca-cert -o jsonpath='{.data.ca\\.p12}' | base64 -d &gt; ca.p12\n$ kubectl get secret dale-cluster-ca-cert -o jsonpath='{.data.ca\\.password}' | base64 -d &gt; ca.password<\/pre>\n<p>I used that to populate a <code>consumer.properties<\/code> file:<\/p>\n<pre style=\"border: thin 
solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">security.protocol=SSL\nssl.truststore.type=PKCS12\nssl.truststore.location=ca.p12\nssl.truststore.password=WfFoGuN2bwIy<\/pre>\n<p>That was enough to start receiving the prediction events:<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.7em; overflow-x: scroll; font-size: 1.1em;\">$ kafka-console-consumer.sh \\\n   --bootstrap-server dale-kafka-bootstrap-demo-kafka-ml.apps.dalelane.cp.fyre.ibm.com:443 \\\n   --topic IPHONE.ACTIVITY \\\n   --consumer.config consumer.properties<\/pre>\n<p>Alternatively, if you use the <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=4066\">IBM Event Streams<\/a> Kafka operator, which is based on Strimzi, you can use the Event Streams admin UI to view the stream of predictions on the IPHONE.ACTIVITY topic.<\/p>\n<p><img decoding=\"async\" src=\"\/\/dalelane.co.uk\/blog\/post-images\/200905-kafka-ml\/200907-eventstreams.png\" style=\"border: thin black solid\"\/><\/p>\n<h2>Is this a good idea?<\/h2>\n<p>One final comment. This is obviously not an app you&#8217;d really want to run. For many reasons.<\/p>\n<p>Even if you think of a non-creepy reason why it&#8217;d be useful to predict what someone is doing, based on the motion of their phone, I think you&#8217;d do it natively on the device. Intuitively, it seems like a simple task &#8211; if the phone is flat and horizontal, it&#8217;s on a table. If it&#8217;s on its top or bottom edge, it&#8217;s probably in a pocket while someone is standing; if it&#8217;s on a long edge and vertical, it&#8217;s probably in a pocket while someone is sitting, etc. And it feels like this is confirmed by the fact that even a default ML model spec did this so easily with no tuning or tweaking, and only a tiny amount of training data. So I&#8217;m sure you could do this natively on the device with something like TensorFlow Lite. 
It would have fewer privacy concerns, would avoid the need to drain the phone battery by streaming a constant series of sensor readings to a server, and would probably come up with quicker results.<\/p>\n<p>So no, I don&#8217;t think anyone would (or should!) really run this app, in this form, in the real world.<\/p>\n<p><strong>But<\/strong> the potential of combining machine learning with streaming data from sensors is absolutely a real and useful thing, and something that customers I&#8217;m talking to are increasingly starting to explore. Simulated enterprise sensors are too dry for the purposes of a demo that can start a conversation. It&#8217;s nice to have an actual physical sensor you can pick up, and to be able to see a system respond to it in real-time, so an iPhone is just a convenient stand-in for a stream of device events.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>This post is a simple example of how to use a machine learning model to make predictions on a stream of events on a Kafka topic. It&#8217;s more a quick hack than a polished project, with most of this code hacked together from samples and starter code in a single evening. 
But it&#8217;s a fun [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":4127,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,584,580,601,600],"class_list":["post-4124","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-kafka","tag-machine-learning","tag-strimzi","tag-tensorflow"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/4124","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=4124"}],"version-history":[{"count":0,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/4124\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/4127"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=4124"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=4124"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=4124"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}