Apache Hive is open source data warehouse software built on top of Hadoop. It gives you an SQL-like interface to a wide variety of databases, filesystems, and other systems.
One of the Hive storage handlers is a Kafka storage handler, which lets you create a Hive “external table” backed by a Kafka topic.
Once you’ve done that, you can run SQL queries against the attributes of the messages on that topic.
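To give a flavour of what that looks like, here is a minimal sketch. (The table and topic names are made up, and it assumes an unsecured local Kafka broker; the real table definitions for my cluster come later in the post.)

CREATE EXTERNAL TABLE my_topic_table (`message` string)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "MY.TOPIC",
"kafka.bootstrap.servers" = "localhost:9092");

SELECT * FROM my_topic_table;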
I was having a play with Hive this evening, as a way of running SQL queries against messages on my Kafka topics. In this post, I’ll share a few queries that I tried.
Setting up Hive
If you want to run Hive properly, follow the Getting Started instructions in the Hive documentation.
But if you just want to give it a quick try:
Get Hadoop
- Download a release of Hadoop
- Unzip
export HADOOP_HOME=/location/you/unzipped/hadoop
Get Hive
- Download a release of Hive
- Unzip
export HIVE_HOME=/location/you/unzipped/hive
- Download the kafka-handler jar and save it in $HIVE_HOME/lib/
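Put together, the quick-try setup looks something like this. (The version numbers are only examples — substitute whichever releases you downloaded; I used Hive 3.1.1.)

# versions are examples - use the releases you downloaded
wget https://archive.apache.org/dist/hadoop/common/hadoop-3.1.2/hadoop-3.1.2.tar.gz
tar -xzf hadoop-3.1.2.tar.gz
export HADOOP_HOME=$(pwd)/hadoop-3.1.2

wget https://archive.apache.org/dist/hive/hive-3.1.1/apache-hive-3.1.1-bin.tar.gz
tar -xzf apache-hive-3.1.1-bin.tar.gz
export HIVE_HOME=$(pwd)/apache-hive-3.1.1-bin

# the kafka-handler jar needs to end up in Hive's lib folder
cp kafka-handler-*.jar $HIVE_HOME/lib/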
Setup Hive
Prepare folders for use by Hadoop.
$ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse
Initialise the metastore schema.
$HIVE_HOME/bin/schematool -dbType derby -initSchema
Run Hive
For quick development/testing purposes, the simplest option is to run the Hive server and the CLI together in a single process. (Giving beeline a jdbc:hive2:// URL with no host or port makes it start an embedded HiveServer2.)
$HIVE_HOME/bin/beeline -u jdbc:hive2://
Prepare a Kafka topic
I’m using IBM Event Streams because I work on it, but you could use plain Apache Kafka.
First, I created a topic DALE.TOPIC, and produced five quick test messages to it. Each message is a JSON string, with a key called message and a different value.
{"message":"Hello World"} {"message":"This is a test"} {"message":"These are messages"} {"message":"Lorem Ipsum"} {"message":"Lorem ipsum dolor sit amet"}
Next, I downloaded a Java truststore, and created an API key, to let Hive make a secure connection to my Kafka cluster.
Then, I got the set of connection properties needed to connect an application to my topic.
That was enough info to create the external table in Hive, using Hive’s beeline CLI:
Connected to: Apache Hive (version 3.1.1)
Driver: Hive JDBC (version 3.1.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.1 by Apache Hive
0: jdbc:hive2://> CREATE EXTERNAL TABLE test_kafka
. . . . . . . . > (`message` string)
. . . . . . . . > STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
. . . . . . . . > TBLPROPERTIES
. . . . . . . . > ("kafka.topic" = "DALE.TOPIC",
. . . . . . . . > "kafka.bootstrap.servers"="9.30.245.82:32342",
. . . . . . . . > "kafka.consumer.sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username='token' password='9UfXe0KkabBqjXiU9NdGnvtEPVrqJ_tvLo-ZqnQy8Gww';",
. . . . . . . . > "kafka.consumer.sasl.mechanism"="PLAIN",
. . . . . . . . > "kafka.consumer.security.protocol"="SASL_SSL",
. . . . . . . . > "kafka.consumer.ssl.protocol"="TLSv1.2",
. . . . . . . . > "kafka.consumer.ssl.truststore.location"="/root/es-cert.jks",
. . . . . . . . > "kafka.consumer.ssl.truststore.password"="password"
. . . . . . . . > );
OK
No rows affected
Notice that the first two table properties I’ve used are kafka.topic with my topic name, and kafka.bootstrap.servers with the bootstrap address for my cluster. The rest are the properties I copied from the Event Streams UI, prefixed with kafka.consumer. because I’m using SELECT queries to read from my topic.
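(As I understand the storage handler, kafka.consumer.-prefixed properties are passed to the Kafka consumer that SELECT queries use. If I wanted to write to the topic with INSERT queries, I’d expect to need the same connection properties again with a kafka.producer. prefix — something like this, which I haven’t tested:)

ALTER TABLE test_kafka SET TBLPROPERTIES (
  "kafka.producer.security.protocol" = "SASL_SSL",
  "kafka.producer.sasl.mechanism" = "PLAIN"
);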
Simple SQL queries
I can use a SELECT query to fetch my five messages from the topic.
0: jdbc:hive2://> SELECT * from test_kafka;
OK
+-----------------------------+-------------------+-------------------------+----------------------+-------------------------+
| test_kafka.message          | test_kafka.__key  | test_kafka.__partition  | test_kafka.__offset  | test_kafka.__timestamp  |
+-----------------------------+-------------------+-------------------------+----------------------+-------------------------+
| Hello World                 |                   | 0                       | 0                    | 1565042407338           |
| This is a test              |                   | 1                       | 0                    | 1565042419083           |
| These are messages          |                   | 2                       | 0                    | 1565042429234           |
| Lorem Ipsum                 |                   | 0                       | 1                    | 1565042458894           |
| Lorem ipsum dolor sit amet  |                   | 1                       | 1                    | 1565042612579           |
+-----------------------------+-------------------+-------------------------+----------------------+-------------------------+
5 rows selected
Hive gives me the column I defined for the table (the message string attribute that I put in every Kafka message), plus metadata columns for the key, partition, offset and timestamp of each message.
I can use SQL WHERE clauses to filter based on this metadata. For example, to get messages between two timestamps:
0: jdbc:hive2://> SELECT * FROM test_kafka
. . . . . . . . > WHERE
. . . . . . . . > `__timestamp` > 1565042420000
. . . . . . . . > AND
. . . . . . . . > `__timestamp` < 1565042520000;
OK
+---------------------+-------------------+-------------------------+----------------------+-------------------------+
| test_kafka.message  | test_kafka.__key  | test_kafka.__partition  | test_kafka.__offset  | test_kafka.__timestamp  |
+---------------------+-------------------+-------------------------+----------------------+-------------------------+
| These are messages  |                   | 2                       | 0                    | 1565042429234           |
| Lorem Ipsum         |                   | 0                       | 1                    | 1565042458894           |
+---------------------+-------------------+-------------------------+----------------------+-------------------------+
2 rows selected
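The raw epoch-milliseconds timestamps aren’t very readable. Hive’s from_unixtime function (which expects seconds, hence the division) turns them into dates:

SELECT `message`,
       from_unixtime(`__timestamp` DIV 1000) AS msgtime
FROM test_kafka;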
A more interesting Kafka topic
Those queries worked because I’d used the same field, message, in every Kafka message. I’ve written before that a better way of ensuring consistency in Kafka messages is to use a schema. And Avro schemas work well with Hive queries.
I created a schema to define messages about Mario games, and uploaded it to the Event Streams Schema Registry.
Each Kafka message on my topic will be about a Mario game, and have attributes with the name of the game, the year it was released, and the number of copies sold.
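I haven’t reproduced the exact schema here, but based on the table definition below, it would look something like this (the record name is illustrative):

{
  "type": "record",
  "name": "MarioGame",
  "fields": [
    { "name": "title",           "type": "string" },
    { "name": "releaseYear",     "type": "int"    },
    { "name": "salesInMillions", "type": "float"  }
  ]
}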
I created a new topic, MARIO.TOPIC, to use the schema with.
Then I produced messages to represent stats about a bunch of Mario games.
The Event Streams UI integrates with the Schema Registry, so clicking on a message shows its parsed fields.
Now I had some test data suitable for trying out more interesting queries.
Setting up Hive to use Avro schemas
To use the schema, I needed to download a serdes jar that can fetch schemas on-demand from the Event Streams Schema Registry. I used the Event Streams UI to download the jars that I needed.
I copied the Event Streams serdes jars to $HIVE_HOME/auxlib. (I needed to create that folder first, and then restart my Hive process.)
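In other words, something like this (the jar filename is illustrative — use whatever the Event Streams UI gave you):

mkdir -p $HIVE_HOME/auxlib
cp eventstreams-serdes-*.jar $HIVE_HOME/auxlib/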
And I needed to add a kafka.serdes.class table property when creating the external table in Hive.
Connected to: Apache Hive (version 3.1.1)
Driver: Hive JDBC (version 3.1.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.1 by Apache Hive
0: jdbc:hive2://> CREATE EXTERNAL TABLE mario_kafka
. . . . . . . . > (`title` string, `releaseYear` int, `salesInMillions` float)
. . . . . . . . > STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
. . . . . . . . > TBLPROPERTIES
. . . . . . . . > ("kafka.topic" = "MARIO.TOPIC",
. . . . . . . . > "kafka.bootstrap.servers"="9.30.245.82:32342",
. . . . . . . . > "kafka.consumer.sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username='token' password='9UfXe0KkabBqjXiU9NdGnvtEPVrqJ_tvLo-ZqnQy8Gww';",
. . . . . . . . > "kafka.consumer.sasl.mechanism"="PLAIN",
. . . . . . . . > "kafka.consumer.security.protocol"="SASL_SSL",
. . . . . . . . > "kafka.consumer.ssl.protocol"="TLSv1.2",
. . . . . . . . > "kafka.consumer.ssl.truststore.location"="/root/es-cert.jks",
. . . . . . . . > "kafka.consumer.ssl.truststore.password"="password",
. . . . . . . . > "kafka.serdes.class"="com.ibm.eventstreams.serdes.EventStreamsSerdes"
. . . . . . . . > );
OK
No rows affected
SQL queries with filters
That means I can do queries like getting all Mario games from the last five years.
0: jdbc:hive2://> SELECT * FROM mario_kafka
. . . . . . . . > WHERE
. . . . . . . . > `releaseYear` > 2013;
OK
+-------------------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| mario_kafka.title                   | mario_kafka.releaseyear  | mario_kafka.salesinmillions  | mario_kafka.__key  | mario_kafka.__partition  | mario_kafka.__offset  | mario_kafka.__timestamp  |
+-------------------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| Super Mario Odyssey                 | 2017                     | 14.44                        | NULL               | 0                        | 3                     | 1565041149793            |
| Mario Kart 8                        | 2014                     | 8.44                         | NULL               | 0                        | 7                     | 1565041149795            |
| Super Mario Maker                   | 2015                     | 4.0                          | NULL               | 0                        | 13                    | 1565041149798            |
| Mario Party 10                      | 2015                     | 2.21                         | NULL               | 0                        | 19                    | 1565041149800            |
| Super Mario Party                   | 2018                     | 6.4                          | NULL               | 2                        | 8                     | 1565041149796            |
| New Super Mario Bros. U Deluxe      | 2019                     | 3.31                         | NULL               | 2                        | 13                    | 1565041149798            |
| Mario Tennis Aces                   | 2018                     | 2.64                         | NULL               | 2                        | 15                    | 1565041149799            |
| Mario Kart 8 Deluxe (NS)            | 2017                     | 16.69                        | NULL               | 1                        | 3                     | 1565041149793            |
| Super Mario Maker for Nintendo 3DS  | 2016                     | 2.01                         | NULL               | 1                        | 21                    | 1565041149801            |
+-------------------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
9 rows selected
Or all Mario games that have sold more than 20 million copies.
0: jdbc:hive2://> SELECT * FROM mario_kafka
. . . . . . . . > WHERE
. . . . . . . . > `salesInMillions` > 20;
OK
+----------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| mario_kafka.title          | mario_kafka.releaseyear  | mario_kafka.salesinmillions  | mario_kafka.__key  | mario_kafka.__partition  | mario_kafka.__offset  | mario_kafka.__timestamp  |
+----------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| Mario Kart Wii             | 2008                     | 37.2                         | NULL               | 0                        | 0                     | 1565041149791            |
| Mario Kart DS              | 2005                     | 23.6                         | NULL               | 0                        | 1                     | 1565041149792            |
| New Super Mario Bros.      | 2006                     | 30.8                         | NULL               | 2                        | 0                     | 1565041149791            |
| Super Mario World          | 1990                     | 20.61                        | NULL               | 2                        | 1                     | 1565041149792            |
| Super Mario Bros           | 1985                     | 40.24                        | NULL               | 1                        | 0                     | 1565041149783            |
| New Super Mario Bros. Wii  | 2009                     | 30.26                        | NULL               | 1                        | 1                     | 1565041149791            |
+----------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
6 rows selected
SQL queries with aggregates
Or get the total number of copies sold across all Mario games, grouped by the year of release.
Notice how Hive handles this by setting up a map-reduce job to compute the SUM.
0: jdbc:hive2://> SELECT
. . . . . . . . > `releaseYear`,
. . . . . . . . > SUM(`salesInMillions`) AS totalSales
. . . . . . . . > FROM mario_kafka
. . . . . . . . > GROUP BY `releaseYear`;
Query ID = root_20190805145028_58130aad-b433-4dbe-aed5-8b7fbfea57bb
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-05 14:50:31,181 Stage-1 map = 100%, reduce = 0%
2019-08-05 14:50:33,194 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local1519121693_0002
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
+--------------+---------------------+
| releaseyear  | totalsales          |
+--------------+---------------------+
| 1983         | 2.2799999713897705  |
| 1985         | 40.2400016784668    |
| 1986         | 2.6500000953674316  |
| 1988         | 24.74000072479248   |
| 1989         | 18.139999389648438  |
| 1990         | 30.80000066757202   |
| 1992         | 22.250000476837158  |
| 1993         | 10.550000190734863  |
| 1994         | 5.190000057220459   |
| 1995         | 4.119999885559082   |
| 1996         | 23.919999837875366  |
| 1998         | 2.700000047683716   |
| 1999         | 9.020000219345093   |
| 2000         | 6.779999852180481   |
| 2001         | 17.170000076293945  |
| 2002         | 11.320000171661377  |
| 2003         | 17.759999990463257  |
| 2004         | 18.59000015258789   |
| 2005         | 28.050000429153442  |
| 2006         | 33.53999924659729   |
| 2007         | 35.00000071525574   |
| 2008         | 38.46000075340271   |
| 2009         | 34.390000343322754  |
| 2010         | 9.18999981880188    |
| 2011         | 34.540000200271606  |
| 2012         | 23.869999885559082  |
| 2013         | 13.190000057220459  |
| 2014         | 8.4399995803833     |
| 2015         | 6.210000038146973   |
| 2016         | 2.009999990463257   |
| 2017         | 31.130000114440918  |
| 2018         | 9.040000200271606   |
| 2019         | 3.309999942779541   |
+--------------+---------------------+
33 rows selected
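(The float arithmetic makes for noisy output; wrapping the SUM in ROUND would tidy it up:)

SELECT `releaseYear`,
       ROUND(SUM(`salesInMillions`), 2) AS totalSales
FROM mario_kafka
GROUP BY `releaseYear`;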
If I count how many Mario games were released in each year, and sort that by the number of releases, Hive needs a second map-reduce job.
0: jdbc:hive2://> SELECT
. . . . . . . . > `releaseYear`,
. . . . . . . . > COUNT(*) AS numReleases
. . . . . . . . > FROM mario_kafka
. . . . . . . . > GROUP BY `releaseYear`
. . . . . . . . > ORDER BY numReleases DESC;
Query ID = root_20190805145411_dcf88f12-49ec-4a61-aea3-299fb9d964f6
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-05 14:54:14,191 Stage-1 map = 100%, reduce = 0%
2019-08-05 14:54:16,197 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local1092517259_0006
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-05 14:54:17,511 Stage-2 map = 100%, reduce = 100%
Ended Job = job_local116294658_0007
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Stage-Stage-2:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
+--------------+--------------+
| releaseyear  | numreleases  |
+--------------+--------------+
| 2004         | 5            |
| 2003         | 5            |
| 2013         | 5            |
| 2007         | 5            |
| 2000         | 4            |
| 2012         | 4            |
| 2011         | 4            |
| 2005         | 4            |
| 1999         | 3            |
| 2006         | 3            |
| 1992         | 3            |
| 1990         | 3            |
| 1996         | 3            |
| 2002         | 3            |
| 2001         | 3            |
| 2008         | 2            |
| 2015         | 2            |
| 2017         | 2            |
| 1988         | 2            |
| 2010         | 2            |
| 2009         | 2            |
| 2018         | 2            |
| 2019         | 1            |
| 2016         | 1            |
| 2014         | 1            |
| 1998         | 1            |
| 1995         | 1            |
| 1994         | 1            |
| 1993         | 1            |
| 1989         | 1            |
| 1986         | 1            |
| 1985         | 1            |
| 1983         | 1            |
+--------------+--------------+
33 rows selected
Creating views
I can also create a view to make it easier to refer to subsets of the messages on my Kafka topic.
0: jdbc:hive2://> SELECT * from mario_kafka
. . . . . . . . > WHERE `title` LIKE '%Party%';
OK
+---------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| mario_kafka.title         | mario_kafka.releaseyear  | mario_kafka.salesinmillions  | mario_kafka.__key  | mario_kafka.__partition  | mario_kafka.__offset  | mario_kafka.__timestamp  |
+---------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
| Mario Party DS            | 2007                     | 9.31                         | NULL               | 0                        | 6                     | 1565041149795            |
| Mario Party 10            | 2015                     | 2.21                         | NULL               | 0                        | 19                    | 1565041149800            |
| Mario Party 7             | 2005                     | 1.86                         | NULL               | 0                        | 21                    | 1565041149801            |
| Mario Party 8             | 2007                     | 8.85                         | NULL               | 2                        | 6                     | 1565041149795            |
| Super Mario Party         | 2018                     | 6.4                          | NULL               | 2                        | 8                     | 1565041149796            |
| Mario Party 9             | 2012                     | 2.73                         | NULL               | 2                        | 14                    | 1565041149798            |
| Mario Party 4             | 2002                     | 2.21                         | NULL               | 2                        | 17                    | 1565041149800            |
| Mario Party 5             | 2003                     | 2.0                          | NULL               | 2                        | 19                    | 1565041149800            |
| Mario Party 3             | 2000                     | 1.91                         | NULL               | 2                        | 21                    | 1565041149801            |
| Mario Party 6             | 2004                     | 1.65                         | NULL               | 2                        | 22                    | 1565041149801            |
| Mario Party: Island Tour  | 2013                     | 1.14                         | NULL               | 2                        | 26                    | 1565041149803            |
| Mario Party               | 1998                     | 2.7                          | NULL               | 1                        | 15                    | 1565041149799            |
| Mario Party 2             | 1999                     | 2.48                         | NULL               | 1                        | 16                    | 1565041149799            |
+---------------------------+--------------------------+------------------------------+--------------------+--------------------------+-----------------------+--------------------------+
13 rows selected
0: jdbc:hive2://> CREATE VIEW mario_party_games
. . . . . . . . > AS
. . . . . . . . > SELECT `title`, `releaseYear`, `salesInMillions` AS `added`
. . . . . . . . > FROM mario_kafka
. . . . . . . . > WHERE `title` LIKE '%Party%';
OK
No rows affected
0: jdbc:hive2://> SELECT * FROM mario_party_games;
OK
+---------------------------+--------------------------------+--------------------------+
| mario_party_games.title   | mario_party_games.releaseyear  | mario_party_games.added  |
+---------------------------+--------------------------------+--------------------------+
| Mario Party DS            | 2007                           | 9.31                     |
| Mario Party 10            | 2015                           | 2.21                     |
| Mario Party 7             | 2005                           | 1.86                     |
| Mario Party 8             | 2007                           | 8.85                     |
| Super Mario Party         | 2018                           | 6.4                      |
| Mario Party 9             | 2012                           | 2.73                     |
| Mario Party 4             | 2002                           | 2.21                     |
| Mario Party 5             | 2003                           | 2.0                      |
| Mario Party 3             | 2000                           | 1.91                     |
| Mario Party 6             | 2004                           | 1.65                     |
| Mario Party: Island Tour  | 2013                           | 1.14                     |
| Mario Party               | 1998                           | 2.7                      |
| Mario Party 2             | 1999                           | 2.48                     |
+---------------------------+--------------------------------+--------------------------+
13 rows selected
0: jdbc:hive2://> SELECT * FROM mario_party_games
. . . . . . . . > ORDER BY `releaseYear`;
Query ID = root_20190805152257_eb7a4e36-36b4-4e1d-86c2-9bbf5999115b
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-08-05 15:23:00,367 Stage-1 map = 100%, reduce = 0%
2019-08-05 15:23:02,373 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local1254056661_0008
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
+---------------------------+--------------------------------+--------------------------+
| mario_party_games.title   | mario_party_games.releaseyear  | mario_party_games.added  |
+---------------------------+--------------------------------+--------------------------+
| Mario Party               | 1998                           | 2.7                      |
| Mario Party 2             | 1999                           | 2.48                     |
| Mario Party 3             | 2000                           | 1.91                     |
| Mario Party 4             | 2002                           | 2.21                     |
| Mario Party 5             | 2003                           | 2.0                      |
| Mario Party 6             | 2004                           | 1.65                     |
| Mario Party 7             | 2005                           | 1.86                     |
| Mario Party DS            | 2007                           | 9.31                     |
| Mario Party 8             | 2007                           | 8.85                     |
| Mario Party 9             | 2012                           | 2.73                     |
| Mario Party: Island Tour  | 2013                           | 1.14                     |
| Mario Party 10            | 2015                           | 2.21                     |
| Super Mario Party         | 2018                           | 6.4                      |
+---------------------------+--------------------------------+--------------------------+
13 rows selected
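A view behaves like any other table in later queries, so I could, for example, total up the sales across just the Mario Party games:

SELECT ROUND(SUM(`added`), 2) AS totalPartySales
FROM mario_party_games;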
Next steps
There are lots more things that Hive can do with Kafka. For example, I’ve only been using it as a consumer and haven’t even started getting Hive queries to produce to my topics.
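(Based on my reading of the storage handler documentation — I haven’t tried this — producing means an INSERT that supplies values for the metadata columns as well, something like:)

INSERT INTO TABLE test_kafka
SELECT 'hello from hive' AS `message`,
       null AS `__key`,
       null AS `__partition`,
       -1   AS `__offset`,
       null AS `__timestamp`;  -- some versions may want an epoch-millis value here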
But this was a good first intro to the kinds of thing that Hive can enable.
Tags: apachehive, apachekafka, eventstreams, hive, ibmeventstreams, kafka