This repository has been archived by the owner on Jan 30, 2023. It is now read-only.
Handle the case where the coordinator is replaced with a new host #2
If the group coordinator is replaced with a different host, but the broker id remains the same, the client will go into an endless reconnection loop. This PR refreshes the cluster data if there is a ConnectionError when joining a group. The issue is reproducible by following these steps:
1. […] `each_message` loop.
2. […] `@coordinator` in `ConsumerGroup`.
3. […] `each_message` loop but do not exit the process.
4. […] `each_message` loop again.

When the above steps are taken:
1. `ConsumerGroup#join` is called.
2. `coordinator.join_group` on ConsumerGroup L:117 fails with `ConnectionError`.
3. `ConsumerGroup#join` sets `@coordinator = nil`.
4. `Cluster#get_group_coordinator` asks a broker for the broker id of the coordinator, which is 0.
5. `connect_to_broker` pulls cached info for id 0 (i.e. the old IP).
6. `coordinator.join_group` on ConsumerGroup L:117 fails with `ConnectionError`, restarting the loop.

Seeing as the retry for a ConnectionError is guarded by a `sleep 1`, I'm hoping this is a pretty safe place to refresh metadata.
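
For illustration, here is a minimal, self-contained simulation of the failure and of the proposed behaviour. It is a sketch under assumptions, not the code in this PR: `SimulatedCluster`, `SimulatedConnectionError`, `join_with_retry`, `simulate_host_replacement`, and the two IP addresses are all hypothetical names and values invented for the example, and the attempt limit exists only so the demo terminates instead of looping forever.

```ruby
# Illustrative simulation only: none of these names are the library's real API.
class SimulatedConnectionError < StandardError; end

# Hypothetical cluster that resolves broker ids through a cached address table.
class SimulatedCluster
  attr_reader :refresh_count

  def initialize(live_hosts:, cached_hosts:)
    @live_hosts = live_hosts            # where each broker id really lives now
    @cached_hosts = cached_hosts.dup    # what the client remembers (may be stale)
    @refresh_count = 0
  end

  # Stand-in for re-fetching cluster metadata from a reachable broker.
  def refresh_metadata!
    @refresh_count += 1
    @cached_hosts = @live_hosts.dup
  end

  # Connecting succeeds only if the cached address matches the live one.
  def connect_to_broker(broker_id)
    host = @cached_hosts.fetch(broker_id)
    raise SimulatedConnectionError, "cannot reach #{host}" unless host == @live_hosts[broker_id]
    host
  end
end

# Joins via the coordinator, retrying on connection errors. With refresh: true
# the cached metadata is refreshed before each retry.
def join_with_retry(cluster, coordinator_id:, refresh:, max_attempts: 3, retry_delay: 1)
  attempts = 0
  begin
    attempts += 1
    cluster.connect_to_broker(coordinator_id)
  rescue SimulatedConnectionError
    return nil if attempts >= max_attempts  # give up so the demo terminates
    sleep retry_delay
    cluster.refresh_metadata! if refresh
    retry
  end
end

# Broker id 0 moved from 10.0.0.1 to 10.0.0.2 (made-up addresses);
# the client still caches the old address.
def simulate_host_replacement(refresh:)
  cluster = SimulatedCluster.new(live_hosts: { 0 => "10.0.0.2" },
                                 cached_hosts: { 0 => "10.0.0.1" })
  host = join_with_retry(cluster, coordinator_id: 0, refresh: refresh, retry_delay: 0)
  [host, cluster.refresh_count]
end

p simulate_host_replacement(refresh: false) # => [nil, 0]
p simulate_host_replacement(refresh: true)  # => ["10.0.0.2", 1]
```

Without the refresh, every retry resolves broker id 0 to the stale address and fails again. With the refresh placed next to the retry delay, the second attempt picks up the new host.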