Reset Kafka Offset on Error (Python)
I’m using confluent-kafka for a few projects that need to pass events back and forth. At first I was using a consumer loop similar to the one documented in Confluent’s “Basic Poll Loop” example.
However, this loop has a subtle problem. I manage consumer offsets myself, committing an offset only after its message has been processed, so that in the worst case a message is processed more than once rather than partially processed once. In other words, I want at-least-once delivery. If anything goes wrong while polling or processing a message, an exception is raised and the loop starts over.
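For reference, here is a minimal sketch of that kind of manually committed loop. It is not Confluent's example verbatim. It assumes `consumer` is a `confluent_kafka.Consumer` created with `enable.auto.commit` set to `False` and already subscribed, and that `handle` is a hypothetical per-message callback; the `running` predicate exists only so the loop can be stopped.

```python
import logging

log = logging.getLogger(__name__)


def basic_loop(consumer, handle, running=lambda: True):
    """At-least-once poll loop: commit a message's offset only after handle() succeeds.

    `consumer` is assumed to be a confluent_kafka.Consumer configured with
    enable.auto.commit=False; `handle` is a hypothetical per-message callback.
    """
    while running():
        try:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                raise RuntimeError(msg.error())
            handle(msg)
            # Synchronously commit msg.offset() + 1 now that processing succeeded.
            consumer.commit(message=msg, asynchronous=False)
        except Exception:
            # Nothing was committed for msg, but the consumer's in-memory
            # position has already advanced, so the next poll() moves on.
            log.exception("failed to process message")
```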
The committed offset in Kafka stays correct, but, much to my surprise, poll() does not read from the last committed offset. Instead, the consumer keeps its own in-memory position that advances with every message it returns. If a message fails to process, the loop correctly raises an exception and skips committing the new offset, but on the next iteration poll() hands me the next message in the queue anyway!
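One way to see this gap is to compare the two values directly. In confluent-kafka, `Consumer.position()` reports the consumer's in-memory position (the next offset it will fetch) and `Consumer.committed()` reports what the group has committed in Kafka, both as `TopicPartition` objects for the partitions passed in. The helper name `offsets_report` is hypothetical; after a failure like the one described above, the position would be expected to run ahead of the committed offset for that partition.

```python
def offsets_report(consumer, timeout=10.0):
    """Map each assigned (topic, partition) to (in-memory position, committed offset).

    `consumer` is assumed to be a confluent_kafka.Consumer. Negative offsets
    are librdkafka sentinels meaning "no position" or "nothing committed yet".
    """
    assigned = consumer.assignment()
    # position() is local state; committed() is a round trip to the broker.
    positions = {(tp.topic, tp.partition): tp.offset for tp in consumer.position(assigned)}
    committed = {
        (tp.topic, tp.partition): tp.offset
        for tp in consumer.committed(assigned, timeout=timeout)
    }
    return {key: (positions[key], committed.get(key)) for key in positions}
```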
I don’t want this behavior. I’d rather the consumer loop endlessly on a bad message, either so a downstream service can recover or so I can intervene manually. In my case, being stuck is preferable to processing messages out of order.
Here’s an updated consumer loop example with my preferred behavior.
When an exception is caught, the consumer reads its committed offset from Kafka, then seeks back to it.
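A minimal sketch of that approach, under the same assumptions as before (a `confluent_kafka.Consumer` with `enable.auto.commit` set to `False` and a hypothetical `handle` callback); `rewind_to_committed` and `consume_loop` are illustrative names. On failure it fetches the committed offset for every assigned partition with `committed()` and passes each returned `TopicPartition` to `seek()`. A partition that has never had an offset committed reports a negative sentinel; treating that as fatal, rather than guessing a starting point, is an assumption of this sketch, and after a restart the consumer falls back to `auto.offset.reset` for such partitions.

```python
import logging

log = logging.getLogger(__name__)


def rewind_to_committed(consumer, timeout=10.0):
    """Seek every assigned partition back to its last committed offset."""
    assigned = consumer.assignment()
    # committed() is a broker round trip and may itself raise (e.g. on timeout).
    for tp in consumer.committed(assigned, timeout=timeout):
        if tp.offset < 0:
            # Negative means no commit exists yet (a sentinel such as
            # OFFSET_INVALID), so there is no safe position to rewind to.
            raise RuntimeError(f"no committed offset for {tp.topic} [{tp.partition}]")
        # seek() moves the consumer's in-memory position for tp to tp.offset.
        consumer.seek(tp)


def consume_loop(consumer, handle, running=lambda: True):
    """Poll loop that retries a failing message instead of skipping past it."""
    while running():
        try:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                raise RuntimeError(msg.error())
            handle(msg)
            # Commit msg.offset() + 1 only after processing succeeded.
            consumer.commit(message=msg, asynchronous=False)
        except Exception:
            log.exception("processing failed; rewinding to committed offsets")
            # Deliberately not wrapped in its own try: if rewinding fails, the
            # exception escapes the loop and the process exits.
            rewind_to_committed(consumer)
```

With a synchronous commit after every successful message, the other assigned partitions should already be positioned at their committed offsets, so seeking them as well is effectively a no-op apart from possibly refetching buffered messages.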
This does have the disadvantage of bombing out if there’s a subsequent exception when seeking to the offset, but that indicates something wrong with the service or cluster rather than the message processing. At that point I’d rather have Kubernetes restart the container and begin again with a fresh consumer anyway.