{"id":5354,"date":"2024-11-19T21:48:05","date_gmt":"2024-11-19T21:48:05","guid":{"rendered":"https:\/\/dalelane.co.uk\/blog\/?p=5354"},"modified":"2026-04-02T17:29:45","modified_gmt":"2026-04-02T17:29:45","slug":"social-media-updates-with-kafka-connect","status":"publish","type":"post","link":"https:\/\/dalelane.co.uk\/blog\/?p=5354","title":{"rendered":"Social media updates with Kafka Connect"},"content":{"rendered":"<p><strong>In this post, I\u2019ll show how to bring posts from open social media networks (<a href=\"https:\/\/bsky.app\/\">Bluesky<\/a> and <a href=\"https:\/\/joinmastodon.org\/\">Mastodon<\/a>) into <a href=\"https:\/\/kafka.apache.org\/\">Kafka<\/a> using Kafka Connect source connectors.<\/strong><\/p>\n<p>My goal is to populate a Kafka topic with status updates posted to social media.<\/p>\n<p>Rather than trying to consume the full firehose of all status updates, the connectors fetch only the status updates that match a search term or hashtag.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-11-19-social-kafka\/preview.png\" style=\"width: 100%; max-width: 600px; border: thin black solid;\"\/><\/p>\n<p>For example, the screenshot above shows a Kafka topic with posts from Bluesky that mention the term &#8220;xbox&#8221;.<\/p>\n<p><!--more--><\/p>\n<h3>Getting the connectors<\/h3>\n<p>You can download the jars from <a href=\"https:\/\/github.com\/dalelane\">my GitHub<\/a>:<\/p>\n<p><strong>Bluesky<\/strong> : <a href=\"https:\/\/github.com\/dalelane\/kafka-connect-bluesky-source\/releases\">github.com\/dalelane\/kafka-connect-bluesky-source<\/a><\/p>\n<p><strong>Mastodon<\/strong> : <a href=\"https:\/\/github.com\/dalelane\/kafka-connect-mastodon-source\/releases\">github.com\/dalelane\/kafka-connect-mastodon-source<\/a><\/p>\n<p>If you&#8217;re running Kafka Connect manually, you need to put these jars in <a href=\"https:\/\/kafka.apache.org\/documentation\/#connectconfigs_plugin.path\">your 
<code>plugin.path<\/code> location<\/a>.<\/p>\n<p>If you\u2019re using <a href=\"https:\/\/strimzi.io\/\">Strimzi<\/a> or <a href=\"https:\/\/www.ibm.com\/products\/event-automation\/event-streams\">IBM Event Streams<\/a> to build your Kafka Connect workers automatically, you\u2019d point at the jars like this:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">kind: KafkaConnect\n...\nspec:\n  build:\n    plugins:\n      # Social media posts from Bluesky\n      - name: bluesky\n        artifacts:\n          - type: jar\n            url: 'https:\/\/github.com\/dalelane\/kafka-connect-bluesky-source\/releases\/download\/0.0.1\/kafka-connect-bluesky-source-0.0.1-jar-with-dependencies.jar'\n      # Social media posts from Mastodon\n      - name: mastodon\n        artifacts:\n          - type: jar\n            url: 'https:\/\/github.com\/dalelane\/kafka-connect-mastodon-source\/releases\/download\/0.0.3\/kafka-connect-mastodon-source-0.0.3-jar-with-dependencies.jar'\n...\n<\/pre>\n<p>You can see an example of this in the context of <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/kafka-connect.yaml\">a whole Kafka Connect spec in my demo repository<\/a>.<\/p>\n<h3>Configuring the connectors<\/h3>\n<p>Each of the connector repositories has a README that describes the config options, but essentially you just need to provide:<\/p>\n<ul>\n<li>credentials for accessing the social media service<\/li>\n<li>the topic you want the status updates to be delivered to<\/li>\n<li>a search term to identify which posts to stream into Kafka<\/li>\n<\/ul>\n<h4>Bluesky<\/h4>\n<p>If you\u2019re running a stand-alone Connect instance, the properties file will look something like this:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; 
overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\"># name this anything you want\nname=bluesky-connector\n# identify the Bluesky connector\nconnector.class=uk.co.dalelane.kafkaconnect.bluesky.source.BlueskySourceConnector\n# your Bluesky username\nbluesky.identity=yourusername.bsky.social\n# an app password - create one at https:\/\/bsky.app\/settings\/app-passwords\nbluesky.password=aaaa-bbbb-cccc-dddd\n# the search term to use\nbluesky.searchterm=thanksgiving\n# the topic to deliver the Bluesky status updates to\nbluesky.topic=BLUESKY.THANKSGIVING<\/pre>\n<p>If you\u2019re using a distributed Connect cluster, the equivalent REST API request will be:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">{\n  \"name\": \"bluesky-connector\",\n  \"config\": {\n    \"connector.class\": \"uk.co.dalelane.kafkaconnect.bluesky.source.BlueskySourceConnector\",\n\n    \"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",\n    \"value.converter\": \"org.apache.kafka.connect.json.JsonConverter\",\n\n    \"bluesky.identity\": \"yourusername.bsky.social\",\n    \"bluesky.password\": \"aaaa-bbbb-cccc-dddd\",\n\n    \"bluesky.searchterm\": \"thanksgiving\",\n    \"bluesky.topic\": \"BLUESKY.THANKSGIVING\",\n\n    \"tasks.max\": 1\n  }\n}<\/pre>\n<p>If you\u2019re using Strimzi or IBM Event Streams, you can see examples of how to do this in my <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/bluesky-connectors.yaml\">bluesky-connectors.yaml<\/a> spec.<\/p>\n<h4>Mastodon<\/h4>\n<p>If you\u2019re running a stand-alone Connect instance, the properties file will look something like this:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 
520px;\"># name this anything you want\nname=mastodon-connector\n# identify the Mastodon connector\nconnector.class=uk.co.dalelane.kafkaconnect.mastodon.source.MastodonSourceConnector\n# a Mastodon token with at least read:search and read:statuses scopes\nmastodon.accesstoken=REPLACE-THIS-WITH-YOUR-ACCESS-TOKEN\n# which federated instance of Mastodon to use\nmastodon.instance=mastodon.org.uk\n# hashtag to stream to Kafka\nmastodon.searchterm=thanksgiving\n# the topic to deliver the Mastodon status updates to\nmastodon.topic=MASTODON.THANKSGIVING<\/pre>\n<p>If you\u2019re using a distributed Connect cluster, the equivalent REST API request will be:<\/p>\n<pre style=\"border: thin #AA0000 solid; color: #770000; background-color: #ffffc0; padding: 1em; overflow-y: scroll; overflow-x: scroll; font-size: 0.9em; white-space: pre; max-height: 520px;\">{\n  \"name\": \"mastodon-connector\",\n  \"config\": {\n    \"connector.class\": \"uk.co.dalelane.kafkaconnect.mastodon.source.MastodonSourceConnector\",\n\n    \"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",\n    \"value.converter\": \"org.apache.kafka.connect.json.JsonConverter\",\n\n    \"mastodon.accesstoken\": \"REPLACE-THIS-WITH-YOUR-ACCESS-TOKEN\",\n    \"mastodon.instance\": \"mastodon.org.uk\",\n\n    \"mastodon.searchterm\": \"thanksgiving\",\n    \"mastodon.topic\": \"MASTODON.THANKSGIVING\",\n\n    \"tasks.max\": 1\n  }\n}<\/pre>\n<p>If you\u2019re using Strimzi or IBM Event Streams, you can see examples of how to do this in my <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/mastodon-connectors.yaml\">mastodon-connectors.yaml<\/a> spec.<\/p>\n<h3>Output format<\/h3>\n<p>The Connectors emit structured records, which means you can use them with your choice of converter &#8211; for example, Avro or JSON. 
(<em>Or even <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5238\">XML<\/a> if you&#8217;re feeling like making your life more challenging.<\/em>)<\/p>\n<p>As you can see in the example specs above, I\u2019ve been <a href=\"https:\/\/github.com\/dalelane\/kafka-demos\/blob\/master\/mastodon-connectors.yaml#L18-L19\">using them to produce JSON events<\/a>.<\/p>\n<h3>Example output<\/h3>\n<p>To start with, I\u2019m only outputting a subset of the API responses from Bluesky and Mastodon. I&#8217;ve included the properties that I thought would be useful. (I\u2019m very open to <a href=\"https:\/\/github.com\/dalelane\/kafka-connect-mastodon-source\/issues\">suggestions<\/a> or <a href=\"https:\/\/github.com\/dalelane\/kafka-connect-bluesky-source\/issues\">requests<\/a> to add additional properties if there is something else you need.)<\/p>\n<p>The result is that you get Kafka topics with a constant stream of events with posts from social media.<\/p>\n<p>In the IBM Event Streams UI, this can look something like this:<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-11-19-social-kafka\/screenshot-bluesky.png\" style=\"width: 100%; max-width: 600px; border: thin black solid;\"\/><\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-11-19-social-kafka\/screenshot-mastodon.png\" style=\"width: 100%; max-width: 600px; border: thin black solid;\"\/><\/p>\n<h3>Processing the events<\/h3>\n<p>To finish off, let\u2019s have a quick play using <a href=\"https:\/\/flink.apache.org\/\">Apache Flink<\/a>. 
<\/p>\n<p>For example, I can count how many posts each social media site received per hour for a couple of search terms (&#8220;xbox&#8221; and &#8220;netflix&#8221; &#8211; which I chose because between <a href=\"https:\/\/thegameawards.com\/\">The Game Awards<\/a> and the coverage of the recent boxing match, they are both being talked about this week).<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/images.dalelane.co.uk\/2024-11-19-social-kafka\/screenshot-flink.png\" style=\"width: 100%; max-width: 600px; border: thin black solid;\"\/><\/p>\n<p>As you can see, I used <a href=\"https:\/\/www.ibm.com\/products\/event-automation\/event-processing\">IBM Event Processing<\/a> to quickly prototype this Flink job. If you want to see the Flink SQL that was generated under the covers, I put an exported copy (minus a few of the specific properties for my Kafka cluster) in <a href=\"https:\/\/gist.github.com\/dalelane\/9e9feccd0306bf1c8aa04b912eafcd24\">a public gist<\/a>.<\/p>\n<p><script src=\"https:\/\/gist.github.com\/dalelane\/9e9feccd0306bf1c8aa04b912eafcd24.js\"><\/script><\/p>\n<p>(To be clear, this is in no way a fair or robust analysis of Bluesky vs Mastodon activity. For example, on Bluesky the connector is doing a text search for any updates that mention the word \u201cxbox\u201d or \u201cnetflix\u201d. Mastodon doesn\u2019t have a free text search, and the API only lets you fetch posts with an explicit hashtag. 
Lots of people won\u2019t think to use #xbox or #netflix in their posts, so even this difference alone makes a comparison like this unbalanced.)<\/p>\n<p>For another example of processing social media updates from a Kafka topic, you can see my earlier post on <a href=\"https:\/\/dalelane.co.uk\/blog\/?p=5298\">Analysing social media sentiment<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>How to bring posts from social media networks (Bluesky and Mastodon) into Kafka using Kafka Connect source connectors<\/p>\n","protected":false},"author":1,"featured_media":5355,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[7],"tags":[593,610,583,584],"class_list":["post-5354","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-code","tag-apachekafka","tag-flink","tag-ibmeventstreams","tag-kafka"],"_links":{"self":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5354","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5354"}],"version-history":[{"count":1,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5354\/revisions"}],"predecessor-version":[{"id":5955,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/posts\/5354\/revisions\/5955"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=\/wp\/v2\/media\/5355"}],"wp:attachment":[{"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5354"
}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5354"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dalelane.co.uk\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5354"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}