Skip to content

Commit

Permalink
storeMessageOffset: ignore state error (#129)
Browse files Browse the repository at this point in the history
Motivation:

Previously, we failed the entire `KafkaConsumer` if storing
a message offset through `RDKafkaClient.storeMessageOffset`
failed because the partition the offset should be committed to
was unassigned (which can happen during rebalance).

We should not fail the consumer when committing during
rebalance.

The worst thing that could happen here is that storing the offset
fails and we re-read a message, which is fine since KafkaConsumers with
automatic commits are designed for at-least-once processing:

https://docs.confluent.io/platform/current/clients/consumer.html#offset-management

Modifications:

* `RDKafkaClient.storeMessageOffset`: don't throw when receiving
  error
  `RD_KAFKA_RESP_ERR__STATE`
  • Loading branch information
felixschlegel authored Sep 4, 2023
1 parent 8592c61 commit c85a410
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,14 @@ final class RDKafkaClient: Sendable {
}

if error != RD_KAFKA_RESP_ERR_NO_ERROR {
// Ignore RD_KAFKA_RESP_ERR__STATE error.
// RD_KAFKA_RESP_ERR__STATE indicates an attempt to commit to an unassigned partition,
// which can occur during rebalancing or when the consumer is shutting down.
// See "Upgrade considerations" for more details: https://github.com/confluentinc/librdkafka/releases/tag/v1.9.0
// Since Kafka Consumers are designed for at-least-once processing, failing to commit here is acceptable.
if error != RD_KAFKA_RESP_ERR__STATE {
return
}
throw KafkaError.rdKafkaError(wrapping: error)
}
}
Expand Down

0 comments on commit c85a410

Please sign in to comment.