Description
The KafkaJS compat consumer silently stops calling `eachBatch`/`eachMessage` after a period of inactivity. The consumer stays connected (heartbeats continue, partitions assigned), but the internal worker loop never fetches messages again. Only a full `disconnect()` + new consumer instance recovers.
Reproduction
- Create a consumer with an `eachBatch` handler and `autoCommit: false`
- Produce a burst of messages and let the consumer process them
- Wait for `max.poll.interval.ms` (default 300s) with no new messages
- Produce a new message during the cache expiration window
- The consumer never calls `eachBatch` for the new message

This is intermittent — it depends on the exact timing of the message arrival relative to the cache expiration cycle.
Environment
- `@confluentinc/kafka-javascript`: 1.8.2
- Node.js: 18
- OS: Linux (Kubernetes pods)
- Kafka: Amazon MSK
Root cause analysis
The issue is a race condition between `#cacheExpirationLoop` and `#queueNonEmptyCb` in `lib/kafkajs/_consumer.js`.
Normal flow (working)
1. Workers are running, waiting on `await this.#nonEmpty` in `#fetchAndResolveWith` (line 1054)
2. The librdkafka C thread receives a message and calls `#queueNonEmptyCb`
3. `#queueNonEmptyCb` → `#notifyNonEmpty()` → resolves `#nonEmpty`, sets it to `null`
4. Worker wakes up, calls `consume()`, gets the message, processes it
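The one-shot signalling above can be sketched in isolation. This is a simplified stand-in for the library's internal `DeferredPromise` and `#nonEmpty` field, not the actual code; `SignalSketch` and its method names are illustrative only.

```javascript
// Minimal stand-in for the library's internal DeferredPromise:
// a promise whose resolve() is exposed and which remembers resolution.
class DeferredPromise {
  constructor() {
    this.resolved = false;
    this.promise = new Promise((resolve) => {
      this._resolve = () => { this.resolved = true; resolve(); };
    });
  }
  resolve() { this._resolve(); }
}

// Sketch of the one-shot #nonEmpty signal (steps 1-4 of the normal flow).
class SignalSketch {
  constructor() {
    this.nonEmpty = new DeferredPromise(); // workers wait on this
  }
  // Called from the librdkafka callback when the queue becomes non-empty.
  notifyNonEmpty() {
    if (this.nonEmpty !== null) {
      this.nonEmpty.resolve();
      this.nonEmpty = null; // one-shot: the signal is spent after resolving
    }
  }
  // Worker side: wait until a message is signalled, then "consume" it.
  async workerWait() {
    if (this.nonEmpty) await this.nonEmpty.promise;
    return 'consumed';
  }
}

async function demo() {
  const s = new SignalSketch();
  const waiting = s.workerWait(); // worker blocks on #nonEmpty
  s.notifyNonEmpty();             // message arrives -> signal fires
  return waiting;                 // worker woke up and consumed
}
```

In the happy path the notify always arrives while a worker is parked on the promise, so the one-shot design is invisible.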
Stall flow (broken)
1. No messages for `max.poll.interval.ms` (= `#cacheExpirationTimeoutMs`, line 709)
2. `#cacheExpirationLoop` detects the timeout (line 1581: `now > cacheExpirationTimeout`)
3. Calls `#addPendingOperation(() => #clearCacheAndResetPositions())` (line 1582)
4. `#addPendingOperation` → `#resolveWorkerTerminationScheduled()` (line 1781) — terminates all workers
5. `#resolveWorkerTerminationScheduled` also calls `#queueNonEmptyCb()` (line 1624) — resolves `#nonEmpty`, sets it to `null`
6. Workers exit their `while` loop (line 1517: `!this.#workerTerminationScheduled.resolved`)
7. Meanwhile, a new message arrives and librdkafka calls `#queueNonEmptyCb` → `#notifyNonEmpty()` → `#nonEmpty` is already `null` (from step 5) → no-op, signal lost
8. `#runInternal` continues: `await Promise.allSettled(workers)` → `await cacheExpirationLoop` → `#executePendingOperations` → `#clearCacheAndResetPositions` (seeks back to the last consumed offset)
9. New workers are spawned
10. A worker calls `#consumeCachedN` → cache empty → `#fetchAndResolveWith`
11. `#nonEmpty` is `null` → proceeds to `consume()`
12. `consume()` returns the message (the seek reset the position, librdkafka re-fetches) — this CAN work
13. But if `consume()` returns 0 (race with librdkafka's internal re-fetch): `#messageCache.assignedSize === 0` → `#nonEmpty = new DeferredPromise()` (line 1096)
14. The worker waits on `#nonEmpty` forever — the `#queueNonEmptyCb` signal was already spent in step 7
15. The consumer appears alive (heartbeats via the librdkafka C thread) but never processes messages again
Key observation
`#nonEmpty` is a one-shot signal. Once resolved, it is set to `null`. If a new message arrives while `#nonEmpty` is `null` (between steps 5 and 13), the signal is lost. The worker creates a new `DeferredPromise` in step 13, but `#queueNonEmptyCb` has already fired — it won't fire again until ANOTHER new message arrives.
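The lost-signal window can be reproduced in isolation with the same one-shot pattern. This is a simplified sketch, not the library's actual code: a notify that fires while the field is `null` is a no-op, so a waiter created afterwards never wakes (the demo races it against a timer only so it doesn't hang).

```javascript
// Simplified sketch of the race in steps 5-13: the signal fires while
// nonEmpty is null, so a promise created afterwards never resolves.
function makeDeferred() {
  let resolveFn;
  const promise = new Promise((resolve) => { resolveFn = resolve; });
  return { promise, resolve: resolveFn };
}

async function lostSignalDemo() {
  let nonEmpty = null; // step 5 already resolved and nulled the signal

  // Step 7: a new message arrives while nonEmpty is null -> no-op.
  const notifyNonEmpty = () => { if (nonEmpty) nonEmpty.resolve(); };
  notifyNonEmpty(); // signal lost

  // Step 13: the respawned worker creates a fresh promise and waits.
  nonEmpty = makeDeferred();
  const timeout = new Promise((resolve) =>
    setTimeout(() => resolve('stalled'), 100));
  // The real worker would await nonEmpty.promise forever; race it with
  // a timer here purely to observe the stall without hanging the demo.
  return Promise.race([
    nonEmpty.promise.then(() => 'woke up'),
    timeout,
  ]);
}
```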
Evidence
We observed this across 19 microservices using the KafkaJS compat layer:
- Consumer processes messages successfully for 20+ minutes
- `eachBatch` stops being called (confirmed via periodic heartbeat logging: `callsSinceLast=0`, growing `silenceMs`)
- Consumer stays connected (librdkafka heartbeats, partitions assigned)
- No errors logged by either our code or the consumer
- Kafka lag grows (messages produced but never consumed)
- Only creating a brand new consumer instance recovers consumption

The stall consistently occurs after a quiet period matching `max.poll.interval.ms`.
Proposed fix
Add a timeout to `await this.#nonEmpty` in `#fetchAndResolveWith` (line 1054):

```js
// BEFORE (lines 1053-1058):
if (this.#nonEmpty) {
  await this.#nonEmpty;
  return null;
}

// AFTER:
if (this.#nonEmpty) {
  await Timer.withTimeout(1000, this.#nonEmpty); // 1s fallback, arbitrary
  return null;
}
```

This ensures that even if the `#nonEmpty` signal is lost during the cache expiration race, the worker retries the fetch after one second instead of waiting forever. `Timer.withTimeout` is already used throughout the codebase (lines 1553, 1600).

The one-second delay is negligible compared to the current "stall forever" behavior, and `consume()` will return 0 if there are truly no messages (the worker loops back and waits again).
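The effect of the fix can be illustrated with a generic helper. `withTimeout` below is an illustrative equivalent written for this sketch, not the codebase's actual `Timer.withTimeout`: even when the signal was lost, the worker resumes after the timeout and falls through to retry the fetch.

```javascript
// Illustrative equivalent of a withTimeout helper: settle either when the
// promise resolves or when the timeout fires, whichever comes first.
function withTimeout(ms, promise) {
  let timer;
  const timeout = new Promise((resolve) => {
    timer = setTimeout(() => resolve('timeout'), ms);
  });
  return Promise.race([
    promise.then((v) => { clearTimeout(timer); return v; }),
    timeout,
  ]);
}

async function fixedWaitDemo() {
  // Reproduce the lost-signal state: nobody will ever resolve this promise.
  const lostSignal = new Promise(() => {});
  // With the proposed fix the worker waits at most 1s (50ms here to keep
  // the demo fast) and then falls through to retry consume().
  const outcome = await withTimeout(50, lostSignal);
  return outcome === 'timeout' ? 'retrying fetch' : 'woken by signal';
}
```

The fallback turns the lost one-shot signal from a permanent stall into, at worst, a bounded delay before the next fetch attempt.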
Workaround
We implemented a stall detector that creates a brand new consumer instance (disconnect old → create new → connect → subscribe → run) after 10 minutes of silence. This works because `disconnect()` sets `#disconnectStarted`, which stops the `#runInternal` loop, and a new consumer has fresh state.
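The detector is roughly the following shape. This is a simplified sketch with hypothetical names: `getLastBatchAt` and `restartConsumer` are caller-supplied callbacks standing in for the timestamp we update from `eachBatch` and for the disconnect-old/create-new sequence described above.

```javascript
// Simplified stall-detector sketch. getLastBatchAt returns the timestamp
// recorded in eachBatch; restartConsumer disconnects the old consumer and
// builds a fresh one (both names are illustrative, not from the library).
function startStallDetector({ getLastBatchAt, restartConsumer,
                              silenceMs = 10 * 60 * 1000,
                              checkEveryMs = 30 * 1000 }) {
  const timer = setInterval(async () => {
    if (Date.now() - getLastBatchAt() > silenceMs) {
      await restartConsumer(); // disconnect old -> create new -> subscribe -> run
    }
  }, checkEveryMs);
  timer.unref?.(); // don't keep the process alive just for the detector
  return () => clearInterval(timer); // call to stop the detector
}
```

In `eachBatch` we record `Date.now()` into the timestamp that `getLastBatchAt` reads, so normal consumption continually pushes the restart deadline forward.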
Note: reconnecting the SAME consumer after `disconnect()` fails because `#running` is set to `true` in `run()` (line 1305) but never reset to `false` in `disconnect()` (line 2132 sets `#disconnectStarted` but not `#running = false`). This is a separate issue.