
Consumer picks messages from a single partition after adding a 3-hour consumption delay #1045

Open
sanketdhoble opened this issue Oct 12, 2023 · 1 comment


sanketdhoble commented Oct 12, 2023

Environment Information

  • OS: macOS 13
  • Node version: 12.14.1
  • NPM version: 6.13.4
  • C++ toolchain: not specified
  • node-rdkafka version: 2.7.4

```typescript
this.consumer = <any>new Kafka.KafkaConsumer(
    {
        // Global (librdkafka) configuration
        'group.id': consumerConfig.defaultGroupId + "_" + topics.join("_"),
        'client.id': consumerConfig.defaultClientId + "_" + topics.join("_"),
        'metadata.broker.list': consumerConfig.brokerList,
        'compression.codec': consumerConfig.compressionCodec, // producer-side setting; not used by a consumer
        'retry.backoff.ms': 100,
        'socket.keepalive.enable': true,
        'enable.auto.commit': this.autoCommit, // false in this setup: offsets are committed manually
        'statistics.interval.ms': 5000,
        // 'queued.min.messages': 10,
        // 'fetch.message.max.bytes': 30,
        // 'queued.max.messages.kbytes': 10 // didn't work
        debug: 'all'
    },
    {
        // Topic configuration
        'auto.offset.reset': consumerConfig.autoOffsetReset,
        'request.required.acks': consumerConfig.requestRequiredAcks // producer-side setting; not used by a consumer
    }
);
```

Topic: profile_updates
Partitions: 10

We are adding a mechanism where messages are produced at 5-6 messages per second (qps), but each message should be consumed only 3 hours after it was produced.
We consume in batches of 90 messages; each message is ~150 bytes.
For the first batch, we check the message timestamp and sleep for 3 hrs - (Date.now() - msg_produce_timestamp), i.e. roughly 3 hours.
We use manual commits (autoCommit: false).
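The delay calculation described above can be sketched as a small helper. This is a minimal sketch, assuming the produce time is available as milliseconds since the epoch (for example a node-rdkafka message's `timestamp` field); `DELAY_MS`, `remainingDelayMs`, and `sleep` are hypothetical names, not part of node-rdkafka. Clamping at zero means a message that is already older than 3 hours is not delayed any further.

```typescript
// Target delay from the issue: consume each message ~3 hours after it was produced.
const DELAY_MS = 3 * 60 * 60 * 1000;

/**
 * Remaining wait before a message becomes due:
 * 3 h - (now - produceTimestamp), clamped at 0 so overdue messages
 * are processed immediately instead of producing a negative delay.
 */
function remainingDelayMs(
  produceTimestampMs: number,
  nowMs: number = Date.now(),
  delayMs: number = DELAY_MS,
): number {
  const ageMs = nowMs - produceTimestampMs;
  return Math.max(0, delayMs - ageMs);
}

// Promise-based sleep for use inside an async consume loop.
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
```

In an async loop this would be used as `await sleep(remainingDelayMs(msg.timestamp))` before handling `msg`. Computing the wait per message, rather than once from the first message of a batch, also covers batches whose messages were produced at different times.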

The problem with this workflow is that, after waking up from the sleep, the consumer picks all 90 messages from partition 0 only.
As a result, lag keeps increasing on the other 9 partitions.
Is there a way (or a configuration) to make the consumer read from all partitions evenly, so that lag stays roughly the same across partitions?
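On the application side, one way to stop a single partition from filling a whole batch is to read more than 90 messages and then select the batch round-robin across partitions. This is a minimal sketch, assuming each message carries `partition` and `offset` fields as node-rdkafka messages do; `PartitionedMessage` and `interleaveByPartition` are hypothetical names. It only reorders messages the client has already received; it does not change which partitions librdkafka fetches from.

```typescript
// Minimal message shape this sketch needs (hypothetical stand-in type).
interface PartitionedMessage {
  partition: number;
  offset: number;
}

/**
 * Select up to `batchSize` messages, taking one message per partition per
 * pass (round-robin), so no partition can fill the batch while others wait.
 * Returns the selected batch and the leftovers for the next batch.
 */
function interleaveByPartition<T extends PartitionedMessage>(
  messages: T[],
  batchSize: number,
): { batch: T[]; leftover: T[] } {
  // Group by partition, keeping arrival order within each partition.
  const queues = new Map<number, T[]>();
  for (const m of messages) {
    const q = queues.get(m.partition);
    if (q) {
      q.push(m);
    } else {
      queues.set(m.partition, [m]);
    }
  }
  const partitions = Array.from(queues.keys()).sort((a, b) => a - b);

  // Round-robin: one message from each non-empty partition per pass.
  const batch: T[] = [];
  let tookAny = true;
  while (batch.length < batchSize && tookAny) {
    tookAny = false;
    for (const p of partitions) {
      const q = queues.get(p)!;
      if (q.length > 0 && batch.length < batchSize) {
        batch.push(q.shift()!);
        tookAny = true;
      }
    }
  }

  // Unselected messages stay (uncommitted) for the next batch.
  const leftover: T[] = [];
  for (const p of partitions) {
    leftover.push(...queues.get(p)!);
  }
  return { batch, leftover };
}
```

Messages are always taken from the head of each partition's queue, so per-partition order is preserved and committing the selected offsets never skips over a leftover message in the same partition.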

sanketdhoble (Author) commented:

As a workaround, we tried:

'queued.min.messages': 9,
'fetch.message.max.bytes': 1800,
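In librdkafka both of these settings apply per topic+partition, so the values can be read roughly as follows. This is a rough sketch with illustrative variable names: with ~150-byte messages, a 1800-byte fetch limit allows at most about 12 messages per partition per fetch, and 9 queued messages per partition across 10 partitions matches the 90-message batch. The estimate ignores record-batch framing and the fact that brokers return whole record batches, so real per-fetch counts can differ.

```typescript
// Values from the workaround above plus the issue's message size and partition count.
const fetchMessageMaxBytes = 1800; // 'fetch.message.max.bytes' (per topic+partition)
const queuedMinMessages = 9;       // 'queued.min.messages' (per topic+partition)
const approxMessageBytes = 150;    // ~150 bytes per message
const partitionCount = 10;         // profile_updates has 10 partitions

// Approximate cap on messages per partition per fetch, ignoring framing overhead.
const maxMessagesPerPartitionFetch = Math.floor(fetchMessageMaxBytes / approxMessageBytes); // 12

// Messages librdkafka tries to keep queued locally across all partitions.
const targetQueuedMessages = queuedMinMessages * partitionCount; // 90
```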

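But several partitions still run much slower than the rest (partitions 0, 3, 6 and 9 in the snapshot below are each ~30,000 messages behind, versus roughly 4,000-8,000 for the others):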

```
TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                                                        HOST  CLIENT-ID
profile_updates  0          42169083        42199150        30067  kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  1          42245275        42249372        4097   kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  2          41997395        42005654        8259   kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  3          41627552        41657073        29521  kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  4          42614473        42618613        4140   kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  5          41785716        41793027        7311   kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  6          42351924        42381599        29675  kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  7          41933727        41937595        3868   kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  8          41898170        41906114        7944   kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
profile_updates  9          41898250        41927516        29266  kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a  /IP   kafka-client_profile_updates
```
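The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET. A small sketch that recomputes it from the rows above and flags partitions lagging more than twice the median lag picks out partitions 0, 3, 6 and 9. The 2x threshold is an arbitrary choice for illustration, and `PartitionOffsets`, `lagOf`, `median`, and `laggingPartitions` are hypothetical names.

```typescript
// One row of the consumer-group output above.
interface PartitionOffsets {
  partition: number;
  currentOffset: number; // CURRENT-OFFSET (committed)
  logEndOffset: number;  // LOG-END-OFFSET
}

// Offsets copied from the snapshot above.
const rows: PartitionOffsets[] = [
  { partition: 0, currentOffset: 42169083, logEndOffset: 42199150 },
  { partition: 1, currentOffset: 42245275, logEndOffset: 42249372 },
  { partition: 2, currentOffset: 41997395, logEndOffset: 42005654 },
  { partition: 3, currentOffset: 41627552, logEndOffset: 41657073 },
  { partition: 4, currentOffset: 42614473, logEndOffset: 42618613 },
  { partition: 5, currentOffset: 41785716, logEndOffset: 41793027 },
  { partition: 6, currentOffset: 42351924, logEndOffset: 42381599 },
  { partition: 7, currentOffset: 41933727, logEndOffset: 41937595 },
  { partition: 8, currentOffset: 41898170, logEndOffset: 41906114 },
  { partition: 9, currentOffset: 41898250, logEndOffset: 41927516 },
];

// Lag = LOG-END-OFFSET - CURRENT-OFFSET.
function lagOf(r: PartitionOffsets): number {
  return r.logEndOffset - r.currentOffset;
}

function median(values: number[]): number {
  const s = values.slice().sort((a, b) => a - b);
  const mid = Math.floor(s.length / 2);
  return s.length % 2 === 0 ? (s[mid - 1] + s[mid]) / 2 : s[mid];
}

// Flag partitions whose lag exceeds `factor` times the median lag (factor is arbitrary).
function laggingPartitions(offsets: PartitionOffsets[], factor = 2): number[] {
  const threshold = factor * median(offsets.map(lagOf));
  return offsets.filter((r) => lagOf(r) > threshold).map((r) => r.partition);
}

console.log(laggingPartitions(rows)); // [ 0, 3, 6, 9 ]
```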
