This post goes into detail on how we interact with Kafka from Python while enforcing a schema. It starts by introducing schemas, a critical concept when working with Kafka that developers new to it often overlook. Producing messages that conform to a schema and decoding them can be frustrating to get right, so I hope this post helps anyone who uses Python to talk to Kafka.
Kafka schema and the need for one
Kafka can accept any form of data, be it JSON, CSV, or binary, but that doesn't mean you should send it data in just any format. Kafka excels at moving data between silos that are often managed by different teams with different technologies. If your team sends data to Kafka as CSV, every team that subscribes to the topic needs to know the order of the columns and the type of each column. That knowledge is essentially the schema of the data, but it is not stored in the data itself. It's tribal knowledge, scattered across code and in someone's head. Sooner or later, your team will need to change the schema, and all the code that subscribes to your topic will have to change along with it. Therefore it is critical for a Kafka topic to have a schema that can evolve safely over time.
Confluent & schema registry
If you are sold on schemas, you may be disappointed to learn that Kafka doesn't support them out of the box. Fear not, because Confluent comes to the rescue! Kafka's original authors founded Confluent to commercialize Kafka. Its platform is basically Kafka plus other software that makes working with Kafka a lot better, and one of those pieces of software is the schema registry.
Schema registry is an open-source web service that serves as a central repository for all schemas. Every schema is versioned, and when updating a schema you can specify its compatibility requirement. The main options are:
Backward: Consumers using the new schema can read data written with the previous schema
Forward: Data written with the new schema can be read by consumers using the previous schema
Full: Both backward and forward compatible
None: Compatibility checks are disabled
Astute readers are probably asking how to enforce a schema on a topic because, as I said above, Kafka doesn't enforce schemas by default. Confluent ships a producer class that looks up the latest version of a schema based on the topic name and encodes the message with that schema. It also stores the ID of the schema in the message, so that a schema-aware consumer can find the correct schema and decode the message.
Confluent uses Avro for schemas. Avro is a data serialization format that can turn JSON to and from a binary string. Here is an example schema:
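The example below is a hypothetical schema for an app-events topic; the record and field names are placeholders, not from any real system. An Avro schema is itself a JSON document:

```json
{
  "type": "record",
  "name": "AppEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "long"},
    {"name": "timestamp", "type": "long"}
  ]
}
```

A record schema like this lists each field's name and type, which is exactly the knowledge that would otherwise live only in someone's head.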
When encoding a message, it is typical to include the schema along with the actual data, so the decoder always has access to the schema and can decode any message. If that seems wasteful, the folks at Confluent definitely agree with you. If you use Confluent's encoder class, it stores only the ID of the schema in the message. This works because the consumer also has access to the schema registry and can look up the exact schema that the producer used.
Working with Kafka & schema registry in Python
Because the Confluent platform is written in Java, if you use any other language you are on your own when it comes to the Confluent schema registry. The rest of this post shows how to produce Avro-encoded Kafka messages using Python. Another requirement is that other Confluent Avro consumers should be able to decode our messages, otherwise this won't be of much use.
Structure of an Avro-encoded Kafka message
The first byte is a magic byte, and it's 0. You need to provide it or the consumer will immediately return an error.
The next 4 bytes are the schema ID as a big-endian integer. For example, a schema ID of 1 is encoded as the 4 bytes 00 00 00 01, from lowest to highest memory address.
The rest is the Avro-encoded message: given a schema ID, you can look up the full schema thanks to the schema registry. Use the full schema and an Avro encoder to obtain the encoded message in binary. Append it to the end of the message, and that's all you need to send messages that conform to the Confluent format.
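The layout above can be sketched with Python's struct module. The schema ID and payload below are placeholder values for illustration:

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format starts with a zero byte

def frame_message(schema_id, avro_payload):
    """Prepend the Confluent header: magic byte + 4-byte big-endian schema ID."""
    header = struct.pack(">bI", MAGIC_BYTE, schema_id)
    return header + avro_payload

# Placeholder payload; a real one comes from an Avro encoder.
framed = frame_message(1, b"\x04hi")
```

The `>bI` format string gives exactly one signed byte followed by a big-endian unsigned 32-bit integer, so schema ID 1 produces the header 00 00 00 00 01.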
We’ll walk through how to set up a schema in the schema registry and how to use it to encode Kafka messages in Python.
Create schema in schema registry
The Confluent schema registry comes with a REST API that makes it easy to create and update schemas. Its main resource is the subject; each subject then has multiple versions. To register a new subject and an initial version, send a POST request to the schema registry. Because the schema is itself JSON, you have to escape it first.
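Escaping just means serializing the schema document to a JSON string and embedding it in the request body. Here is a minimal sketch using only the standard library; the registry URL and subject name are placeholders:

```python
import json
import urllib.request

SCHEMA_REGISTRY_URL = "http://localhost:8081"  # placeholder address
SUBJECT = "app-events-value"                   # placeholder subject

schema = {
    "type": "record",
    "name": "AppEvent",
    "fields": [{"name": "event", "type": "string"}],
}

# json.dumps twice: once to turn the schema into a string,
# once to wrap that string in the payload the registry expects.
payload = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")

request = urllib.request.Request(
    SCHEMA_REGISTRY_URL + "/subjects/" + SUBJECT + "/versions",
    data=payload,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
# urllib.request.urlopen(request) would register the schema and
# return a JSON body containing the new schema's ID.
```

The double `json.dumps` is what performs the escaping the registry requires.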
When producing a message to a topic, we usually encode it using the latest schema available for that topic. The find_latest_schema function finds the latest schema for a given topic name using the schema registry REST API.
Confluent uses the following convention to derive the subject name from the topic name:
This is for the schema that encodes the actual message (the value). For example, topic app-events will have its value schema stored under the app-events-value subject.
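A minimal sketch of what find_latest_schema could look like, assuming the registry runs at a placeholder URL; the subject naming and the versions/latest endpoint follow the convention described above:

```python
import json
import urllib.request

SCHEMA_REGISTRY_URL = "http://localhost:8081"  # placeholder address

def value_subject(topic):
    """Apply the Confluent naming convention for value schemas."""
    return topic + "-value"

def find_latest_schema(topic):
    """Fetch the ID and latest value schema registered for a topic."""
    url = "{}/subjects/{}/versions/latest".format(
        SCHEMA_REGISTRY_URL, value_subject(topic)
    )
    with urllib.request.urlopen(url) as response:
        body = json.load(response)
    # The registry returns the schema as an escaped JSON string,
    # so it needs one more round of json.loads.
    return body["id"], json.loads(body["schema"])
```

With this in hand, the producer can look up the schema once per topic and cache it rather than hitting the registry on every message.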
After obtaining the schema, the rest of the code focuses on encoding the message using Python's avro library, following the format I outlined above.
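To make the byte layout concrete without pulling in the avro package, here is a hand-rolled encoder for a record with a single string field. Per the Avro specification, a string is encoded as its byte length (a zigzag varint) followed by its UTF-8 bytes; the schema ID below is a placeholder:

```python
import struct

def zigzag_varint(n):
    """Encode a signed integer the way Avro encodes longs: zigzag + varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes -> small codes
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def encode_confluent_message(schema_id, event):
    """Magic byte + big-endian schema ID + Avro-encoded record body.

    Assumes a record schema with a single string field, so the body
    is just the Avro encoding of that one string.
    """
    raw = event.encode("utf-8")
    body = zigzag_varint(len(raw)) + raw
    return struct.pack(">bI", 0, schema_id) + body

message = encode_confluent_message(1, "hi")
# Bytes: 00 00 00 00 01 (header) | 04 (zigzag length 2) | 68 69 ("hi")
```

For real schemas with multiple fields and types you would let the avro library do this encoding, but the framing around it stays exactly the same.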
If writing that much code just to get Python to work well with Kafka is not your cup of tea, you should use the excellent library from Verisign: https://github.com/verisign/python-confluent-schemaregistry. I first thought it was only a client for registering schemas with the schema registry, but it turns out to be all you need to work with the schema registry, including encoding and decoding messages.
I highly recommend using Confluent and its excellent open-source schema registry if you’re setting up Kafka for the first time. It will make your life much easier down the road. I spent a lot of time getting