I’ve written before about how to write a schema for your developers using Kafka. The examples I used before were all in Java, but someone asked me yesterday if I could share some Python equivalents.
The principles are described in the Event Streams documentation, but in short: your Kafka producers use Apache Avro to serialize the message data that you send, and identify the schema that you've used in the Kafka message headers. In your Kafka consumers, you look at the headers of the messages that you receive to know which schema to retrieve, and use that schema to deserialize the message data.
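To make that a bit more concrete, here is a rough sketch of what the producer and consumer sides could look like using the kafka-python client. The header names and the fetch_schema lookup are just illustrative assumptions of mine (see the sample app linked at the end for a complete, runnable version), and the deserialize helper is the one shown in the next snippet.

from kafka import KafkaProducer, KafkaConsumer

# Producer side: send the Avro-encoded message value, and identify the
#  schema used to encode it in the message headers.
#  (The header names here are made up - use whatever convention you like,
#   as long as your consumers know it.)
def send_event(producer: KafkaProducer, topic, avro_bytes, schema_name, schema_version):
    producer.send(topic,
                  value=avro_bytes,
                  headers=[("schema-name",    schema_name.encode()),
                           ("schema-version", schema_version.encode())])
    producer.flush()

# Consumer side: look at the headers of each received message to find out
#  which schema was used, retrieve that schema, and use it to deserialize
#  the message value.
#  (fetch_schema is a placeholder for however you look up schemas, and
#   deserialize() is the helper function shown further down.)
def receive_events(consumer: KafkaConsumer, fetch_schema):
    for message in consumer:
        headers = dict(message.headers)
        schema = fetch_schema(headers["schema-name"], headers["schema-version"])
        yield deserialize(schema, message.value)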
The interesting bit was how to do Avro serialization / deserialization in Python:
import io

from avro.schema import Parse
from avro.io import DatumWriter, DatumReader, BinaryEncoder, BinaryDecoder


#
# Get the binary encoding for the provided object, using the
#  provided Avro schema definition.
#
# @param myschema - Avro schema object
# @param myobject - Python object to be serialized
#
# @returns bytes representation of the binary-encoded serialized object
def serialize(myschema, myobject):
    buf = io.BytesIO()
    encoder = BinaryEncoder(buf)
    writer = DatumWriter(writer_schema=myschema)
    writer.write(myobject, encoder)
    buf.seek(0)
    return (buf.read())


#
# Extracts a Python object from a binary-encoded serialization
#  using the provided Avro schema definition.
#
# @param myschema - Avro schema object
# @param mydata - bytes to be deserialized
#
# @returns Python object deserialized from the provided bytes
def deserialize(myschema, mydata):
    buf = io.BytesIO(mydata)
    decoder = BinaryDecoder(buf)
    reader = DatumReader(writer_schema=myschema)
    return (reader.read(decoder))
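To show how those two helpers fit together, here's a quick round-trip through them. The schema, record name, and field values are made up purely for illustration:

from avro.schema import Parse

# a made-up example schema, purely for illustration
EXAMPLE_SCHEMA = Parse("""
{
    "type": "record",
    "name": "SensorReading",
    "fields": [
        { "name": "sensorid",    "type": "string" },
        { "name": "temperature", "type": "float" }
    ]
}
""")

# serialize an example object, then deserialize it again
#  using the helper functions above
original = { "sensorid": "floor-1-room-4", "temperature": 21.5 }
rawbytes = serialize(EXAMPLE_SCHEMA, original)
restored = deserialize(EXAMPLE_SCHEMA, rawbytes)

assert restored == original

In a Kafka app, the bytes returned by serialize are what you send as the message value, and the bytes you get back as the message value are what you pass to deserialize.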
I've shared a working sample consumer and producer app on GitHub so you can see how this all fits together in the context of a Python Kafka app. The README.md tells you how to configure it to point to your Event Streams cluster, but other than that it's ready to run as-is.
Tags: apacheavro, apachekafka, avro, eventstreams, ibmeventstreams, kafka