Kafka gotchas, out of order, delivery schematic … oh my!

In my first post, I’ve shown how simple Kafka is. Nevertheless, to be fast, durable and scalable, Kafka has made some design choices whose implications can be difficult for beginners to understand. This post aims to overview some kafka gotchas that I have come across and my approach in solving them

First kafka gotchas: Out of order

The first Kafka gotchas that almost everyone run into is out of order in messages delivery

In order to scale a topic out, it is necessary to divide it into multiple partitions. This is similar to database sharding. This creates some challenges especially in consuming messages from a Kafka topic.

Because we can only append to a partition of a kafka topic, Kafka guarantee that consumers will receive messages in order from the same partition. However, if a topic has multiple partitions, we can receive messages out of order.

kafka gotchas Producer writing to multiple partitions

Here a producer is writing to a topic with 2 partitions. The first message is A and the second is B. Because by default, producer writes to partitions in round robin fashion, partition 1 will receive A and partition 2 will receiv B

kafka_gotchas_consumer-pulls-data-from-partitions

For consumers, Kafka uses pulling mechanism whereas consumers ask partitions for new data. Here, the offset the consumer have for both partitions are 0, so its ask both partitions for data starting at 0 offset

kafka gotchas partitions-return-out-of-order-data

Thing can go awry if server hosting partition 1 is underload or just happen to receive the pulling message slightly after partition 2. In this case, our consumer will receive B from partition 2 first and A shortly after.

Solution for out of order delivery

Usually, we are not interested in the total ordering of messages, but only in the ordering of a subsets of them. For example, if the topic is update to user email and we want update Mailchimp with those new email, we are more concerned that email change of the same users get to Mailchimp in the correct order. We wouldn’t mind if email address of user B gets to Mailchimp before user A even though B did change his email address after A. How can we enforce such ordering?

Introducing keyed message. Up to this point, we only think of a message as an arbitrary piece of data. However, Kafka supports another type of message called keyed message. This simply means a message has a key associated with it. What more interesting is Kafka will send all messages of the same key to the same partition. This is similar to a distributed hash table, where a hash function turns a key into a partition address and it appends values to the correct partition.

Going back to the user email address example, we can use user ID as message key and this will ensure Kafka will route changes to the email address by the same users to the same partition. Consumer will therefore receive them in the correct order

A less elegant but simpler solution is to keep track of the timestamp of the last processed message. Upon an arrival of a new message, check if its timestamp is before the last saved timestamp and if so, discard it. Please note that this solution is susceptible to race condition unless the check and the processing step are atomic

Second Kafka gotchas: At least once

The second kafka gotchas is not really a gotchas per say. It’s just a confusing concept and I try my best to explain it here

When I first learned about Kafka, I was very confused by the term “at least once” being thrown around a whole lot. What this refers to is message delivery schematic. There are 3 types

  1. At most once: Consumer will process messages at most once and the same message will not be processed again
  2. At least once: Consumer will process messages at least once and the same message can be proccessed multiple time
  3. Exactly once: Consumer will process messages one and only one time. This is the strongest delivery schematic and is what we usually want

Recap on how consumers remember its offset

We learned from the previous post that Kafka expects its consumers to maintain the offsets on their own. Periodically, consumers will ask Kafka for new messages after its offset. It’s up to the consumer to increment the offset after it has processed those messages. Kafka versions after 0.8.1.1  allow consumers to store their offset in Kafka itself in a highly replicated and available topic. However, this is just for convenience and consumers still have to insert their offsets into this special topic.

Deciding which schematic you want

When reasoning about message delivery schematic, you have to assume the worst case scenario where consumers can crash suddenly. Come up with all possible scenarios and find the weakest guarantee and code your consumer to handle that schematic.

Consumers then have a few choices in when to commit the offset in relative to message processing

Increase the offset and commit it before processing the message

Following diagram shows a normal timeline
kafka_gotchas_early_commit_normal
On the left is our consumer and on the right is our broker. Here we have our consumer starting at index 0, fetches messages from that index from the broker. Once it receives the message and before processing it, the consumer will commit it increased offset of 1 to Kafka (or its database which will require a 3rd column). After the commit, the consumer process message and repeat the processing fetching from 1

However, things aren’t always as rosy. Remember, our consumer can crash at any step. Here is an example of that scenario

kafka_gotchas_early_commit_crash_before_commit
Here we have our server crashes right after it receives the message before it has the chance to commit its offset to Kafka. Once it comes back up, it’d start polling from it current offset which is still at 0. No message is lost, no message is processed twice. Everything is still right with the world.

In another case, the consumer can die after it has committed the offset but before processing it.

kafka_gotchas_early_commit_crash_after_commit

When it comes back up, it will ask Kafka for it current offset which is now 1. It will merrily start processing message 1, completely oblivious to the poor message 0.

As we can see in this case, the delivery schematic is at most once. Under no circumstance will the consumer process a message twice but it can skip over some messages

Process message and increase the offset and finally commit.

The only difference is the offset commit step is now after message processing step. We’ll see how this is equivalent to at least once schematic

The following diagram illustrates a crash after the consumer has received the message but before it processes or commits the offset

kafka_gotchas_late_commit_crash_before_processing

When the consumer comes back up, it will ask Kafka for its offset which is still at 0. It then proceed to fetch the original message and process it. This is another case where a crash doesn’t lead to any problem other than delayed processing

However, if the crash is after the consumer has processed the message, things are a lot different now

kafka_gotchas_late_commit_crash_after_processing

Because the consumer hasn’t committed its offset yet, when it comes back up, it’ll receive the old offset and therefore the old message. This causes the consumer to process message 0 twice. This means we have at least once delivery in this situation

Process message and increase offset in one atomic transaction

I have not personally used this approach because it requires a custom offset storing solution other than the built in Kafka-backed offset storage. However it can be clear why this approach will achieve exactly once schematic.

In short, to get exactly once schematic, not only do you have to roll your own offset storage solution, but you also need to wrap that operation in the same transaction with the main processing so they either fail or succeed together. What if you’re writing data into a system that do not support atomic operations? Needless to say, this is rather constraining. What I found work best for me is a lot easier

Solution to duplicated messages

My solution to this kafka gotchas is not to implement exactly once delivery but to code my consumer such that it can safely process duplicated messages again without side effect. This has a very big sounding name: idempotent and there are multiple implementations. In my use case of keeping two database systems in sync, I find  upsert to be the simplest and most elegant

Upsert is a special SQL that will insert the row into the database. However, if the insert query fails due to a unique constraint, it will update the existing row instead. Following is an example of upsert query in PostgreSQL

As you can see, if the row with the same product_id and location_id already exists, the above query will update its price with new data. So if we receive the same message twice; first message will create a new row. In the second time, it’d update the row to the same value, which leave the system in the same state as if we only process the message once

Conclusion

I have included the two kafka gotchas which cost me the most time when I first using Kafka. Given how we have only started using Kafka in production last month, this post can very well be 10 gotchas 2 months from now. In next post, I’ll discuss share our python code for producing and consuming Kafka topic with support for schema. How about you? What kind of stumbling blocks have you come across working with Kafka?

1 reply
  1. Jeff
    Jeff says:

    Hey, thanks for the very good post, which explains some important concepts and pitfalls in Kafka. I would add that it might be complicated to realize an idempotent consumer in other scenarios (if the data is not simply written into a database), but after all, this might still be the best approach. Still, one has to decide individually what delivery guarantee is required and what handling fits best.

    Reply

Leave a Reply

Want to join the discussion?
Feel free to contribute!

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.