Social media updates with Kafka Connect

In this post, I’ll show how to bring posts from open social media networks (Bluesky and Mastodon) into Kafka using Kafka Connect source connectors.

My goal is to be able to populate a Kafka topic with status updates posted to social media.

Rather than trying to consume the full firehose of all status updates, the connectors fetch only the status updates that match a search term or hashtag.

For example, the screenshot above is a Kafka topic with posts from Bluesky that mention the term “xbox”.

Getting the connectors

You can download the jars from my GitHub:

Bluesky : github.com/dalelane/kafka-connect-bluesky-source

Mastodon : github.com/dalelane/kafka-connect-mastodon-source

If you’re running Kafka Connect manually, you need to put these jars in your plugin.path location.

If you’re using Strimzi or IBM Event Streams to build your Kafka Connect workers automatically, you’d point at the jars like:

kind: KafkaConnect
...
spec:
  build:
    plugins:
      # Social media posts from Bluesky
      - name: bluesky
        artifacts:
          - type: jar
            url: 'https://github.com/dalelane/kafka-connect-bluesky-source/releases/download/0.0.1/kafka-connect-bluesky-source-0.0.1-jar-with-dependencies.jar'
      # Social media posts from Mastodon
      - name: mastodon
        artifacts:
          - type: jar
            url: 'https://github.com/dalelane/kafka-connect-mastodon-source/releases/download/0.0.3/kafka-connect-mastodon-source-0.0.3-jar-with-dependencies.jar'
...

You can see an example of this in the context of a whole Kafka Connect spec in my demo repository.

Configuring the connectors

Each of the connector repositories has a README that describes the config options, but essentially you just need to give them:

  • credentials for accessing the social media service
  • the topic that you want the status updates to be delivered to
  • a search term that identifies which posts to stream into Kafka

Bluesky

If you’re running a stand-alone Connect instance, the properties file will look something like this:

# name this anything you want
name=bluesky-connector
# identify the Bluesky connector
connector.class=uk.co.dalelane.kafkaconnect.bluesky.source.BlueskySourceConnector
# your Bluesky username
bluesky.identity=yourusername.bsky.social
# an app password - create one at https://bsky.app/settings/app-passwords
bluesky.password=aaaa-bbbb-cccc-dddd
# the search term to use
bluesky.searchterm=thanksgiving
# the topic to deliver the Bluesky status updates to
bluesky.topic=BLUESKY.THANKSGIVING

If you’re using a distributed Connect cluster, the equivalent REST API request will be:

{
  "name": "bluesky-connector",
  "config": {
    "connector.class": "uk.co.dalelane.kafkaconnect.bluesky.source.BlueskySourceConnector",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",

    "bluesky.identity": "yourusername.bsky.social",
    "bluesky.password": "aaaa-bbbb-cccc-dddd",

    "bluesky.searchterm": "thanksgiving",
    "bluesky.topic": "BLUESKY.THANKSGIVING",

    "tasks.max": 1
  }
}
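One way to submit a request like this is to POST it to the `/connectors` endpoint of the Connect REST API. The following is a minimal Python sketch rather than anything from the connector repository: the worker address `http://localhost:8083` (Connect's default REST port) and the helper name `build_create_request` are assumptions for illustration.

```python
import json
import urllib.request


def build_create_request(connect_url, name, config):
    """Build (but do not send) a POST /connectors request for a Connect worker."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Same settings as the JSON request above; replace the credentials with your own.
bluesky_config = {
    "connector.class": "uk.co.dalelane.kafkaconnect.bluesky.source.BlueskySourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "bluesky.identity": "yourusername.bsky.social",
    "bluesky.password": "aaaa-bbbb-cccc-dddd",
    "bluesky.searchterm": "thanksgiving",
    "bluesky.topic": "BLUESKY.THANKSGIVING",
    "tasks.max": 1,
}

# Assumed worker address: adjust the host and port to match your Connect cluster.
request = build_create_request("http://localhost:8083", "bluesky-connector", bluesky_config)
print(request.get_method(), request.full_url)
```

Nothing is sent until you pass the request to `urllib.request.urlopen(request)`, so you can inspect the body first.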

If you’re using Strimzi or IBM Event Streams, you can see examples of how to do this in my bluesky-connectors.yaml spec.

Mastodon

If you’re running a stand-alone Connect instance, the properties file will look something like this:

# name this anything you want
name=mastodon-connector
# identify the Mastodon connector
connector.class=uk.co.dalelane.kafkaconnect.mastodon.source.MastodonSourceConnector
# a Mastodon token with at least read:search and read:statuses scopes
mastodon.accesstoken=REPLACE-THIS-WITH-YOUR-ACCESS-TOKEN
# which federated instance of Mastodon to use
mastodon.instance=mastodon.org.uk
# hashtag to stream to Kafka
mastodon.searchterm=thanksgiving
# the topic to deliver the Mastodon status updates to
mastodon.topic=MASTODON.THANKSGIVING

If you’re using a distributed Connect cluster, the equivalent REST API request will be:

{
  "name": "mastodon-connector",
  "config": {
    "connector.class": "uk.co.dalelane.kafkaconnect.mastodon.source.MastodonSourceConnector",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",

    "mastodon.accesstoken": "REPLACE-THIS-WITH-YOUR-ACCESS-TOKEN",
    "mastodon.instance": "mastodon.org.uk",

    "mastodon.searchterm": "thanksgiving",
    "mastodon.topic": "MASTODON.THANKSGIVING",

    "tasks.max": 1
  }
}

If you’re using Strimzi or IBM Event Streams, you can see examples of how to do this in my mastodon-connectors.yaml spec.
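Once a connector has been created, the Connect REST API's `GET /connectors/<name>/status` endpoint reports the state of the connector and its tasks. Here is a small Python sketch for summarising that response. The helper names are mine, and the sample response is hand-written in the shape that endpoint returns rather than captured from a real worker.

```python
import json


def summarise_status(status):
    """Return (connector_state, [task_states]) from a Connect status response."""
    connector_state = status["connector"]["state"]
    task_states = [task["state"] for task in status.get("tasks", [])]
    return connector_state, task_states


def is_healthy(status):
    """True when the connector has at least one task and everything reports RUNNING."""
    connector_state, task_states = summarise_status(status)
    return (
        connector_state == "RUNNING"
        and bool(task_states)
        and all(state == "RUNNING" for state in task_states)
    )


# Hand-written sample (assumed worker_id values), not output from a real cluster.
sample = json.loads("""
{
  "name": "mastodon-connector",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"}]
}
""")

print(summarise_status(sample))
print(is_healthy(sample))
```

A task in the `FAILED` state usually points at a credentials or configuration problem, which is worth checking before looking for events on the topic.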

Output format

The connectors emit structured records, which means you can use them with your choice of converter – for example, Avro or JSON. (Or even XML if you’re feeling like making your life more challenging.)

As you can see in the example specs above, I’ve been using them to produce JSON events.
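If you consume these JSON events yourself, note that the JsonConverter can write each value either as a plain JSON object or wrapped in a `schema`/`payload` envelope, depending on the `value.converter.schemas.enable` setting. A minimal Python sketch that handles both follows. The helper name and the `id` and `text` fields in the sample are hypothetical, used only for illustration; the connector READMEs describe the real fields.

```python
import json


def decode_value(raw_bytes):
    """Decode a JsonConverter value, unwrapping the schema envelope if present."""
    value = json.loads(raw_bytes.decode("utf-8"))
    is_envelope = isinstance(value, dict) and set(value.keys()) == {"schema", "payload"}
    return value["payload"] if is_envelope else value


# Hypothetical field names, for illustration only.
plain = b'{"id": "abc", "text": "hello"}'
wrapped = b'{"schema": {"type": "struct"}, "payload": {"id": "abc", "text": "hello"}}'

print(decode_value(plain))
print(decode_value(wrapped))
```

Both calls return the same dictionary, so downstream code does not need to care which converter setting was used.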

Example output

To start with, I’m only outputting a subset of the API responses from Bluesky and Mastodon. I’ve included the properties that I thought would be useful. (I’m very open to suggestions or requests to add additional properties if there is something else you need.)

The result is that you get Kafka topics with a constant stream of events with posts from social media.

In the IBM Event Streams UI, it looks something like this:

Processing the events

To finish off, let’s have a quick play using Apache Flink.

For example, I can count how many posts each social media site received per hour for a couple of search terms (“xbox” and “netflix” – which I chose because between The Game Awards and the coverage of the recent boxing match, they are both being talked about this week).

As you can see, I used IBM Event Processing to quickly prototype this Flink job. If you want to see the Flink SQL that was generated under the covers, I put an exported copy (minus a few of the specific properties for my Kafka cluster) in a public gist.
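The idea behind an hourly count like this can also be sketched in plain Python. This is not the Flink job, only an illustration of counting events in one-hour tumbling windows. The `(source, timestamp)` event shape, the helper names, and the sample timestamps are all made up for the example.

```python
from collections import Counter
from datetime import datetime, timezone


def hourly_counts(events):
    """Count events per (source, window_start) using one-hour tumbling windows."""
    counts = Counter()
    for source, timestamp in events:
        # Truncate to the start of the hour that contains the event.
        window_start = timestamp.replace(minute=0, second=0, microsecond=0)
        counts[(source, window_start)] += 1
    return counts


def ts(hour, minute):
    """Helper for building made-up UTC timestamps on an arbitrary day."""
    return datetime(2024, 11, 20, hour, minute, tzinfo=timezone.utc)


# Made-up events, not real data from either network.
events = [
    ("bluesky", ts(9, 5)),
    ("bluesky", ts(9, 59)),
    ("mastodon", ts(9, 30)),
    ("bluesky", ts(10, 0)),
]

for (source, window_start), count in sorted(hourly_counts(events).items()):
    print(source, window_start.isoformat(), count)
```

A stream processor such as Flink does the same grouping continuously and also handles late or out-of-order events, which this sketch ignores.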

(To be clear, this is in no way a fair or robust analysis of Bluesky activity vs Mastodon. For example, on Bluesky the connector is doing a text search for any updates that mention the word “xbox” or “netflix”. Mastodon doesn’t have a free text search, and the API only lets you fetch posts with an explicit hashtag. Lots of people won’t think to use #xbox or #netflix in their posts, so even this difference alone makes a comparison like this unbalanced.)

For another example of processing social media updates from a Kafka topic, you can see my earlier post on Analysing social media sentiment.
