Producing avro-encoded Kafka messages in Python

This post goes into detail on how we interact with Kafka from Python while enforcing a schema. It starts by introducing schemas, a critical concept that developers new to Kafka often overlook. Producing messages that conform to a schema and decoding them can be frustrating to get right, so I hope this post helps anyone who uses Python to talk to Kafka.

Kafka schema and the need for one

Kafka accepts any form of data, be it JSON, CSV or binary, but that doesn't mean you should send it data in just any format. Kafka excels at moving data between silos, often managed by different teams with different technologies. If your team sends data to Kafka as CSV, every team that subscribes to the topic needs to know the order of the columns and the type of each column. That knowledge is essentially the schema of the data, but it is not stored in the data itself. It is tribal knowledge, scattered across code and in someone's head. Sooner or later your team will need to change the schema, and all the code that subscribes to your topic will have to change along with it. It is therefore critical for a Kafka topic to have a schema that can evolve safely over time.

Confluent & schema registry

If you are sold on schemas, you may be disappointed to hear that Kafka doesn't support them out of the box. Fear not: Confluent to the rescue! Kafka's original authors founded Confluent to commercialize Kafka. The Confluent platform is basically Kafka plus other software that makes working with Kafka a lot better, and one of those pieces is schema registry.

Schema registry is an open-source web service that serves as a central repository for all schemas. Every schema is versioned, and when updating a schema you can specify its compatibility requirements. They are:

  1. Backward compatible
  2. Forward compatible
  3. Full: Both backward and forward compatible
  4. None
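Compatibility can be set per subject through the registry's REST API. Here is a minimal sketch; the host and subject name are just the examples used later in this post:

    curl -X PUT \
        -H "Content-Type: application/vnd.schemaregistry.v1+json" \
        --data '{"compatibility": "BACKWARD"}' \
        http://schema.example.com:8081/config/my-test-topic-value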

Astute readers are probably asking how a schema gets enforced on a topic because, as I said above, Kafka doesn't enforce one by default. Confluent ships a producer class that looks up the latest version of the schema based on the topic name and encodes the message with that schema. It also stores the ID of the schema in the message, so that schema-aware consumers can find the correct schema and decode the message.

Avro schema

Confluent uses Avro for schemas. Avro is a data serialization format that encodes structured records to and from a compact binary form, with schemas defined in JSON. Here is an example schema:

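Something along these lines; the record and field names are made up for illustration:

    {
      "namespace": "com.example",
      "type": "record",
      "name": "UserEvent",
      "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"}
      ]
    }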

When encoding a message, it is typical to include the schema along with the actual data, so the decoder always has access to the schema and can decode any message. If that seems wasteful, the folks at Confluent definitely agree with you. If you use the Confluent encoder class, it stores only the ID of the schema in the message. This works because the consumer also has access to schema registry and can look up the exact schema that the producer used.

Working with Kafka & schema registry in Python

Because the Confluent platform is written in Java, if you use any other language you are largely on your own when it comes to schema registry. The rest of this post shows how to produce Avro-encoded Kafka messages using Python. Another requirement is that other Confluent Avro consumers should be able to decode our messages; otherwise it won't be much use.

Structure of an Avro-encoded Kafka message

A Confluent-compatible Avro-encoded Kafka message has three parts:

  1. The first byte is a magic byte and it's 0. You need to provide it or the consumer will immediately return an error.
  2. The schema ID as a 4-byte big-endian integer. For example, a schema ID of 1 is written as the bytes 00 00 00 01, most significant byte first.
  3. The Avro-encoded message: given a schema ID, you can fetch the full schema from schema registry. Use that schema and an Avro encoder to obtain the record in binary, append it after the first five bytes, and that's all you need to send messages that conform to the Confluent format (see the sketch after this list).
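Here is a minimal sketch of that framing in Python, assuming you already have the schema ID and the Avro-encoded payload as bytes:

    import struct

    MAGIC_BYTE = 0

    def confluent_frame(schema_id, avro_payload):
        """Prefix an Avro-encoded payload with the Confluent wire format header:
        one magic byte (0) followed by the schema ID as a 4-byte big-endian int."""
        return struct.pack('>bI', MAGIC_BYTE, schema_id) + avro_payload

    # For schema ID 1, the first five bytes are b'\x00\x00\x00\x00\x01'
    print(confluent_frame(1, b'')[:5])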

Tutorial

We’ll walk through how to set up a schema in schema registry and how to use it to encode Kafka messages in Python.

Create schema in schema registry

Confluent schema registry comes with a REST API that makes it easy to create and update schemas. Its main resource is the subject, and each subject has multiple versions. To register a new subject and an initial version, send a request like the one below to schema registry. Because the schema is itself JSON, you have to escape it inside the request body.
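A rough sketch with curl, reusing the example registry host from later in this post and the illustrative schema from above:

    curl -X POST \
        -H "Content-Type: application/vnd.schemaregistry.v1+json" \
        --data '{"schema": "{\"namespace\": \"com.example\", \"type\": \"record\", \"name\": \"UserEvent\", \"fields\": [{\"name\": \"user_id\", \"type\": \"long\"}, {\"name\": \"event_type\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"long\"}]}"}' \
        http://schema.example.com:8081/subjects/my-test-topic-value/versions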

Here I’m creating a schema for the value of my-test-topic, hence the subject name my-test-topic-value. This naming pattern is required by Confluent.

If successful, you should be able to check it by sending a GET request to http://schema.example.com:8081/subjects/my-test-topic-value/versions

If this is a new subject, it should return only one version. You can get more detail by sending another GET to http://schema.example.com:8081/subjects/my-test-topic-value/versions/1

Set up the topic & avro console consumer

This step is to verify that Confluent can decode our messages. To do this, I run the Avro-aware console consumer that Confluent includes in its platform.

  1. We first create a topic by running a command like the first one below
  2. After creating the topic, we listen to it with the Avro console consumer (second command below)
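A rough sketch of both commands, assuming a local broker; the exact flags vary between Kafka/Confluent versions (older releases use --zookeeper instead of --bootstrap-server):

    # 1. Create the topic
    kafka-topics --create --bootstrap-server localhost:9092 \
        --replication-factor 1 --partitions 1 --topic my-test-topic

    # 2. Listen with the Avro-aware console consumer
    kafka-avro-console-consumer --bootstrap-server localhost:9092 \
        --topic my-test-topic \
        --property schema.registry.url=http://schema.example.com:8081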

Time to produce some messages

The following code will produce an Avro-encoded message to Kafka in a format that Java or any other Confluent Avro consumer should be able to parse.

When producing messages to a topic, we usually encode them using the latest schema available for that topic. The find_latest_schema function finds the latest schema for a given topic name using the schema registry REST API, roughly as sketched below.
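A minimal sketch of such a lookup with the requests and avro packages; the registry URL is the example host from earlier:

    import requests
    import avro.schema

    SCHEMA_REGISTRY_URL = 'http://schema.example.com:8081'

    def find_latest_schema(topic):
        """Fetch the latest value schema (and its ID) for a topic from schema registry."""
        subject = '{}-value'.format(topic)
        resp = requests.get(
            '{}/subjects/{}/versions/latest'.format(SCHEMA_REGISTRY_URL, subject))
        resp.raise_for_status()
        data = resp.json()
        # 'id' is the globally unique schema ID, 'schema' is the Avro schema as a JSON string
        # (in some versions of the avro package the parser is avro.schema.Parse)
        return data['id'], avro.schema.parse(data['schema'])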

Confluent uses the following convention to derive the subject name from the topic name:

  • Value schema: the schema used to encode the actual message. For example, the topic app-events will have its value schema stored under the app-events-value subject.
  • Key schema: the schema used to encode the message key, stored under the <topic>-key subject, e.g. app-events-key.
After obtaining the schema, the rest of the code focuses on encoding the message using Python’s avro library, following the format I outlined above.
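Putting the pieces together, a sketch of the encode-and-produce step might look like this. It assumes kafka-python for the producer and reuses find_latest_schema and confluent_frame from above; the field names match the illustrative schema:

    import io
    import avro.io
    from kafka import KafkaProducer

    def encode_record(schema, schema_id, record):
        """Avro-encode a dict and wrap it in the Confluent wire format."""
        buf = io.BytesIO()
        encoder = avro.io.BinaryEncoder(buf)
        avro.io.DatumWriter(schema).write(record, encoder)
        return confluent_frame(schema_id, buf.getvalue())

    schema_id, schema = find_latest_schema('my-test-topic')
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    message = encode_record(schema, schema_id,
                            {'user_id': 42, 'event_type': 'login', 'timestamp': 1700000000})
    producer.send('my-test-topic', value=message)
    producer.flush()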

Open-source library

If writing that much code just to get Python to work well with Kafka is not your cup of tea, you should use the excellent library from Verisign: https://github.com/verisign/python-confluent-schemaregistry. I first thought it was only a client for registering schemas with schema registry. It turns out to be all you need to work with schema registry, including encoding and decoding messages.
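From memory, usage looks roughly like the following; treat the module and method names as assumptions and check the project’s README for the exact API:

    # Names below are recalled from the project's docs and may differ slightly
    from confluent.schemaregistry.client import CachedSchemaRegistryClient
    from confluent.schemaregistry.serializers import MessageSerializer

    client = CachedSchemaRegistryClient(url='http://schema.example.com:8081')
    serializer = MessageSerializer(client)

    # Encodes the record with the given schema and the Confluent wire format header
    encoded = serializer.encode_record_with_schema(
        'my-test-topic', schema, {'user_id': 42, 'event_type': 'login', 'timestamp': 1700000000})

    # Decoding looks up the schema by the ID embedded in the message
    decoded = serializer.decode_message(encoded)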

Conclusion

I highly recommend using Confluent and its excellent open-source schema registry if you’re setting up Kafka for the first time. It will make your life much easier down the road. I spent a lot of time getting Avro encoding to work from Python, so I hope this post saves you some of that trouble.
