{"id":3924,"date":"2019-10-31T02:58:01","date_gmt":"2019-10-31T02:58:01","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=3924"},"modified":"2019-10-31T09:38:58","modified_gmt":"2019-10-31T09:38:58","slug":"using-tensorflow-with-ibm-event-streams-kafka-machine-learning-awesome","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=3924","title":{"rendered":"Using TensorFlow with IBM Event Streams <br>(Kafka + Machine Learning = Awesome)"},"content":{"rendered":"<p><strong>In this post, I want to explain how to get started creating machine learning applications using the data you have on Kafka topics.<\/strong><\/p>\n<p><img decoding=\"async\" src=\"\/\/dalelane.co.uk\/blog\/wp-content\/uploads\/2019\/10\/191030-kafka-ml-3.png\" style=\"border: thin black solid\"\/><\/p>\n<p>I&#8217;ve written <a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\">a sample app<\/a>, with examples of how you can use Kafka topics as:<\/p>\n<ul>\n<li> a source of training data for creating machine learning models<\/li>\n<li> a source of test data for evaluating machine learning models<\/li>\n<li> an ongoing stream of events to make predictions about using machine learning models<\/li>\n<\/ul>\n<p>I&#8217;ll use this post to explain how it works, and how you can use it as the basis of writing your first ML pipeline using the data on your own Kafka topics.<\/p>\n<p><!--more--><\/p>\n<h2>Overview<\/h2>\n<p>There are a variety of ways to do this, but for this post I&#8217;ll be starting with perhaps the simplest, aiming for something a bit like this:<\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48982711886\/in\/datetaken-public\/\" title=\"191030-kafka-ml-1\"><img loading=\"lazy\" decoding=\"async\" style=\"border: thin black solid\" src=\"https:\/\/live.staticflickr.com\/65535\/48982711886_d8b2a807da.jpg\" width=\"450\" height=\"158\" alt=\"191030-kafka-ml-1\"\/><\/a><br \/>\n<script async=\"\" 
src=\"\/\/embedr.flickr.com\/assets\/client-code.js\" charset=\"utf-8\"><\/script><\/p>\n<p><strong>(a) Get a source of events that you want to use as training data for your machine learning model.<\/strong><\/p>\n<p>I won&#8217;t be discussing this step for this post, but I&#8217;m assuming that you&#8217;ll use something like Kafka Connect or some custom client to get some interesting data into a Kafka topic.<\/p>\n<p><strong>(b) Prepare the source of events for use in training your model<\/strong><\/p>\n<p>The specifics will depend on your domain, use-case and data, but I&#8217;m assuming that you&#8217;ll use something like Kafka Streams to cleanse, filter, or otherwise pre-process your training data.<\/p>\n<p><strong>(c) Train a machine learning model using the events on your training Kafka topic<\/strong><\/p>\n<p>For this post, I&#8217;m using TensorFlow with Keras, using some of the new APIs from <a href=\"https:\/\/github.com\/tensorflow\/io\/\">TensorFlow I\/O<\/a> for the integration with Kafka.<\/p>\n<p><strong>(d) Try out your machine learning model to check it&#8217;s working<\/strong><\/p>\n<p><strong>(e) Evaluate your machine learning model, using a second topic as a source of test data<\/strong><\/p>\n<p>Now you have a machine learning model whose performance you&#8217;re happy with, and you&#8217;re ready to start using it.<\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48982901922\/in\/datetaken-public\/\" title=\"191030-kafka-ml-2\"><img loading=\"lazy\" decoding=\"async\" style=\"border: thin black solid\" src=\"https:\/\/live.staticflickr.com\/65535\/48982901922_5da33554fa.jpg\" width=\"450\" height=\"203\" alt=\"191030-kafka-ml-2\"\/><\/a><\/p>\n<p><strong>(f) Get a stream of events that you want to use your ML model to make predictions about <\/strong><\/p>\n<p>Again, the specifics will depend on your use case, but it may well be a combination of Kafka Connect and Kafka Streams components as 
with the training data.<\/p>\n<p><strong>(g) Use your ML model to make predictions against a stream of events<\/strong><\/p>\n<p>Unlike the test topic in <strong>(e)<\/strong>, this isn&#8217;t a one-off batch test against a topic that already has the data ready on it.<\/p>\n<p>This is a live stream of events, with your ML model used to make a prediction based on each event as it arrives.<\/p>\n<p><strong>(h) Do something!<\/strong><\/p>\n<p>Take an action, send a notification&#8230; do something with the prediction your ML model made!<\/p>\n<h2>A demo<\/h2>\n<p>To show how to do this for real, I&#8217;ll be using a simplified demo Python application written using <a href=\"https:\/\/www.tensorflow.org\/\">TensorFlow<\/a> with <a href=\"https:\/\/www.ibm.com\/cloud\/event-streams\">IBM Event Streams<\/a>.<\/p>\n<p>I&#8217;ve put the complete working demo on GitHub as <a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\">event-streams-tensorflow<\/a>.<\/p>\n<p>(You just need to edit the <a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/config.env\"><code>config.env<\/code><\/a> file with the location of your own cluster, then it should be ready to run. 
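<\/p>\n<p>The Python snippets later in this post read their connection details from that file, so it holds something along these lines. (These are illustrative placeholder values, and the variable names are assumptions based on the names used in the Python snippets below &#8211; check the copy of the file in the repo.)<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\"># config.env - replace these with the details of your own cluster\nKAFKA_BOOTSTRAP=your-bootstrap-address:443\nEVENT_STREAMS_API_KEY=your-api-key\nCERT=\/path\/to\/your\/cluster\/ca.pem<\/pre>\n<p>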
To reset everything to start from the beginning again, just run <a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/run-cleanup-99.sh\"><code>run-cleanup-99.sh<\/code><\/a>).<\/p>\n<p>In this post, I&#8217;ll explain what it all does.<\/p>\n<p>I&#8217;ll copy some of the more interesting snippets below, with links to where you can find them in the context of the full working app.<\/p>\n<h3>Step 0<br \/>\nPreparing Kafka topics with data for an ML project<\/h3>\n<p><strong><em>To see this step running, run the <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/run-step-0.sh\">run-step-0.sh<\/a><\/code> script.<\/em><\/strong><br \/>\n<strong><em>For an example of the output, see <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/sample-output\/step-0.txt\">sample-output\/step-0.txt<\/a><\/code><\/em><\/strong><\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48982712016\/in\/datetaken-public\/\" title=\"191030-kafka-ml-3-0\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/live.staticflickr.com\/65535\/48982712016_5a80e97b60.jpg\" width=\"450\" height=\"248\" style=\"border: thin black solid\" alt=\"191030-kafka-ml-3-0\"\/><\/a><\/p>\n<p>I&#8217;m calling this &#8220;Step 0&#8221; because the ideal scenario here is that you already have a Kafka topic with a useful stream of events, and you&#8217;re thinking about how to get further value from it through machine learning. 
This would mean you don&#8217;t have anything to do for this step, but for the purposes of this post, I need a topic with some data on it that I can use as the basis of a demo.<\/p>\n<p>I&#8217;m using the <a href=\"https:\/\/research.zalando.com\/welcome\/mission\/research-projects\/fashion-mnist\/\">Fashion MNIST dataset<\/a> from Zalando.<\/p>\n<p>If you&#8217;re not familiar with it, it&#8217;s a set of 70,000 images &#8211; each one a photo of a fashion item (e.g. a t-shirt, or coat, or boot). The images are all already pre-processed, available as 28&#215;28 pixel grayscale images and labelled as one of ten types of fashion item.<\/p>\n<p><img decoding=\"async\" src=\"\/\/dalelane.co.uk\/blog\/post-images\/191030-kafka-mnist.png\" style=\"border: thin black solid\"\/><\/p>\n<p>I&#8217;ve chosen this because it&#8217;s a very commonly-used dataset for machine learning projects, with hundreds of tutorials based on it, and articles with a decent set of hyperparameters available for use as a starting point.<\/p>\n<p>By taking the neural net specification for tackling the fashion MNIST dataset as a given, I can concentrate on the Kafka-specific angle for this post, rather than trying to cover too many things in one go.<\/p>\n<p>This means that, for this post, step 0 is going to mean getting the training and test data from the fashion MNIST dataset, and loading them onto two different Kafka topics.<\/p>\n<p>With such a well-known dataset, it perhaps wasn&#8217;t essential for me to use a schema for the data, but it&#8217;s a good idea to base your ML pipeline on well-known predictable data formats, so I&#8217;ll start by <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=3781\">defining my Avro schema<\/a>.<\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48988741036\/in\/datetaken-public\/\" title=\"191030-kafka-es-2\"><img loading=\"lazy\" decoding=\"async\" style=\"border: thin black solid\" 
src=\"https:\/\/live.staticflickr.com\/65535\/48988741036_934891a17f.jpg\" width=\"450\" height=\"249\" alt=\"191030-kafka-es-2\"\/><\/a><\/p>\n<p>You can find the schema that I&#8217;ve written in <a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/fashion.avsc\">fashion.avsc<\/a>. It has two fields: a bytes field with the image data, and an int field with the code for the type of fashion item the picture is of.<\/p>\n<p>The script is using the CLI to store the schema in the Event Streams registry.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">cloudctl es schema-add --file fashion.avsc --name FashionMnist --version 1.0.0<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/run-step-0.sh#L16-L17\">from <code>run-step-0.sh<\/code><\/a><\/small><\/p>\n<p>For the purposes of this post, I&#8217;m just creating single partition topics, one for the training data, one for the test data.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">cloudctl es topic-create FASHION.MNIST.TRAINING --partitions 1 --replication-factor 3\ncloudctl es topic-create FASHION.MNIST.TEST.ARCHIVE --partitions 1 --replication-factor 3<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/run-step-0.sh#L20-L21\">from <code>run-step-0.sh<\/code><\/a><\/small><\/p>\n<p>With those defined, I&#8217;m ready to fetch the dataset and get it into my Kafka topics.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">print (\"--&gt;&gt;&gt; Create a Kafka producer\")\nproducer = KafkaProducer(bootstrap_servers=[KAFKA_BOOTSTRAP],\n                         
security_protocol=\"SASL_SSL\",\n                         ssl_cafile=CERT,\n                         ssl_check_hostname=False,\n                         sasl_plain_username=\"token\",\n                         sasl_plain_password=EVENT_STREAMS_API_KEY,\n                         sasl_mechanism=\"PLAIN\",\n                         client_id=\"dalelane-load-topics\")\n\nprint (\"--&gt;&gt;&gt; Download the training data to be used\")\ntrain, test = tfds.load(\"fashion_mnist\", split=[\"train\", \"test\"])\n\nprint (\"--&gt;&gt;&gt; Sending training data to Kafka topic %s\" % TRAINING_TOPIC_NAME)\nfor features in train.batch(1).take(-1):\n    producer.send(TRAINING_TOPIC_NAME,\n                  serialize(schema, {\n                      \"ImageData\" : features[\"image\"].numpy()[0].tobytes(),\n                      \"Label\" : int(features[\"label\"].numpy()[0])\n                  }),\n                  None,\n                  create_headers_for_schema(SCHEMA_ID, SCHEMA_VERSION))<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/0_load_topics.py#L16-L37\">from <code>0_load_topics.py<\/code><\/a><\/small><\/p>\n<p>With this done, I now have Kafka topics ready to be a data source for my machine learning pipeline.<\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48988072138\/in\/datetaken-public\/\" title=\"191030-kafka-es-1\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/live.staticflickr.com\/65535\/48988072138_9be711c185.jpg\" width=\"450\" height=\"254\" alt=\"191030-kafka-es-1\" style=\"border: thin black solid\"\/><\/a><\/p>\n<p>It&#8217;s worth pointing out that this was trivially simple for this project, because I&#8217;m using a clean well-prepared dataset.<\/p>\n<p>For you, this stage of the pipeline is almost certainly more complicated, which is why I broke it out into two separate steps (getting the data 
into Kafka, and pre-processing the data) in my first diagram above.<\/p>\n<h3>Step 1<br \/>\nUsing a Kafka topic as a source of training data for an ML project<\/h3>\n<p><strong><em>To see this step running, run the <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/run-step-1.sh\">run-step-1.sh<\/a><\/code> script.<\/em><\/strong><br \/>\n<strong><em>For an example of the output, see <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/sample-output\/step-1.txt\">sample-output\/step-1.txt<\/a><\/code><\/em><\/strong><\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48982151903\/in\/datetaken-public\/\" title=\"191030-kafka-ml-3-1\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/live.staticflickr.com\/65535\/48982151903_c519378fdc.jpg\" width=\"450\" height=\"244\" style=\"border: thin black solid\" alt=\"191030-kafka-ml-3-1\"\/><\/a><\/p>\n<p>I&#8217;ve got the training data ready to use on my <code>FASHION.MNIST.TRAINING<\/code> topic, serialized using an Avro schema in my registry.<\/p>\n<p>Now I&#8217;m ready to use it to train a machine learning model.<\/p>\n<p>First, I define a TensorFlow dataset based on the Kafka topic.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">dataset = kafka_io.KafkaDataset([ TRAINING_TOPIC_NAME + ':0' ],\n                                servers=KAFKA_BOOTSTRAP,\n                                group=\"dalelane-tensorflow-train\",\n                                config_global=[\n                                    \"api.version.request=true\",\n                                    \"sasl.mechanisms=PLAIN\",\n                                    \"security.protocol=sasl_ssl\",\n                                    \"sasl.username=token\",\n                                    \"sasl.password=\" + EVENT_STREAMS_API_KEY,\n                     
               \"ssl.ca.location=\" + CERT\n                                ])<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/train_model.py#L37\">from <code>train_model.py<\/code><\/a><\/small><\/p>\n<p>Then, I deserialize it using the Avro schema.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">def deserialize(kafkamessage):\n    trainingitem = kafka_io.decode_avro(kafkamessage,\n                                        schema=schemastr,\n                                        dtype=[tf.string, tf.int32])\n\n    image = tf.io.decode_raw(trainingitem[0], out_type=tf.uint8)\n    image = tf.reshape(image, [28, 28])\n    image = tf.image.convert_image_dtype(image, tf.float32)\n\n    label = tf.reshape(trainingitem[1], [])\n\n    return image, label\n\n\ndataset = dataset.map(deserialize).batch(1)<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/train_model.py#L21-L32\">from <code>train_model.py<\/code><\/a><\/small><\/p>\n<p>With the dataset ready, I can define the neural net.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">model = keras.Sequential([\n    keras.layers.Flatten(input_shape=(28, 28)),\n    keras.layers.Dense(128, activation=\"relu\"),\n    keras.layers.Dense(10, activation=\"softmax\")\n])\n\nmodel.compile(optimizer=\"adam\",\n            loss=\"sparse_categorical_crossentropy\",\n            metrics=[\"accuracy\"])\n\nmodel.fit(dataset, epochs=4, steps_per_epoch=1000)<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/train_model.py#L51-L64\">from 
<code>train_model.py<\/code><\/a><\/small><\/p>\n<p>I&#8217;ve copied this neural net definition from the <a href=\"https:\/\/www.tensorflow.org\/tutorials\/keras\/classification\">TensorFlow Keras classification tutorial<\/a> (as it&#8217;s also based on the same Fashion MNIST dataset).<\/p>\n<p>Now I have a working machine learning model.<\/p>\n<h3>Step 2<br \/>\nTry out the machine learning model<\/h3>\n<p><strong><em>To see this step running, run the <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/run-step-2.sh\">run-step-2.sh<\/a><\/code> script.<\/em><\/strong><br \/>\n<strong><em>For an example of the output, see <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/sample-output\/step-2.txt\">sample-output\/step-2.txt<\/a><\/code><\/em><\/strong><\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48982151938\/in\/datetaken-public\/\" title=\"191030-kafka-ml-3-2\"><img loading=\"lazy\" decoding=\"async\" style=\"border: thin black solid\" src=\"https:\/\/live.staticflickr.com\/65535\/48982151938_6e1fbab90e.jpg\" width=\"450\" height=\"243\" alt=\"191030-kafka-ml-3-2\"\/><\/a><\/p>\n<p>Before continuing, let&#8217;s give the ML model a quick try.<\/p>\n<p>I&#8217;m picking one of the test images from the Fashion MNIST dataset at random, and using the machine learning model to make a prediction for what is in the image.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">print (\"--&gt;&gt;&gt; Get information about the Fashion MNIST dataset\")\ndatasetinfo = tfds.builder(\"fashion_mnist\").info.features[\"label\"]\n\nprint (\"--&gt;&gt;&gt; Choose a random item from the test set\")\ntest, = tfds.load(\"fashion_mnist\", split=[\"test\"])\nfor features in test.shuffle(10000).batch(1).take(1):\n    testdata = features[\"image\"].numpy()[0]\n\n    print (\"--&gt;&gt;&gt; Use the 
ML model to make a prediction for the test item\")\n    prediction = model.predict(testdata.reshape(1, 28, 28))\n    prediction_idx = prediction.argmax()\n    prediction_name = datasetinfo.int2str(prediction_idx)\n    prediction_confidence = prediction[0][prediction_idx] * 100.0\n\n    print (\"--&gt;&gt;&gt; Prediction : %s with %d%% confidence\" %\n        (prediction_name, prediction_confidence))\n\n    print (\"--&gt;&gt;&gt; Saving random test item to test.png for verification\")\n    plt.xticks([])\n    plt.yticks([])\n    plt.grid(False)\n    testimage = testdata.reshape([-1, 28, 28, 1]) \/ 255\n    plt.imshow(1 - testimage[0][:, :, 0], cmap='gray')\n    plt.savefig(\"test.png\")<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/2_train_and_try_model.py#L12-L35\">from <code>2_train_and_try_model.py<\/code><\/a><\/small><\/p>\n<p>To check if the ML model is correct, I&#8217;m saving the test image to a file so you can open it and see what you think it is.<\/p>\n<h3>Step 3<br \/>\nUsing a Kafka topic as a source of test data for an ML project<\/h3>\n<p><strong><em>To see this step running, run the <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/run-step-3.sh\">run-step-3.sh<\/a><\/code> script.<\/em><\/strong><br \/>\n<strong><em>For an example of the output, see <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/sample-output\/step-3.txt\">sample-output\/step-3.txt<\/a><\/code><\/em><\/strong><\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48982902077\/in\/datetaken-public\/\" title=\"191030-kafka-ml-3-3\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/live.staticflickr.com\/65535\/48982902077_84ba96ce08.jpg\" width=\"450\" style=\"border: thin black solid\" height=\"245\" alt=\"191030-kafka-ml-3-3\"\/><\/a><\/p>\n<p>Individual 
predictions aren&#8217;t good enough for testing a model, so the next step is to run the complete test set through the ML model and measure the accuracy of the predictions.<\/p>\n<p>In <strong>Step 0<\/strong>, I filled the <code>FASHION.MNIST.TEST.ARCHIVE<\/code> topic with the <a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/0_load_topics.py#L40-L49\">test data from the Fashion MNIST dataset<\/a>.<\/p>\n<p>In <strong>Step 1<\/strong>, I <a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/train_model.py#L64\">trained a machine learning model<\/a>.<\/p>\n<p>Now I&#8217;m ready to evaluate the ML model using the data from the test topic.<\/p>\n<p>Notice that the dataset is defined using <code>eof=True<\/code> which means it will read until the end of the topic and then close the consumer. This is useful in situations like this, where the test data is already on the topic, and you want to read all of it into a dataset to use.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">print (\"--&gt;&gt;&gt; Prepare a test dataset using test data from Kafka\")\ndataset = kafka_io.KafkaDataset([ TEST_ARCHIVE_TOPIC_NAME + \":0\"],\n                                servers=KAFKA_BOOTSTRAP,\n                                group=\"dalelane-tensorflow-test\",\n                                eof=True,\n                                config_global=[\n                                    \"api.version.request=true\",\n                                    \"sasl.mechanisms=PLAIN\",\n                                    \"security.protocol=sasl_ssl\",\n                                    \"sasl.username=token\",\n                                    \"sasl.password=\" + EVENT_STREAMS_API_KEY,\n                                    \"ssl.ca.location=\" + CERT\n                               
 ])\ndataset = dataset.map(train_model.deserialize).batch(1)\n\nprint (\"--&gt;&gt;&gt; Evaluate the machine learning model using test data from Kafka\")\nmodel.evaluate(dataset)<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/3_train_and_test_model.py#L14-L30\">from <code>3_train_and_test_model.py<\/code><\/a><\/small><\/p>\n<p>One benefit of using Kafka topics as data sources for projects like this is that you can tweak the definition of your neural net, and replay the training and test data.<\/p>\n<p>And you can go round that loop of tweak, re-train, and re-test repeatedly until you&#8217;re happy with the test accuracy.<\/p>\n<h3>Step 4<br \/>\nUsing an ML model to make predictions about a stream of events on a Kafka topic<\/h3>\n<p><strong><em>To see this step running, run the <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/run-step-4.sh\">run-step-4.sh<\/a><\/code> script.<\/em><\/strong><br \/>\n<strong><em>For an example of the output, see <code><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/master\/sample-output\/step-4.txt\">sample-output\/step-4.txt<\/a><\/code><\/em><\/strong><\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48982152058\/in\/datetaken-public\/\" title=\"191030-kafka-ml-3-4\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/live.staticflickr.com\/65535\/48982152058_40d5c00926.jpg\" width=\"450\" height=\"244\" alt=\"191030-kafka-ml-3-4\"\/><\/a><\/p>\n<p>At this point, I have a working neural network, trained using the data from my training topic and verified using the data from my test topic.<\/p>\n<p>Now I&#8217;m ready to start using my machine learning model with a live stream of events on a Kafka topic.<\/p>\n<p>I don&#8217;t actually have a Kafka topic with a stream of photo events with fashion items, so for the 
purposes of this demo I&#8217;ll simulate it, again using the test data from the fashion MNIST dataset.<\/p>\n<p>First, I create a new empty topic.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">cloudctl es topic-create FASHION.MNIST.TEST.STREAM --partitions 1 --replication-factor 1<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/run-step-4.sh#L20\">from <code>run-step-4.sh<\/code><\/a><\/small><\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48988827402\/in\/datetaken-public\/\" title=\"191030-kafka-es-0\"><img loading=\"lazy\" decoding=\"async\" style=\"border: thin black solid\" src=\"https:\/\/live.staticflickr.com\/65535\/48988827402_86022d7350.jpg\" width=\"450\" height=\"197\" alt=\"191030-kafka-es-0\"\/><\/a><\/p>\n<p>Creating my live stream of events to be classified is similar to how I populated the <code>FASHION.MNIST.TEST.ARCHIVE<\/code> topic before, but I&#8217;m adding a <code>sleep<\/code> call in there so I only produce one image a second.<\/p>\n<p>As there are 10,000 test images in the dataset, this means I&#8217;ll have a stream of events for over two-and-a-half hours, which is easily enough for a quick demo!<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">print (\"--&gt;&gt;&gt; Create a Kafka producer\")\nproducer = KafkaProducer(bootstrap_servers=[KAFKA_BOOTSTRAP],\n                         security_protocol=\"SASL_SSL\",\n                         ssl_cafile=CERT,\n                         ssl_check_hostname=False,\n                         sasl_plain_username=\"token\",\n                         sasl_plain_password=EVENT_STREAMS_API_KEY,\n                         sasl_mechanism=\"PLAIN\",\n                         
client_id=\"dalelane-load-topics\")\n\nprint (\"--&gt;&gt;&gt; Download the test data to be used\")\ntest, = tfds.load(\"fashion_mnist\", split=[\"test\"])\n\nprint (\"--&gt;&gt;&gt; Starting to create test events to %s every second\" % TEST_STREAM_TOPIC_NAME)\nfor features in test.batch(1).take(-1):\n    producer.send(TEST_STREAM_TOPIC_NAME,\n                  serialize(schema, {\n                      \"ImageData\" : features[\"image\"].numpy()[0].tobytes(),\n                      \"Label\" : int(features[\"label\"].numpy()[0])\n                  }),\n                  None,\n                  create_headers_for_schema(SCHEMA_ID, SCHEMA_VERSION))\n    sleep(1)\nproducer.flush()<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/simulate_test_stream.py#L17-L40\">from <code>simulate_test_stream.py<\/code><\/a><\/small><\/p>\n<p>While this is running, the next step is to create a streaming dataset based on that topic.<\/p>\n<p>Notice the <code>eof=False<\/code>, so (unlike before) the consumer won&#8217;t be closed when it reaches the end of the topic. 
This dataset will stay open until you kill it with Ctrl-C, as the objective is to keep a long-running streaming dataset.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">dataset = kafka_io.KafkaDataset([ TEST_STREAM_TOPIC_NAME + \":0\"],\n                                servers=KAFKA_BOOTSTRAP,\n                                group=\"dalelane-tensorflow-test\",\n                                eof=False,\n                                config_global=[\n                                    \"api.version.request=true\",\n                                    \"sasl.mechanisms=PLAIN\",\n                                    \"security.protocol=sasl_ssl\",\n                                    \"sasl.username=token\",\n                                    \"sasl.password=\" + EVENT_STREAMS_API_KEY,\n                                    \"ssl.ca.location=\" + CERT\n                                ])\ndataset = dataset.map(train_model.deserialize).batch(1)<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/4_train_and_stream_model.py#L15-L27\">from <code>4_train_and_stream_model.py<\/code><\/a><\/small><\/p>\n<p>And finally, I use the machine learning model to classify the image received in each message on the Kafka topic.<\/p>\n<pre style=\"border: thin solid silver; background-color: #eeeeee; padding: 0.8em; color: black; overflow-x: scroll;\">for image, label in dataset:\n    prediction = model.predict(image)<\/pre>\n<p align=\"right\"><small><a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/blob\/d10bf4d88e3ec0495bf20c619481ebcc9d1a549b\/4_train_and_stream_model.py#L30-L31\">from <code>4_train_and_stream_model.py<\/code><\/a><\/small><\/p>\n<p>And that&#8217;s the demo.<\/p>\n<p>Hopefully this is enough to make it easy for you to set up your own neural net based on the data on your 
own Kafka topics.<\/p>\n<p><a data-flickr-embed=\"true\" href=\"https:\/\/www.flickr.com\/photos\/dalelane\/48982711946\/in\/datetaken-public\/\" title=\"191030-kafka-ml-3\"><img loading=\"lazy\" decoding=\"async\" style=\"border: thin black solid\" src=\"https:\/\/live.staticflickr.com\/65535\/48982711946_5229953ea1.jpg\" width=\"500\" height=\"274\" alt=\"191030-kafka-ml-3\"\/><\/a><\/p>\n<p>If there is anything else you think it would be useful to add to the sample, please <a href=\"https:\/\/github.com\/dalelane\/event-streams-tensorflow\/issues\">let me know<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this post, I want to explain how to get started creating machine learning applications using the data you have on Kafka topics. I&#8217;ve written a sample app, with examples of how you can use Kafka topics as: a source of training data for creating machine learning models a source of test data for evaluating [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":3946,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,582,583,584,580],"class_list":["post-3924","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-eventstreams","tag-ibmeventstreams","tag-kafka","tag-machine-learning"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/3924","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=3924"}],"version-history":[{"count":0,"hr
ef":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/3924\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/3946"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=3924"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=3924"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=3924"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}