Describe the bug
We are currently facing an issue with our message processing system. We have six partitions and four middleware instances acting as consumers, all sharing the same consumer group ID. While messages in partitions 1, 2, 3, and 4 are being consumed and processed correctly, those in partitions 0 and 5 are not being processed at all.
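For reference, here is a minimal sketch of how a round-robin style assignment would spread six partitions over four group members. It is a simplified illustration, not KafkaJS's actual assigner code, and `assignRoundRobin` and the member IDs are hypothetical names. It shows that every partition should end up with exactly one owner, so partitions that are never processed point at a specific member rather than at a gap in the assignment.

```typescript
// Simplified round-robin assignment: partitions are handed out one by one
// to the sorted list of group members. Illustration only.
function assignRoundRobin(partitionCount: number, memberIds: string[]): Map<string, number[]> {
  const sorted = [...memberIds].sort()
  const assignment = new Map<string, number[]>()
  for (const id of sorted) {
    assignment.set(id, [])
  }
  for (let partition = 0; partition < partitionCount; partition++) {
    const owner = sorted[partition % sorted.length]
    assignment.get(owner)!.push(partition)
  }
  return assignment
}

// Six partitions, four members (hypothetical IDs):
// m1 -> [0, 4], m2 -> [1, 5], m3 -> [2], m4 -> [3]
const example = assignRoundRobin(6, ["m1", "m2", "m3", "m4"])
```

With four members, two of them own two partitions each and the other two own one each. The real assignment in a running group depends on the member IDs and the assigner in use, so it may differ from this example.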
Could anyone provide insights into why this might be happening? This issue is causing us to miss notifications for our app.
To Reproduce
"kafkajs": "1.15.0"
Observed behavior
Consumers are not processing messages from partitions 0 and 5.
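One way to narrow this down is to log which partitions each instance receives after every rebalance. The sketch below assumes the KafkaJS `GROUP_JOIN` instrumentation event, whose payload carries `memberId` and `memberAssignment`. The `ConsumerLike` and `GroupJoinPayload` types and the helpers `describeAssignment` and `logAssignments` are hypothetical, written structurally so the snippet stands alone; in the real middleware the listener would be registered directly on `this.consumer`.

```typescript
// Structural types covering only the fields this sketch reads. They are
// assumptions modeled on KafkaJS's GROUP_JOIN instrumentation event.
interface GroupJoinPayload {
  memberId: string
  memberAssignment: Record<string, number[]>
}

interface ConsumerLike {
  events: { GROUP_JOIN: string }
  on(eventName: string, listener: (event: { payload: GroupJoinPayload }) => void): unknown
}

// Builds one log line describing the partitions this member owns for a topic.
function describeAssignment(payload: GroupJoinPayload, topic: string): string {
  const partitions = [...(payload.memberAssignment[topic] || [])].sort((a, b) => a - b)
  return `member ${payload.memberId} owns ${topic} partitions [${partitions.join(", ")}]`
}

// Registers a listener that logs the assignment each time the member joins the group.
function logAssignments(
  consumer: ConsumerLike,
  topic: string,
  log: (line: string) => void = console.log
): void {
  consumer.on(consumer.events.GROUP_JOIN, (event) => {
    log(describeAssignment(event.payload, topic))
  })
}
```

Comparing these log lines across the four instances should show which instance, if any, was assigned partitions 0 and 5, and whether that instance is still calling `eachMessage` for them.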
Environment:
OS: [all]
KafkaJS version 1.15.0
NodeJS version [^14.14.25]
Additional context
Below is the code we use to create the consumer:

    try {
      this.consumer = this.kafka.consumer({
        groupId: this.consumerGroupId, // each message should be processed by exactly one replica, so all replicas share one consumer group
        sessionTimeout: 45000, // default is 30000; increased in case message processing is delayed
        heartbeatInterval: 3000 // interval at which heartbeats are sent to the broker
      })
      await this.consumer.connect()
      // We want to process old messages, so that each message is processed "at-least-once".
      await this.consumer.subscribe({ topic: this.topic, fromBeginning: true })
      await this.consumer.run({
        autoCommit: false, // auto-commit is disabled; the last processed offset is tracked per partition below
        // autoCommitInterval: 500, // we want offsets to be committed not faster than every 500ms because Azure EventHubs has a limitation of 4 offset-commits per second per partition.
        eachMessage: async ({ topic, partition, message }) => {
          await this.sendMessageToHandlers(topic, partition, message, this.handlers)
          // get the object and test
          const partitionDetails = this.mapOfLastOffsetCommitTimes.get(partition) || {} as PartitionOffsetDetails
          partitionDetails.offset = message.offset
          // this.mapOfLastOffsetCommitTimes.set(partition, { offset: message.offset, time: Date.now(), lastOffset: t })
          this.mapOfLastOffsetCommitTimes.set(partition, partitionDetails)
        },
      })
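Because `autoCommit` is `false`, the tracked offsets have to be committed explicitly somewhere else in the middleware. The sketch below shows one way to turn a map of last processed offsets into entries of the shape accepted by KafkaJS's `consumer.commitOffsets`, using the Kafka convention that the committed offset is the next offset to read (last processed offset plus one). `toCommitEntries` and the plain `Map<number, string>` are hypothetical simplifications of `mapOfLastOffsetCommitTimes`, and this is not necessarily how the reporter's code commits.

```typescript
// Shape of one entry passed to consumer.commitOffsets([...]).
interface OffsetToCommit {
  topic: string
  partition: number
  offset: string
}

// Converts "last processed offset per partition" into commit entries.
// Kafka expects the NEXT offset to consume, so 1 is added to each value.
// Number arithmetic is used for brevity; it is only exact up to
// Number.MAX_SAFE_INTEGER.
function toCommitEntries(topic: string, lastProcessed: Map<number, string>): OffsetToCommit[] {
  const entries: OffsetToCommit[] = []
  lastProcessed.forEach((offset, partition) => {
    entries.push({ topic, partition, offset: String(Number(offset) + 1) })
  })
  return entries
}
```

The result could then be passed to `await this.consumer.commitOffsets(entries)`. If a partition never appears in the map, nothing is committed for it, which is worth checking for partitions 0 and 5.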
SantoshAvaji changed the title from "Few partition messages are not processed," to "Issue with Message Processing or consumption in few Partitions" on Aug 2, 2024