In my first post, I showed how simple Kafka is. Nevertheless, to be fast, durable, and scalable, Kafka has made some design choices whose implications can be difficult for beginners to understand. This post aims to give an overview of some Kafka gotchas I have come across and my approach to solving them.
First Kafka gotcha: Out-of-order delivery
The first Kafka gotcha that almost everyone runs into is out-of-order message delivery.
In order to scale a topic out, it is necessary to divide it into multiple partitions. This is similar to database sharding, and it creates some challenges, especially when consuming messages from a Kafka topic.
Because we can only append to a partition of a Kafka topic, Kafka guarantees that consumers will receive messages in order from the same partition. However, if a topic has multiple partitions, we can receive messages out of order.
Here a producer is writing to a topic with 2 partitions. The first message is A and the second is B. Because, by default, the producer writes to partitions in round-robin fashion, partition 1 will receive A and partition 2 will receive B.
For consumers, Kafka uses a pull mechanism, whereby consumers ask partitions for new data. Here, the offsets the consumer holds for both partitions are 0, so it asks both partitions for data starting at offset 0.
Things can go awry if the server hosting partition 1 is under load or just happens to receive the pull request slightly after partition 2. In this case, our consumer will receive B from partition 2 first and A shortly after.
Solution for out-of-order delivery
Usually, we are not interested in the total ordering of messages, but only in the ordering of a subset of them. For example, if the topic carries updates to user email addresses and we want to update Mailchimp with those new emails, we mostly care that email changes for the same user reach Mailchimp in the correct order. We wouldn't mind if user B's email address gets to Mailchimp before user A's, even though B changed his email address after A. How can we enforce such an ordering?
Introducing keyed messages. Up to this point, we have only thought of a message as an arbitrary piece of data. However, Kafka supports another type of message called a keyed message, which simply means a message has a key associated with it. What's more interesting is that Kafka will send all messages with the same key to the same partition. This is similar to a distributed hash table, where a hash function turns a key into a partition address and values are appended to the correct partition.
Going back to the user email address example, we can use the user ID as the message key, which ensures Kafka routes changes to the email address of the same user to the same partition. Consumers will therefore receive them in the correct order.
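As a sketch of that hash-table analogy, here is how a key maps to a partition. This is an illustration, not Kafka client code: it uses MD5 and an assumed partition count of 2, whereas Kafka's default partitioner actually uses murmur2.

```python
import hashlib

NUM_PARTITIONS = 2  # assumed partition count for this sketch

def partition_for(key: bytes) -> int:
    # A stable hash of the key, modulo the partition count, picks the
    # partition. Kafka's default partitioner does the same with murmur2.
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest, "big") % NUM_PARTITIONS

# All messages keyed by the same user ID land on the same partition,
# so a consumer sees that user's email changes in order.
assert partition_for(b"user-42") == partition_for(b"user-42")
```

Two different user IDs may still land on the same partition; the guarantee is only that the *same* key always goes to the same place.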
A less elegant but simpler solution is to keep track of the timestamp of the last processed message. Upon the arrival of a new message, check whether its timestamp is before the last saved timestamp and, if so, discard it. Please note that this solution is susceptible to race conditions unless the check and the processing step are atomic.
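A minimal sketch of that timestamp check could look like the following. All names here are illustrative; in practice the `last_seen` map would live in durable storage, and the non-atomicity caveat above applies.

```python
# Per-key timestamp of the last processed message. In production this
# would be persisted so it survives consumer restarts.
last_seen = {}

def process_if_newer(key, timestamp, apply):
    # NOTE: the check and the update are not atomic, so this sketch is
    # only safe with a single consumer thread per key.
    if timestamp <= last_seen.get(key, float("-inf")):
        return False  # stale message -- discard it
    apply()                      # perform the actual processing
    last_seen[key] = timestamp   # remember how far we have gotten
    return True
```

For example, a message for `"user-42"` with timestamp 2.0 is processed, after which a late-arriving message with timestamp 1.0 for the same key is discarded.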
Second Kafka gotcha: At least once
The second Kafka gotcha is not really a gotcha per se. It's just a confusing concept, and I'll try my best to explain it here.
When I first learned about Kafka, I was very confused by the term “at least once” being thrown around a whole lot. What this refers to is message delivery semantics. There are 3 types:
At most once: The consumer will process each message at most once; the same message will never be processed again
At least once: The consumer will process each message at least once; the same message can be processed multiple times
Exactly once: The consumer will process each message one and only one time. This is the strongest delivery semantic and is what we usually want
Recap on how consumers remember their offsets
We learned from the previous post that Kafka expects its consumers to maintain their offsets on their own. Periodically, consumers ask Kafka for new messages after their offset. It's up to the consumer to increment the offset after it has processed those messages. Kafka versions after 0.8.1.1 allow consumers to store their offsets in Kafka itself, in a highly replicated and available topic. However, this is just for convenience, and consumers still have to commit their offsets to this special topic themselves.
Deciding which semantics you want
When reasoning about message delivery semantics, you have to assume the worst-case scenario, in which consumers can crash suddenly. Come up with all possible scenarios, find the weakest guarantee, and code your consumer to handle those semantics.
Consumers then have a few choices of when to commit the offset relative to message processing:
Increase the offset and commit it before processing the message
The following diagram shows a normal timeline.
On the left is our consumer and on the right is our broker. Here our consumer starts at offset 0 and fetches messages from that offset from the broker. Once it receives the message, and before processing it, the consumer commits its increased offset of 1 to Kafka (or its own database, which would require a third column). After the commit, the consumer processes the message and repeats the cycle, fetching from offset 1.
However, things aren’t always as rosy. Remember, our consumer can crash at any step. Here is an example of such a scenario.
Here our consumer crashes right after it receives the message, before it has the chance to commit its offset to Kafka. Once it comes back up, it starts polling from its current offset, which is still 0. No message is lost, no message is processed twice. Everything is still right with the world.
In another case, the consumer can die after it has committed the offset but before processing the message.
When it comes back up, it will ask Kafka for its current offset, which is now 1. It will merrily start processing message 1, completely oblivious to the poor message 0.
As we can see, in this case the delivery semantics are at most once. Under no circumstances will the consumer process a message twice, but it can skip over some messages.
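The commit-before-process behavior can be simulated without a broker. In this sketch (an illustration, not Kafka client code), committing first means a crash after the commit silently skips a message:

```python
log = ["msg-0", "msg-1"]   # the partition's append-only log
committed_offset = 0       # offset stored in Kafka (or a database)
processed = []             # side effects of our consumer

def consume_commit_first(crash_after_commit):
    global committed_offset
    message = log[committed_offset]
    committed_offset += 1            # commit BEFORE processing
    if crash_after_commit:
        return                       # simulated crash: offset advanced, message lost
    processed.append(message)        # process the message

consume_commit_first(crash_after_commit=True)   # crash: msg-0 is skipped
consume_commit_first(crash_after_commit=False)  # restart: fetches msg-1
```

After the restart, `processed` contains only `msg-1`: at-most-once delivery in action.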
Process the message, increase the offset, and finally commit
The only difference is that the offset commit step now comes after the message processing step. We'll see how this is equivalent to at-least-once semantics.
The following diagram illustrates a crash after the consumer has received the message but before it processes it or commits the offset.
When the consumer comes back up, it will ask Kafka for its offset, which is still at 0. It then proceeds to fetch the original message and process it. This is another case where a crash doesn't lead to any problem other than delayed processing.
However, if the crash happens after the consumer has processed the message, things are a lot different.
Because the consumer hasn’t committed its offset yet, when it comes back up, it’ll receive the old offset and therefore the old message. This causes the consumer to process message 0 twice. This means we have at least once delivery in this situation
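Here is the same simulation with the order flipped (again an illustration, not Kafka client code): a crash between processing and committing causes the message to be processed twice on restart.

```python
log = ["msg-0"]            # the partition's append-only log
committed_offset = 0       # offset stored in Kafka (or a database)
processed = []             # side effects of our consumer

def consume_process_first(crash_before_commit):
    global committed_offset
    message = log[committed_offset]
    processed.append(message)        # process BEFORE committing
    if crash_before_commit:
        return                       # simulated crash: offset never committed
    committed_offset += 1            # commit after successful processing

consume_process_first(crash_before_commit=True)   # crash after processing
consume_process_first(crash_before_commit=False)  # restart: re-reads msg-0
```

After the restart, `processed` contains `msg-0` twice: at-least-once delivery in action.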
Process the message and increase the offset in one atomic transaction
I have not personally used this approach because it requires a custom offset storage solution other than the built-in Kafka-backed offset storage. However, it should be clear why this approach achieves exactly-once semantics.
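As a sketch of what such a transaction could look like, the following stores the processed data and the offset in the same database, using SQLite here only so the example is runnable; the schema and all names are assumptions, not from the post.

```python
import sqlite3

# One database holds both the processed data and the consumer's offset,
# so a single transaction can update them together.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (msg_offset INTEGER PRIMARY KEY, payload TEXT)")
conn.execute("CREATE TABLE offsets (part_id INTEGER PRIMARY KEY, next_offset INTEGER)")
conn.execute("INSERT INTO offsets VALUES (0, 0)")
conn.commit()

def process_exactly_once(part_id, msg_offset, payload):
    # `with conn` wraps both statements in one transaction: if anything
    # fails mid-way, neither the write nor the offset commit survives.
    with conn:
        conn.execute("INSERT INTO events VALUES (?, ?)", (msg_offset, payload))
        conn.execute(
            "UPDATE offsets SET next_offset = ? WHERE part_id = ?",
            (msg_offset + 1, part_id),
        )

process_exactly_once(0, 0, "alice@example.com")

# Re-delivering the same message fails the primary-key insert, and the
# whole transaction is rolled back: the offset never advances without
# the write, and the write never happens twice.
try:
    process_exactly_once(0, 0, "alice@example.com")
except sqlite3.IntegrityError:
    pass
```

The key design point is that the data write and the offset update share one transaction boundary, so they fail or succeed together.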
In short, to get exactly-once semantics, not only do you have to roll your own offset storage solution, but you also need to wrap that operation in the same transaction as the main processing, so they either fail or succeed together. What if you're writing data into a system that does not support atomic operations? Needless to say, this is rather constraining. What I found works best for me is a lot easier.
Solution to duplicated messages
My solution to this Kafka gotcha is not to implement exactly-once delivery but to code my consumer so that it can safely process duplicated messages without side effects. This has a very big-sounding name: idempotence, and there are multiple ways to implement it. In my use case of keeping two database systems in sync, I find upsert to be the simplest and most elegant.
Upsert is a special SQL statement that inserts a row into the database. However, if the insert fails due to a unique constraint, it updates the existing row instead. The following is an example of an upsert query in PostgreSQL.
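Assuming a table named product_prices with a unique constraint on (product_id, location_id) — the table name and the literal values are illustrative, while the columns come from the example below — such a query could look like:

```sql
-- Table name and example values are assumptions; the columns follow
-- the description in the text.
INSERT INTO product_prices (product_id, location_id, price)
VALUES (123, 456, 9.99)
ON CONFLICT (product_id, location_id)
DO UPDATE SET price = EXCLUDED.price;
```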
As you can see, if a row with the same product_id and location_id already exists, the above query updates its price with the new data. So if we receive the same message twice, the first message creates a new row, and the second updates the row to the same value, leaving the system in the same state as if we had processed the message only once.
These are the two Kafka gotchas that cost me the most time when I first started using Kafka. Given that we only started using Kafka in production last month, this post could very well grow to 10 gotchas two months from now. In the next post, I'll share our Python code for producing and consuming Kafka topics with schema support. How about you? What kind of stumbling blocks have you come across working with Kafka?