I’m using confluent-kafka for a few projects that need to pass events back and forth. At first I was using a consumer loop similar to the one documented in Confluent’s “Basic Poll Loop” example.

import time

from confluent_kafka import KafkaException

# `consumer`, `logger`, and `process_msg` are assumed to be set up elsewhere.
while True:
    try:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        process_msg(msg)
        
        # My consumers always have enable.auto.commit set to False.
        # This lets me manage offsets myself and ensure that all
        # messages are fully processed at least once.
        consumer.commit()
    except Exception:
        logger.exception("Error while processing messages.")
        time.sleep(1)
        
consumer.close()
Careful! This may consume messages out of order.

However, this loop has a subtle problem. I manage consumer offsets myself to make sure that, in the worst case, a message is processed multiple times rather than partially processed once; in other words, to ensure at-least-once delivery. If anything goes wrong while polling or processing a message, an exception is raised, logged, and the loop starts over.

The committed offset stays correct in Kafka, but, much to my surprise, poll() does not read from the last committed offset. Instead, the consumer maintains its own in-memory position that advances with every message it returns. If a message fails to process, the loop correctly raises an exception and skips the commit, yet on the next iteration poll() hands me the next message in the partition anyway!

I don’t want this behavior. I’d rather the consumer loop endlessly on a bad message to let a downstream service recover or so that I can manually intervene. In my case, being stuck is preferable to processing messages out of order.
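The position-versus-committed-offset distinction is easier to see in miniature. This toy class is not confluent-kafka (all names here are hypothetical); it just mimics how a real consumer's in-memory position advances on every poll() regardless of whether commit() was ever called.

```python
class ToyConsumer:
    """A toy model of a Kafka consumer's offset bookkeeping."""

    def __init__(self, messages):
        self.messages = messages
        self.position = 0          # in-memory read position, advanced by poll()
        self.committed_offset = 0  # only moves when commit() is called

    def poll(self):
        if self.position >= len(self.messages):
            return None
        msg = self.messages[self.position]
        self.position += 1
        return msg

    def commit(self):
        self.committed_offset = self.position

    def seek_to_committed(self):
        self.position = self.committed_offset


consumer = ToyConsumer(["a", "b", "c"])
consumer.poll()            # returns "a"
# Suppose processing "a" fails, so commit() is never called...
print(consumer.poll())     # "b" -- the position advanced anyway!

consumer.seek_to_committed()
print(consumer.poll())     # "a" again -- back to the committed offset
```

Without the explicit seek back to the committed offset, the failed message is simply skipped, which is exactly the behavior I want to avoid.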

Here’s an updated consumer loop example with my preferred behavior.

import time

from confluent_kafka import KafkaException, TopicPartition

# As before, `consumer`, `logger`, and `process_msg` are set up elsewhere.
while True:
    msg = None
    try:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        process_msg(msg)
        consumer.commit()
    except Exception:
        logger.exception("Error while processing messages.")
        if msg is not None:
            # Rewind to the last committed offset so the failed message
            # is polled again. If nothing has ever been committed,
            # committed() returns OFFSET_INVALID and seek() will fail,
            # crashing the service (see the note below).
            partition = TopicPartition(msg.topic(), msg.partition())
            partition = consumer.committed([partition])[0]
            consumer.seek(partition)
        time.sleep(1)
        
consumer.close()
This loop would sooner wait forever than process messages out of order.

When an exception is caught, the consumer reads its committed offset from Kafka, then seeks back to it.

This does have the disadvantage of bombing out if there’s a subsequent exception when seeking to the offset, but that indicates something wrong with the service or cluster rather than the message processing. At that point I’d rather have Kubernetes restart the container and begin again with a fresh consumer anyway.