SQL queries on Kafka topics using Apache Hive

Apache Hive is open source data warehouse software built on top of Hadoop. It gives you an SQL-like interface to a wide variety of databases, filesystems, and other systems.

One of the Hive storage handlers is a Kafka storage handler, which lets you create a Hive “external table” based on a Kafka topic.

And once you’ve created a Hive table based on a Kafka topic, you can run SQL queries based on attributes of the messages on that topic.

I was having a play with Hive this evening, as a way of running SQL queries against messages on my Kafka topics. In this post, I’ll share a few queries that I tried.


Setting up Hive

If you want to run Hive properly, follow the Getting Started instructions in the Hive documentation.

But if you just want to give it a quick try:

Get Hadoop

  1. Download a release of Hadoop
  2. Unzip
  3. export HADOOP_HOME=/location/you/unzipped/hadoop

Get Hive

  1. Download a release of Hive
  2. Unzip
  3. export HIVE_HOME=/location/you/unzipped/hive
  4. Download the kafka-handler jar and save it in $HIVE_HOME/lib/
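Before going further, it can help to confirm that the steps above actually took effect. The following is a minimal sketch, not part of Hive or Hadoop: the function name check_hive_setup is made up for illustration, and it assumes the handler jar has "kafka-handler" somewhere in its file name.

```python
import glob
import os


def check_hive_setup(env):
    """Return a list of human-readable problems with the quick-start setup."""
    problems = []
    for var in ("HADOOP_HOME", "HIVE_HOME"):
        path = env.get(var)
        if not path:
            problems.append(f"{var} is not set")
        elif not os.path.isdir(path):
            problems.append(f"{var} points at a missing directory: {path}")

    hive_home = env.get("HIVE_HOME")
    if hive_home and os.path.isdir(hive_home):
        # Assumption: the handler jar has "kafka-handler" in its file name.
        pattern = os.path.join(hive_home, "lib", "*kafka-handler*.jar")
        if not glob.glob(pattern):
            problems.append("no kafka-handler jar found in $HIVE_HOME/lib")
    return problems


if __name__ == "__main__":
    # Check the environment of the current shell and report what is missing.
    for problem in check_hive_setup(os.environ) or ["setup looks OK"]:
        print(problem)
```

Run it from the same shell where you exported the two variables, so that it sees the same environment that Hive will.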

Set up Hive

Prepare folders for use by Hadoop.

$ $HADOOP_HOME/bin/hadoop fs -mkdir -p    /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir -p    /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /user/hive/warehouse

Initialise the metastore schema (this quick setup uses an embedded Derby database).

$ $HIVE_HOME/bin/schematool -dbType derby -initSchema

Run Hive

For quick development/testing purposes, running the Hive server and the CLI all in a single process is the simplest:

$ $HIVE_HOME/bin/beeline -u jdbc:hive2://

Prepare a Kafka topic

I’m using IBM Event Streams because I work on it, but you could use plain Apache Kafka.

First, I created a topic DALE.TOPIC, and I produced five quick test messages to it.


Each message is a JSON object with a single key called message, and a different value each time.

{"message":"Hello World"}
{"message":"This is a test"}
{"message":"These are messages"}
{"message":"Lorem Ipsum"}
{"message":"Lorem ipsum dolor sit amet"}

Next, I downloaded a Java truststore, and created an API key, to let Hive make a secure connection to my Kafka cluster.


Then, I got the set of connection properties needed to connect an application to my topic.


That was enough info to create the external table in Hive, using Hive’s Beeline CLI:

Connected to: Apache Hive (version 3.1.1)
Driver: Hive JDBC (version 3.1.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.1 by Apache Hive
0: jdbc:hive2://> CREATE EXTERNAL TABLE test_kafka
. . . . . . . . > (`message` string)
. . . . . . . . > STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
. . . . . . . . > TBLPROPERTIES
. . . . . . . . > ("kafka.topic" = "DALE.TOPIC",
. . . . . . . . >  "kafka.bootstrap.servers"="9.30.245.82:32342",
. . . . . . . . >  "kafka.consumer.sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username='token' password='9UfXe0KkabBqjXiU9NdGnvtEPVrqJ_tvLo-ZqnQy8Gww';",
. . . . . . . . >  "kafka.consumer.sasl.mechanism"="PLAIN",
. . . . . . . . >  "kafka.consumer.security.protocol"="SASL_SSL",
. . . . . . . . >  "kafka.consumer.ssl.protocol"="TLSv1.2",
. . . . . . . . >  "kafka.consumer.ssl.truststore.location"="/root/es-cert.jks",
. . . . . . . . >  "kafka.consumer.ssl.truststore.password"="password"
. . . . . . . . >  );
OK
No rows affected

Notice that the first two table properties I’ve used are kafka.topic with my topic name, and kafka.bootstrap.servers with the bootstrap address for my cluster.

The rest are the properties I copied from the Event Streams UI, prefixed with kafka.consumer. because I’m using SELECT queries to read from my topic.
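To make the prefixing rule concrete, here is a small sketch that turns a set of ordinary Kafka client properties into a TBLPROPERTIES clause. The function name to_tblproperties, the placeholder broker address, and the quoting rule (backslash-escaping inside double quotes) are assumptions for illustration, not something taken from the Hive documentation.

```python
def to_tblproperties(topic, bootstrap_servers, consumer_props):
    """Render a Hive TBLPROPERTIES clause for a Kafka-backed table."""
    props = {
        "kafka.topic": topic,
        "kafka.bootstrap.servers": bootstrap_servers,
    }
    # Client settings are passed through with the kafka.consumer. prefix.
    for key, value in consumer_props.items():
        props["kafka.consumer." + key] = value

    def quote(text):
        # Assumed escaping rule: backslash-escape backslashes and double quotes.
        return '"' + text.replace("\\", "\\\\").replace('"', '\\"') + '"'

    body = ",\n".join(f"  {quote(k)}={quote(v)}" for k, v in props.items())
    return "TBLPROPERTIES (\n" + body + "\n)"


if __name__ == "__main__":
    # Placeholder values; substitute your own connection details.
    print(to_tblproperties(
        "DALE.TOPIC",
        "broker.example.com:9093",
        {"security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN"},
    ))
```

Generating the clause this way keeps the credentials in one place (the properties you copied) rather than retyped by hand into each CREATE EXTERNAL TABLE statement.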

Simple SQL queries

I can use a SELECT query to fetch my five messages from the topic.

0: jdbc:hive2://> SELECT * from test_kafka;
OK
+-----------------------------+-------------------+-------------------------+----------------------+-------------------------+
|     test_kafka.message      | test_kafka.__key  | test_kafka.__partition  | test_kafka.__offset  | test_kafka.__timestamp  |
+-----------------------------+-------------------+-------------------------+----------------------+-------------------------+
| Hello World                 |                   | 0                       | 0                    | 1565042407338           |
| This is a test              |                   | 1                       | 0                    | 1565042419083           |
| These are messages          |                   | 2                       | 0                    | 1565042429234           |
| Lorem Ipsum                 |                   | 0                       | 1                    | 1565042458894           |
| Lorem ipsum dolor sit amet  |                   | 1                       | 1                    | 1565042612579           |
+-----------------------------+-------------------+-------------------------+----------------------+-------------------------+
5 rows selected

Hive gives me the field I gave it for the table (the “message” string attribute that I put in every Kafka message), and metadata fields for the key, partition, offset and timestamp.
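The __timestamp values are Kafka record timestamps, which are milliseconds since the Unix epoch. A minimal sketch for turning one into something readable (the function name kafka_timestamp_to_utc is made up for illustration):

```python
from datetime import datetime, timezone


def kafka_timestamp_to_utc(millis):
    """Convert milliseconds since the Unix epoch to a timezone-aware UTC datetime."""
    seconds, ms = divmod(millis, 1000)
    # Build from whole seconds, then set the milliseconds, to avoid float rounding.
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.replace(microsecond=ms * 1000)


if __name__ == "__main__":
    # __timestamp of the "Hello World" row in the query output above.
    print(kafka_timestamp_to_utc(1565042407338).isoformat())
```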

I can use SQL WHERE clauses to filter based on this metadata.

For example, to get messages between two timestamps:

0: jdbc:hive2://> SELECT * FROM test_kafka 
. . . . . . . . > WHERE 
. . . . . . . . >   `__timestamp` > 1565042420000 
. . . . . . . . > AND 
. . . . . . . . >   `__timestamp` < 1565042520000;
OK
+---------------------+-------------------+-------------------------+----------------------+-------------------------+
| test_kafka.message  | test_kafka.__key  | test_kafka.__partition  | test_kafka.__offset  | test_kafka.__timestamp  |
+---------------------+-------------------+-------------------------+----------------------+-------------------------+
| These are messages  |                   | 2                       | 0                    | 1565042429234           |
| Lorem Ipsum         |                   | 0                       | 1                    | 1565042458894           |
+---------------------+-------------------+-------------------------+----------------------+-------------------------+
2 rows selected
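Typing epoch milliseconds by hand is error-prone, so here is a sketch of going the other way: from a pair of datetimes to the WHERE clause used above. The helper names are hypothetical, and the example window in UTC is simply what the two literal values in the query correspond to.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time surprises")
    delta = moment - EPOCH
    # Integer arithmetic keeps the result exact.
    return delta.days * 86_400_000 + delta.seconds * 1000 + delta.microseconds // 1000


def timestamp_window_clause(start, end):
    """Build a WHERE fragment selecting records strictly between start and end."""
    return (
        f"`__timestamp` > {to_epoch_millis(start)} "
        f"AND `__timestamp` < {to_epoch_millis(end)}"
    )


if __name__ == "__main__":
    start = datetime(2019, 8, 5, 22, 0, 20, tzinfo=timezone.utc)
    end = datetime(2019, 8, 5, 22, 2, 0, tzinfo=timezone.utc)
    print("SELECT * FROM test_kafka WHERE " + timestamp_window_clause(start, end) + ";")
```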

A more interesting Kafka topic

Those queries worked because I’d used the same field message in every Kafka message.

I’ve written before that a better way of ensuring consistency in Kafka messages is to use a schema.

And Avro schemas work well with Hive queries.

I created a schema to define messages about Mario games, and uploaded it to the Event Streams Schema Registry.


Each Kafka message on my topic will be about a Mario game, and have attributes with the name of the game, the year it was released, and the number of copies sold.
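For readers who cannot see the screenshot, an Avro schema along these lines might look like the sketch below. The record name and namespace are invented for illustration. The field names and types are inferred from the columns of the Hive table created later in this post, so treat them as a best guess rather than the exact schema that was uploaded.

```python
import json

# Hypothetical record name and namespace; field names and types follow the
# columns of the mario_kafka Hive table defined later in this post.
MARIO_SCHEMA = {
    "type": "record",
    "name": "MarioGame",
    "namespace": "com.example.mario",
    "fields": [
        {"name": "title", "type": "string"},
        {"name": "releaseYear", "type": "int"},
        {"name": "salesInMillions", "type": "float"},
    ],
}

if __name__ == "__main__":
    # Avro schemas are plain JSON documents, so json.dumps is enough to write one.
    print(json.dumps(MARIO_SCHEMA, indent=2))
```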

I created a new MARIO.TOPIC topic to use the schema with.

Then I produced messages to represent stats about a bunch of Mario games.


The Event Streams UI integrates with the Schema Registry, so clicking on a message shows its parsed fields.


Now I had some test data suitable for trying out more interesting queries with.

Setting up Hive to use Avro schemas

To use the schema, I needed a serdes jar that can fetch schemas on demand from the Event Streams Schema Registry. I used the Event Streams UI to download the jars that I needed.


I copied the Event Streams serdes jars to $HIVE_HOME/auxlib. (I needed to create that folder first, and then restart my Hive process.)

And I needed to add a kafka.serde.class table property when creating the external table in Hive.

Connected to: Apache Hive (version 3.1.1)
Driver: Hive JDBC (version 3.1.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.1 by Apache Hive
0: jdbc:hive2://> CREATE EXTERNAL TABLE mario_kafka
. . . . . . . . > (`title` string, `releaseYear` int, `salesInMillions` float)
. . . . . . . . > STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
. . . . . . . . > TBLPROPERTIES
. . . . . . . . > ("kafka.topic" = "MARIO.TOPIC",
. . . . . . . . >  "kafka.bootstrap.servers"="9.30.245.82:32342",
. . . . . . . . >  "kafka.consumer.sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username='token' password='9UfXe0KkabBqjXiU9NdGnvtEPVrqJ_tvLo-ZqnQy8Gww';",
. . . . . . . . >  "kafka.consumer.sasl.mechanism"="PLAIN",
. . . . . . . . >  "kafka.consumer.security.protocol"="SASL_SSL",
. . . . . . . . >  "kafka.consumer.ssl.protocol"="TLSv1.2",
. . . . . . . . >  "kafka.consumer.ssl.truststore.location"="/root/es-cert.jks",
. . . . . . . . >  "kafka.consumer.ssl.truststore.password"="password",
. . . . . . . . >  "kafka.serde.class"="com.ibm.eventstreams.serdes.EventStreamsSerdes"
. . . . . . . . >  );
OK
No rows affected

SQL queries with filters

With the schema in place, I can do queries like getting all Mario games released after 2013.

0: jdbc:hive2://> SELECT * FROM mario_kafka 
. . . . . . . . > WHERE 
. . . . . . . . >  `releaseYear` > 2013;
OK
+-------------------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
|          mario_kafka.title          | mario_kafka.releaseyear  | mario_kafka.salesinmillions  | mario_kafka.__key  | mario_kafka.__partition  | mario_kafka.__offset  | mario_kafka.__timestamp  |
+-------------------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| Super Mario Odyssey                 | 2017                     | 14.44                        | NULL               | 0                        | 3                     | 1565041149793            |
| Mario Kart 8                        | 2014                     | 8.44                         | NULL               | 0                        | 7                     | 1565041149795            |
| Super Mario Maker                   | 2015                     | 4.0                          | NULL               | 0                        | 13                    | 1565041149798            |
| Mario Party 10                      | 2015                     | 2.21                         | NULL               | 0                        | 19                    | 1565041149800            |
| Super Mario Party                   | 2018                     | 6.4                          | NULL               | 2                        | 8                     | 1565041149796            |
| New Super Mario Bros. U Deluxe      | 2019                     | 3.31                         | NULL               | 2                        | 13                    | 1565041149798            |
| Mario Tennis Aces                   | 2018                     | 2.64                         | NULL               | 2                        | 15                    | 1565041149799            |
| Mario Kart 8 Deluxe (NS)            | 2017                     | 16.69                        | NULL               | 1                        | 3                     | 1565041149793            |
| Super Mario Maker for Nintendo 3DS  | 2016                     | 2.01                         | NULL               | 1                        | 21                    | 1565041149801            |
+-------------------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
9 rows selected

Or all Mario games that have sold more than 20 million copies.

0: jdbc:hive2://> SELECT * FROM mario_kafka
. . . . . . . . > WHERE
. . . . . . . . >  `salesInMillions` > 20;
OK
+----------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
|     mario_kafka.title      | mario_kafka.releaseyear  | mario_kafka.salesinmillions  | mario_kafka.__key  | mario_kafka.__partition  | mario_kafka.__offset  | mario_kafka.__timestamp  |
+----------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| Mario Kart Wii             | 2008                     | 37.2                         | NULL               | 0                        | 0                     | 1565041149791            |
| Mario Kart DS              | 2005                     | 23.6                         | NULL               | 0                        | 1                     | 1565041149792            |
| New Super Mario Bros.      | 2006                     | 30.8                         | NULL               | 2                        | 0                     | 1565041149791            |
| Super Mario World          | 1990                     | 20.61                        | NULL               | 2                        | 1                     | 1565041149792            |
| Super Mario Bros           | 1985                     | 40.24                        | NULL               | 1                        | 0                     | 1565041149783            |
| New Super Mario Bros. Wii  | 2009                     | 30.26                        | NULL               | 1                        | 1                     | 1565041149791            |
+----------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
6 rows selected

SQL queries with aggregates

Or get the total sales of Mario games, grouped by the year of release.

Notice how Hive handles this by setting up a map-reduce job to compute the SUM.

0: jdbc:hive2://> SELECT 
. . . . . . . . >   `releaseYear`, 
. . . . . . . . >   SUM(`salesInMillions`) AS totalSales 
. . . . . . . . > FROM mario_kafka 
. . . . . . . . >   GROUP BY `releaseYear`;
Query ID = root_20190805145028_58130aad-b433-4dbe-aed5-8b7fbfea57bb
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-05 14:50:31,181 Stage-1 map = 100%,  reduce = 0%
2019-08-05 14:50:33,194 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_local1519121693_0002
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
+--------------+---------------------+
| releaseyear  |     totalsales      |
+--------------+---------------------+
| 1983         | 2.2799999713897705  |
| 1985         | 40.2400016784668    |
| 1986         | 2.6500000953674316  |
| 1988         | 24.74000072479248   |
| 1989         | 18.139999389648438  |
| 1990         | 30.80000066757202   |
| 1992         | 22.250000476837158  |
| 1993         | 10.550000190734863  |
| 1994         | 5.190000057220459   |
| 1995         | 4.119999885559082   |
| 1996         | 23.919999837875366  |
| 1998         | 2.700000047683716   |
| 1999         | 9.020000219345093   |
| 2000         | 6.779999852180481   |
| 2001         | 17.170000076293945  |
| 2002         | 11.320000171661377  |
| 2003         | 17.759999990463257  |
| 2004         | 18.59000015258789   |
| 2005         | 28.050000429153442  |
| 2006         | 33.53999924659729   |
| 2007         | 35.00000071525574   |
| 2008         | 38.46000075340271   |
| 2009         | 34.390000343322754  |
| 2010         | 9.18999981880188    |
| 2011         | 34.540000200271606  |
| 2012         | 23.869999885559082  |
| 2013         | 13.190000057220459  |
| 2014         | 8.4399995803833     |
| 2015         | 6.210000038146973   |
| 2016         | 2.009999990463257   |
| 2017         | 31.130000114440918  |
| 2018         | 9.040000200271606   |
| 2019         | 3.309999942779541   |
+--------------+---------------------+
33 rows selected
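The long decimals in the totals (2.2799999713897705 rather than 2.28) are most likely a side effect of declaring salesInMillions as a 32-bit float and then summing into a 64-bit double. That is an inference from the output, not something confirmed in the Hive logs. The sketch below reproduces the effect outside Hive. The helper name as_float32 is made up for illustration.

```python
import struct


def as_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float, then widen it back."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


if __name__ == "__main__":
    # 1983 has a single release, so its total is one float widened to a double.
    widened = as_float32(2.28)
    print(widened)
    # Rounding for display hides the representation error.
    print(round(widened, 2))
```

If the noise matters, rounding the SUM in the query, or declaring the column with a wider numeric type, would be the things to try.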

If I count how many Mario games were released in each year, and sort that by the number of releases, this causes Hive to need a second map-reduce job.

0: jdbc:hive2://> SELECT 
. . . . . . . . >   `releaseYear`, 
. . . . . . . . >   COUNT(*) AS numReleases 
. . . . . . . . > FROM mario_kafka 
. . . . . . . . > GROUP BY `releaseYear` 
. . . . . . . . > ORDER BY numReleases DESC;
Query ID = root_20190805145411_dcf88f12-49ec-4a61-aea3-299fb9d964f6
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-05 14:54:14,191 Stage-1 map = 100%,  reduce = 0%
2019-08-05 14:54:16,197 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_local1092517259_0006
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-05 14:54:17,511 Stage-2 map = 100%,  reduce = 100%
Ended Job = job_local116294658_0007
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Stage-Stage-2:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
+--------------+--------------+
| releaseyear  | numreleases  |
+--------------+--------------+
| 2004         | 5            |
| 2003         | 5            |
| 2013         | 5            |
| 2007         | 5            |
| 2000         | 4            |
| 2012         | 4            |
| 2011         | 4            |
| 2005         | 4            |
| 1999         | 3            |
| 2006         | 3            |
| 1992         | 3            |
| 1990         | 3            |
| 1996         | 3            |
| 2002         | 3            |
| 2001         | 3            |
| 2008         | 2            |
| 2015         | 2            |
| 2017         | 2            |
| 1988         | 2            |
| 2010         | 2            |
| 2009         | 2            |
| 2018         | 2            |
| 2019         | 1            |
| 2016         | 1            |
| 2014         | 1            |
| 1998         | 1            |
| 1995         | 1            |
| 1994         | 1            |
| 1993         | 1            |
| 1989         | 1            |
| 1986         | 1            |
| 1985         | 1            |
| 1983         | 1            |
+--------------+--------------+
33 rows selected
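One way to picture why the ORDER BY adds a second job: the sort key (the count) does not exist until the aggregation has finished, so the work naturally splits into two passes. The sketch below is a conceptual illustration only, with made-up function names and a handful of rows borrowed from the earlier output. It is not a description of Hive's actual execution plan.

```python
from collections import Counter


def count_by_year(rows):
    """Pass 1: aggregate, counting rows per release year."""
    return Counter(year for _title, year in rows)


def order_by_count_desc(counts):
    """Pass 2: sort the aggregated (year, count) pairs, highest count first."""
    # sorted() is stable, so years with equal counts keep their first-seen order.
    return sorted(counts.items(), key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    sample = [
        ("Mario Party DS", 2007),
        ("Mario Party 8", 2007),
        ("Super Mario Party", 2018),
        ("Mario Tennis Aces", 2018),
        ("Super Mario Odyssey", 2017),
    ]
    for year, num_releases in order_by_count_desc(count_by_year(sample)):
        print(year, num_releases)
```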

Creating views

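I can also create a view to make it easier to refer to subsets of the messages on my Kafka topic. (In the CREATE VIEW statement below, the stray word ADDED after salesInMillions is treated by Hive as a column alias, which is why the view’s third column shows up as added.)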

0: jdbc:hive2://> SELECT * from mario_kafka 
. . . . . . . . > WHERE `title` LIKE '%Party%';
OK
+---------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
|     mario_kafka.title     | mario_kafka.releaseyear  | mario_kafka.salesinmillions  | mario_kafka.__key  | mario_kafka.__partition  | mario_kafka.__offset  | mario_kafka.__timestamp  |
+---------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| Mario Party DS            | 2007                     | 9.31                         | NULL               | 0                        | 6                     | 1565041149795            |
| Mario Party 10            | 2015                     | 2.21                         | NULL               | 0                        | 19                    | 1565041149800            |
| Mario Party 7             | 2005                     | 1.86                         | NULL               | 0                        | 21                    | 1565041149801            |
| Mario Party 8             | 2007                     | 8.85                         | NULL               | 2                        | 6                     | 1565041149795            |
| Super Mario Party         | 2018                     | 6.4                          | NULL               | 2                        | 8                     | 1565041149796            |
| Mario Party 9             | 2012                     | 2.73                         | NULL               | 2                        | 14                    | 1565041149798            |
| Mario Party 4             | 2002                     | 2.21                         | NULL               | 2                        | 17                    | 1565041149800            |
| Mario Party 5             | 2003                     | 2.0                          | NULL               | 2                        | 19                    | 1565041149800            |
| Mario Party 3             | 2000                     | 1.91                         | NULL               | 2                        | 21                    | 1565041149801            |
| Mario Party 6             | 2004                     | 1.65                         | NULL               | 2                        | 22                    | 1565041149801            |
| Mario Party: Island Tour  | 2013                     | 1.14                         | NULL               | 2                        | 26                    | 1565041149803            |
| Mario Party               | 1998                     | 2.7                          | NULL               | 1                        | 15                    | 1565041149799            |
| Mario Party 2             | 1999                     | 2.48                         | NULL               | 1                        | 16                    | 1565041149799            |
+---------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
13 rows selected
0: jdbc:hive2://>
0: jdbc:hive2://>
0: jdbc:hive2://> CREATE VIEW mario_party_games
. . . . . . . . > AS 
. . . . . . . . >   SELECT `title`, `releaseYear`, `salesInMillions`
. . . . . . . . >   ADDED FROM mario_kafka
. . . . . . . . >   WHERE `title` LIKE '%Party%';
OK
No rows affected
0: jdbc:hive2://>
0: jdbc:hive2://>  SELECT * FROM mario_party_games;
OK
+---------------------------+--------------------------------+--------------------------+
|  mario_party_games.title  | mario_party_games.releaseyear  | mario_party_games.added  |
+---------------------------+--------------------------------+--------------------------+
| Mario Party DS            | 2007                           | 9.31                     |
| Mario Party 10            | 2015                           | 2.21                     |
| Mario Party 7             | 2005                           | 1.86                     |
| Mario Party 8             | 2007                           | 8.85                     |
| Super Mario Party         | 2018                           | 6.4                      |
| Mario Party 9             | 2012                           | 2.73                     |
| Mario Party 4             | 2002                           | 2.21                     |
| Mario Party 5             | 2003                           | 2.0                      |
| Mario Party 3             | 2000                           | 1.91                     |
| Mario Party 6             | 2004                           | 1.65                     |
| Mario Party: Island Tour  | 2013                           | 1.14                     |
| Mario Party               | 1998                           | 2.7                      |
| Mario Party 2             | 1999                           | 2.48                     |
+---------------------------+--------------------------------+--------------------------+
13 rows selected
0: jdbc:hive2://>
0: jdbc:hive2://>
0: jdbc:hive2://>
0: jdbc:hive2://> SELECT * FROM mario_party_games 
. . . . . . . . > ORDER BY `releaseYear`;
Query ID = root_20190805152257_eb7a4e36-36b4-4e1d-86c2-9bbf5999115b
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-05 15:23:00,367 Stage-1 map = 100%,  reduce = 0%
2019-08-05 15:23:02,373 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_local1254056661_0008
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
+---------------------------+--------------------------------+--------------------------+
|  mario_party_games.title  | mario_party_games.releaseyear  | mario_party_games.added  |
+---------------------------+--------------------------------+--------------------------+
| Mario Party               | 1998                           | 2.7                      |
| Mario Party 2             | 1999                           | 2.48                     |
| Mario Party 3             | 2000                           | 1.91                     |
| Mario Party 4             | 2002                           | 2.21                     |
| Mario Party 5             | 2003                           | 2.0                      |
| Mario Party 6             | 2004                           | 1.65                     |
| Mario Party 7             | 2005                           | 1.86                     |
| Mario Party DS            | 2007                           | 9.31                     |
| Mario Party 8             | 2007                           | 8.85                     |
| Mario Party 9             | 2012                           | 2.73                     |
| Mario Party: Island Tour  | 2013                           | 1.14                     |
| Mario Party 10            | 2015                           | 2.21                     |
| Super Mario Party         | 2018                           | 6.4                      |
+---------------------------+--------------------------------+--------------------------+
13 rows selected
0: jdbc:hive2://>

Next steps

There are lots more things that Hive can do with Kafka. For example, I’ve only been using it as a consumer and haven’t even started getting Hive queries to produce to my topics.

But this was a good first intro to the kinds of thing that Hive can enable.
