In this post, I share a step-by-step guide for how to use IBM DataStage to merge JSON messages from multiple Apache Kafka topics into a single joined-up stream of events.
I needed to use IBM DataStage for the first time last week. As this was new to me, it feels interesting enough to share. (If nothing else, putting my notes here will probably be useful to me when I next need to remind myself how I got started!)
Warning: This is the first DataStage flow I’ve created, so I’m not an authority and can’t claim this is best practice. It seems to work as far as I can tell, but it’s entirely possible I’m doing something daft (so please let me know if you see any mistakes!)
For my first flow, I wanted to try merging multiple streams of events into a single joined-up stream. To keep the walkthrough in this post easier to follow, I’ve kept it to just two topics, but I used the same approach to combine several topics.
My starting point was to create a Kafka cluster using IBM Event Streams, from Cloud Pak for Integration.
I have a collection of topics for stock price updates. Each topic receives events every minute for a different company.
(If you’d like to know how I get a Kafka topic with stock price events, I’ve written about that before, but the short version is I’m using a custom Kafka Connect connector.)
Now I was ready to start processing my Kafka messages, using IBM DataStage from Cloud Pak for Data.
First up, I clicked on New project to create my project.
I went with an empty project for this.
To start with, there were a few assets I wanted to get ready, so I clicked on the Assets tab and then clicked the New asset button.
The first thing to get ready was the connection to the Kafka cluster, so I chose the Connection asset type.
Lots of connection types to choose from – I wanted Apache Kafka.
I called my connection “Event Streams” because I lack imagination.
Next thing I needed was the bootstrap address for the Kafka cluster, which I got by clicking on the copy button for the External listener.
I pasted that into the Kafka server host name field.
I needed some credentials for DataStage to use, so I clicked the Generate SCRAM credentials button, and used the wizard to create a new username and password.
I pasted those into the credentials section of the DataStage connection form.
Finally, I needed to download the truststore certificate for the Kafka listener, which I got by clicking the download button for the PEM certificate.
I pasted the contents of that pem file into the Truststore certificates box on the DataStage connection form.
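As a point of comparison (and not part of the DataStage setup itself), these are the same three pieces of connection information that a stand-alone Kafka client would need: the bootstrap address, the SCRAM credentials, and the PEM certificate. This is a minimal sketch using the kafka-python library, with placeholder values throughout:

```python
# Minimal sketch of an equivalent client connection using kafka-python.
# The bootstrap address, SCRAM username/password, and PEM file name below
# are placeholders - use the values copied from the Event Streams UI.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "STOCK.PRICES.APPLE",
    bootstrap_servers="my-cluster-kafka-bootstrap.example.com:443",  # External listener address
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",            # generated SCRAM credentials
    sasl_plain_username="my-scram-user",
    sasl_plain_password="my-scram-password",
    ssl_cafile="es-cert.pem",                  # downloaded PEM certificate
    group_id="datastage-walkthrough",
    value_deserializer=lambda v: v.decode("utf-8"),  # string message payloads
)
```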
First asset ready, two more to go.
Next was storing a schema that describes the JSON messages that I’d be consuming from the Kafka topics.
Back to the New asset button to start that.
Schema libraries are included in the DataStage component section.
I chose the Schema library.
This library was for the definitions of input messages – the messages on topics like STOCK.PRICES.APPLE and STOCK.PRICES.MICROSOFT that I wanted to consume.
I clicked on Add resource to upload my JSON file.
You don’t have to upload a schema – you can just upload an example message, and DataStage will generate a schema from that.
So I went back to the Event Streams web interface, and clicked the Copy button to copy the contents of the most recent message.
My sample.json file looked like:
{ "open": 147.4, "high": 147.58, "low": 147.4, "close": 147.57, "volume": 56044, "timestamp": 1668805140, "datetime": "2022-11-18 15:59:00" }
DataStage used my sample.json file to generate a sample.json.xsd schema.
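(As an aside: if you wanted to do the same “infer a schema from one example message” step outside DataStage, the genson Python library can generate a JSON Schema from a sample. That’s only an analogy for what happened here – DataStage generated an XSD, and this snippet isn’t part of the walkthrough.)

```python
# Rough analogy only: infer a JSON Schema from the sample message with genson.
# DataStage generated sample.json.xsd instead; this just shows the same idea.
import json
from genson import SchemaBuilder

with open("sample.json") as f:
    sample_message = json.load(f)

builder = SchemaBuilder()
builder.add_object(sample_message)   # learn the structure from one example
print(builder.to_json(indent=2))     # the inferred JSON Schema
```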
I also wanted to create a schema library for the messages I wanted my DataStage flows to output, so back to the Assets tab to click on New asset again.
As before, I clicked into DataStage component and then Schema library.
I named this schema library to show I’d use it to store the schema of output messages.
Then clicked on Add resource to upload my JSON file.
As before, I didn’t upload a schema, I uploaded a file with a sample message payload and let DataStage generate a schema for me.
I created a file called combined.json and came up with this idea for a simple combined payload:
{ "timestamp": 1668805140, "datetime": "2022-11-18 15:59:00", "apple": { "open": 147.4, "high": 147.58, "low": 147.4, "close": 147.57, "volume": 56044 }, "microsoft": { "open": 147.4, "high": 147.58, "low": 147.4, "close": 147.57, "volume": 56044 } }
(The numbers aren’t accurate – but they didn’t need to be. I just wanted a sample of the shape of the data that could be used to generate a schema).
DataStage used my combined.json file to generate a combined.json.xsd schema.
Preparation done – now I was ready to create my flow.
Time to click the New asset button for the last time.
This time I chose DataStage.
I gave my flow the name share-prices-combiner. Still feeling unimaginative.
Here was my shiny new canvas, ready to start setting up a flow.
Here is my finished flow.
I’ll go through the critical settings I used for each node below. But first, at a high level, the flow:
- consumed string messages from two of my stock price Kafka topics
- converted the JSON string messages to structured columnar data
- joined the two sets of data using the datetime value in each message
- flattened the joined columnar data back into a JSON string
- produced the JSON string messages to a new Kafka topic
Starting from the left, the Apache Kafka node for consuming messages from the STOCK.PRICES.APPLE topic.
I picked the “Event Streams” Connection that I’d set up before, added the topic name, entered a consumer group id that Event Streams would accept, and left it using String for deserializing as my JSON payload is a string.
As my output from this node is just a single string message payload, I defined a single output column which I called “INPUT_STRING” and defined as a VARCHAR with a max length of 300. (The default was 100, but most of my message payloads are a bit longer than that.)
I did more or less the same for the Apache Kafka node for consuming messages from the STOCK.PRICES.MICROSOFT topic.
I picked the “Event Streams” connection, entered a topic name and consumer group id, and left the rest as defaults.
And again, I defined a single output column to contain the message payload, calling it “INPUT_STRING” so it’s nice and readable.
I named the links out of these Kafka consumer nodes “aapl_string” and “msft_string” – so it would be clear:
- which stock price is coming down that link (aapl for Apple, and msft for Microsoft)
- the structure of the data – a single string column
Next, I needed to parse the JSON message strings.
The top one parses the messages from the STOCK.PRICES.APPLE topic. I used a Hierarchical Data node for this.
The input for this node is the single “INPUT_STRING” 300-character VARCHAR column coming from the AAPL Kafka consumer node.
The Hierarchical Data node represents a subflow, which I set up with a single JSON parser node.
It’s a little difficult to see the selections here because they get greyed out after you confirm them, but this is how I configured the JSON parser.
The JSON source input for the JSON parser was the “INPUT_STRING” column coming from the “aapl_string” link. So this means I gave the JSON parser the string message payload coming from the STOCK.PRICES.APPLE topic. (You can see this in the right column, too).
And the Document root identifies the schema to use, so I chose the schema from the kafka messages schema library that I set up earlier.
There are some other settings for how to handle JSON validation errors, but I didn’t worry about those as I know that all the messages on my topic are valid JSON matching the schema I’m using.
For the output columns from the JSON parser flow, I created a separate column for each value in my JSON message.
Remember, my JSON message string looks like this:
{ "open": 147.4, "high": 147.58, "low": 147.4, "close": 147.57, "volume": 56044, "timestamp": 1668805140, "datetime": "2022-11-18 15:59:00" }
I created an output column for each of the values in the JSON data. I added a prefix to the name just to make it obvious to me later on where each value came from – so as this is parsing the stock price events for the Apple AAPL stock, I added an “aapl_” prefix to each of the column names.
Because my JSON structure is flat with no nested objects, the mapping from the JSON parser to the output columns was nice and simple.
The mapping here is from the fields defined in the JSON schema on the left, to my output columns on the right.
Back to the overall Hierarchical data node, that means the output columns looked like this.
I named the link out of this node “aapl_cols” to make it clear that this link would have the data from STOCK.PRICES.APPLE but structured as columnar data.
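If it helps to picture what that parse-and-map step does, here is a rough Python equivalent of the logic: parse the JSON string, prefix the column names with “aapl_”, and keep the join key column named “datetime” (this is just an illustration, not what the Hierarchical Data node actually runs):

```python
import json

# Illustration of the parser mapping: one input string in, one "row" of
# prefixed columns out. Keeping "datetime" unprefixed mirrors how I named
# the join key column in the DataStage mapping.
def to_aapl_columns(input_string: str) -> dict:
    message = json.loads(input_string)
    return {
        ("datetime" if name == "datetime" else f"aapl_{name}"): value
        for name, value in message.items()
    }

sample = ('{"open": 147.4, "high": 147.58, "low": 147.4, "close": 147.57, '
          '"volume": 56044, "timestamp": 1668805140, "datetime": "2022-11-18 15:59:00"}')
print(to_aapl_columns(sample))
# -> {'aapl_open': 147.4, 'aapl_high': 147.58, ..., 'datetime': '2022-11-18 15:59:00'}
```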
Then I did pretty much the same for the Microsoft stock price events.
A Hierarchical data node for parsing the VARCHAR in the single “INPUT_STRING” column coming from the STOCK.PRICES.MICROSOFT topic.
The assembly for the Hierarchical data node is just a single JSON parser node.
The config for the JSON parser identifies what input it should parse (the contents of the “INPUT_STRING” column coming from the “msft_string” link), and which schema it should use to do that (the schema from the “kafka messages” schema library).
The output columns I set up are again based on the values in the message payload, prefixed to make it easier to keep track of where the values came from.
So messages that look like this:
{ "open": 147.4, "high": 147.58, "low": 147.4, "close": 147.57, "volume": 56044, "timestamp": 1668805140, "datetime": "2022-11-18 15:59:00" }
will be represented as seven columns, most of them with names starting “msft_”.
I left the “datetime” value named “datetime” in both parsers, as that makes the join easier.
Again, I needed to map the values identified in the JSON schema (shown on the left) to the output columns I defined (shown on the right).
That left my Hierarchical data node with these output columns.
Now I had structured data flowing from events on both the STOCK.PRICES.APPLE and STOCK.PRICES.MICROSOFT topics. It was time to merge them into a single stream of events.
I used the datetime value as the Join key for this, as I’d named the column holding the datetime value “datetime” in both the aapl_cols and msft_cols outputs.
I used an Inner join for this because I’m looking for events where the same datetime is found in an event on both the Apple and Microsoft updates topics.
You can see the input columns for the Join node, split into separate lists for each of the two inputs (“aapl_cols” and “msft_cols”).
For the output columns, I basically wanted everything. So (as well as the “datetime” column I used for the join) I included all of the “aapl_” columns…
and all of the “msft_” columns.
I called this “all_cols” to make it clear that this was structured, columnar data that includes columns for both the Apple and Microsoft data.
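For anyone who prefers to see the join semantics as code, here’s a hypothetical Python version of the same inner join: an output row only appears when the same datetime turns up in both inputs (again, an illustration of the logic rather than how the Join node is implemented):

```python
# Inner join of the two columnar streams on the shared "datetime" column.
# Rows with a datetime that only appears in one input are dropped.
def inner_join_on_datetime(aapl_rows: list[dict], msft_rows: list[dict]) -> list[dict]:
    msft_by_datetime = {row["datetime"]: row for row in msft_rows}
    joined = []
    for aapl_row in aapl_rows:
        msft_row = msft_by_datetime.get(aapl_row["datetime"])
        if msft_row is None:
            continue  # no matching Microsoft event for this minute
        joined.append({**aapl_row, **msft_row})  # "datetime" appears only once
    return joined
```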
Next I needed to turn this columnar data into a JSON string, so I used another Hierarchical data node.
The input columns were all of the joined, combined columns that came out of the Join node.
I kept the assembly for this nice and simple, with just a single JSON composer.
The config for the JSON composer is a little hard to read, as it gets greyed out once you confirm it, but the main setting was specifying the schema to use – which I chose from the “output messages” schema library that I prepared before.
Remember that the JSON payload I invented for the joined up events looks like this:
{ "timestamp": 1668805140, "datetime": "2022-11-18 15:59:00", "apple": { "open": 147.4, "high": 147.58, "low": 147.4, "close": 147.57, "volume": 56044 }, "microsoft": { "open": 147.4, "high": 147.58, "low": 147.4, "close": 147.57, "volume": 56044 } }
To configure the JSON composer, I needed to map each of the “aapl_” and “msft_” columns to the values in my JSON structure.
As I wanted to give the Kafka producer node a single string to write, I defined a single output column. I used a string serializer for writing to the output topic, so I made the output column a VARCHAR with a generous max length to hold the combined output. I called this column “OUTPUT_STRING” to make it obvious what it would contain.
I configured the overall Hierarchical data assembly by mapping the string output of the JSON composer to my “OUTPUT_STRING” output column.
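Here’s a rough Python sketch of what that composer mapping produces: nesting the flat “aapl_” and “msft_” columns back into the combined.json shape, and serializing it as the single string for “OUTPUT_STRING”. (Which timestamp column feeds the top-level “timestamp” value is my assumption here, not something DataStage decides for you.)

```python
import json

# Illustration of the composer mapping: flat joined columns in, one JSON
# string out, matching the combined.json sample used to generate the schema.
PRICE_FIELDS = ["open", "high", "low", "close", "volume"]

def compose_output_string(joined_row: dict) -> str:
    combined = {
        "timestamp": joined_row["aapl_timestamp"],  # assumption: take the Apple timestamp
        "datetime": joined_row["datetime"],
        "apple": {f: joined_row[f"aapl_{f}"] for f in PRICE_FIELDS},
        "microsoft": {f: joined_row[f"msft_{f}"] for f in PRICE_FIELDS},
    }
    return json.dumps(combined)
```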
I named the output link for the Hierarchical data node “all_str”, to show that it was the combined stock price events for all shares, in a single string representation.
The output column going down this link is the single “OUTPUT_STRING” VARCHAR column.
The final node in the flow is another Apache Kafka node, this time being used as a Kafka producer.
I chose the “Event Streams” Connection that I created before, and told it to produce to a new STOCK.PRICES.ALL Kafka topic.
The input for this node was the single string coming from the JSON composer, which was used as the message payload for messages produced to the topic.
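And for symmetry with the consumer sketch earlier, this is roughly what the producing side looks like with a plain Kafka client (kafka-python again, with placeholder connection values and a shortened example payload):

```python
from kafka import KafkaProducer

# Same placeholder connection settings as the consumer sketch above.
producer = KafkaProducer(
    bootstrap_servers="my-cluster-kafka-bootstrap.example.com:443",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="my-scram-user",
    sasl_plain_password="my-scram-password",
    ssl_cafile="es-cert.pem",
    value_serializer=lambda v: v.encode("utf-8"),  # string message payloads
)

# The combined JSON string (the "OUTPUT_STRING" column) becomes the message value.
output_string = '{"timestamp": 1668805140, "datetime": "2022-11-18 15:59:00", "apple": {}, "microsoft": {}}'
producer.send("STOCK.PRICES.ALL", output_string)
producer.flush()
```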
And that was the whole flow.
To recap, the flow:
- consumed string messages from two of my stock price Kafka topics
- converted the JSON string messages to structured columnar data
- joined the two sets of data using the datetime value in each message
- flattened the joined columnar data back into a JSON string
- produced the JSON string messages to a new Kafka topic
I tested the flow by hitting the Run button.
Because I didn’t tick the Continuous mode option in the Kafka consumer nodes, the flow ran through the messages on the topic until it didn’t see any new events for 30 seconds.
To confirm, I checked that a STOCK.PRICES.ALL topic was created in the Event Streams web interface, and that the messages on the topic all had combined share price events for each timestamp.
Tags: apachekafka, eventstreams, ibmeventstreams, kafka