Consumer silently stops consuming — race conditions in #nonEmpty and #fetchInProgress signals #476

@ndrantotiana

Description

The KafkaJS compat consumer silently stops calling eachBatch/eachMessage after a period of inactivity. The consumer stays connected (heartbeats continue, partitions assigned), but the internal worker loop never fetches messages again. Only a full disconnect() + new consumer instance recovers.

Reproduction

  1. Create a consumer with eachBatch handler and autoCommit: false
  2. Produce a burst of messages, let the consumer process them
  3. Wait for max.poll.interval.ms (default 300s) with no new messages
  4. Produce a new message during the cache expiration window
  5. The consumer never calls eachBatch for the new message

This is intermittent — it depends on the exact timing of the message arrival relative to the cache expiration cycle.

Environment

  • @confluentinc/kafka-javascript: 1.8.2
  • Node.js: 18
  • OS: Linux (Kubernetes pods)
  • Kafka: Amazon MSK

Root cause analysis

The issue is a race condition between #cacheExpirationLoop and #queueNonEmptyCb in lib/kafkajs/_consumer.js.

Normal flow (working)

  1. Workers are running, waiting on await this.#nonEmpty in #fetchAndResolveWith (line 1054)
  2. librdkafka C thread receives a message, calls #queueNonEmptyCb
  3. #queueNonEmptyCb → #notifyNonEmpty() → resolves #nonEmpty, sets it to null
  4. Worker wakes up, calls consume(), gets the message, processes it

Stall flow (broken)

  1. No messages for max.poll.interval.ms (= #cacheExpirationTimeoutMs, line 709)
  2. #cacheExpirationLoop detects timeout (line 1581: now > cacheExpirationTimeout)
  3. Calls #addPendingOperation(() => #clearCacheAndResetPositions()) (line 1582)
  4. #addPendingOperation → #resolveWorkerTerminationScheduled() (line 1781) — terminates all workers
  5. #resolveWorkerTerminationScheduled also calls #queueNonEmptyCb() (line 1624) — resolves #nonEmpty, sets it to null
  6. Workers exit their while loop (line 1517: !this.#workerTerminationScheduled.resolved)
  7. Meanwhile: a new message arrives, librdkafka calls #queueNonEmptyCb → #notifyNonEmpty() → #nonEmpty is already null (from step 5) → no-op, signal lost
  8. #runInternal continues: await Promise.allSettled(workers) → await cacheExpirationLoop → #executePendingOperations → #clearCacheAndResetPositions (seeks back to last consumed offset)
  9. New workers spawned
  10. Worker calls #consumeCachedN → cache empty → #fetchAndResolveWith
  11. #nonEmpty is null → proceeds to consume()
  12. consume() returns the message (seek reset position, librdkafka re-fetches) — this CAN work
  13. But if consume() returns 0 (race with librdkafka's internal re-fetch): #messageCache.assignedSize === 0 → #nonEmpty = new DeferredPromise() (line 1096)
  14. Worker waits on #nonEmpty forever — the #queueNonEmptyCb signal was already spent in step 7
  15. Consumer appears alive (heartbeats via librdkafka C thread) but never processes messages again

Key observation

#nonEmpty is a one-shot signal. Once resolved, it's set to null. If a new message arrives while #nonEmpty is null (between steps 5 and 13), the signal is lost. The worker creates a new DeferredPromise in step 13 but #queueNonEmptyCb has already fired — it won't fire again until ANOTHER new message arrives.
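The race in steps 5–14 can be simulated with a few lines of plain Node.js. This is a hypothetical sketch, not the library's code: `deferred`, `nonEmpty`, and `notifyNonEmpty` only mirror the names used in the description above. The notify fires while the signal slot is null, the worker re-arms it afterwards, and then waits on a promise nobody will ever resolve:

```javascript
// One-shot deferred signal, as described in the issue.
function deferred() {
  let resolve;
  const promise = new Promise((r) => { resolve = r; });
  return { promise, resolve };
}

let nonEmpty = deferred();

// Resolve the signal and clear it; a second call is a silent no-op.
function notifyNonEmpty() {
  if (nonEmpty) {
    nonEmpty.resolve();
    nonEmpty = null;
  }
}

async function simulateStall() {
  notifyNonEmpty();      // step 5: worker-termination path spends the signal
  notifyNonEmpty();      // step 7: new message arrives -> no-op, signal lost
  nonEmpty = deferred(); // step 13: worker re-arms after consume() returned 0
  // step 14: this await would hang forever; race it against a short timer
  // so the simulation terminates.
  return Promise.race([
    nonEmpty.promise.then(() => 'woke-up'),
    new Promise((r) => setTimeout(() => r('stalled'), 50)),
  ]);
}
```

Running `simulateStall()` yields `'stalled'`: the re-armed promise is never resolved because the only notification was consumed while the slot was null.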

Evidence

We observed this across 19 microservices using the KafkaJS compat layer:

  • Consumer processes messages successfully for 20+ minutes
  • eachBatch stops being called (confirmed via periodic heartbeat logging: callsSinceLast=0, growing silenceMs)
  • Consumer stays connected (librdkafka heartbeats, partitions assigned)
  • No errors logged by either our code or the consumer
  • Kafka lag grows (messages produced but never consumed)
  • Only creating a brand new consumer instance recovers consumption

The stall consistently occurs after a quiet period matching max.poll.interval.ms.

Proposed fix

Add a timeout to await this.#nonEmpty in #fetchAndResolveWith (line 1054):

// BEFORE (line 1053-1058):
if (this.#nonEmpty) {
    await this.#nonEmpty;
    return null;
}

// AFTER:
if (this.#nonEmpty) {
    await Timer.withTimeout(10000, this.#nonEmpty);  // 10s fallback, arbitrary
    return null;
}

This ensures that even if the #nonEmpty signal is lost during the cache expiration race, the worker retries the fetch after 10 seconds instead of waiting forever. Timer.withTimeout is already used throughout the codebase (lines 1553, 1600).

The 10-second delay is negligible compared to the current "stall forever" behavior, and consume() will return 0 if there are truly no messages (the worker loops back and waits again).
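For illustration, a helper with the semantics the fix relies on could look like the sketch below. This is an assumption about the shape of the codebase's `Timer.withTimeout`, not its actual implementation — the real helper may differ in signature and return value:

```javascript
// Hypothetical withTimeout: settle when either the signal resolves or the
// timeout fires, and clear the timer either way so it cannot keep the
// process alive or leak.
function withTimeout(ms, promise) {
  let timer;
  const timeout = new Promise((resolve) => {
    timer = setTimeout(() => resolve('timeout'), ms);
  });
  return Promise.race([promise.then(() => 'signal'), timeout])
    .finally(() => clearTimeout(timer));
}
```

Either outcome is safe for the worker: after the await it falls through to consume(), which returns messages if any arrived (signal lost or not) and 0 otherwise, after which the worker loops back and waits again.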

Workaround

We implemented a stall detector that creates a brand new consumer instance (disconnect old → create new → connect → subscribe → run) after 10 minutes of silence. This works because disconnect() sets #disconnectStarted which stops the #runInternal loop, and a new consumer has fresh state.
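The detector side of that workaround can be sketched generically. All names here are illustrative; the `onStall` callback stands in for whatever your application does to tear down the old consumer and build/connect/subscribe/run a fresh one:

```javascript
// Generic stall detector: track the last delivery time and fire a recovery
// callback after a configurable period of silence.
class StallDetector {
  constructor({ maxSilenceMs, onStall }) {
    this.maxSilenceMs = maxSilenceMs;
    this.onStall = onStall;
    this.lastActivity = Date.now();
  }

  // Call from eachBatch/eachMessage on every delivery.
  recordActivity() {
    this.lastActivity = Date.now();
  }

  start(checkIntervalMs = 30000) {
    this.timer = setInterval(() => {
      if (Date.now() - this.lastActivity > this.maxSilenceMs) {
        this.lastActivity = Date.now(); // avoid re-firing while recovering
        this.onStall();                 // e.g. recreate the consumer
      }
    }, checkIntervalMs);
    this.timer.unref?.(); // don't keep the process alive just for the check
  }

  stop() {
    clearInterval(this.timer);
  }
}
```

One caveat: the silence threshold must comfortably exceed the longest legitimate quiet period on the topic, otherwise the detector will recycle healthy consumers.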

Note: reconnecting the SAME consumer after disconnect() fails because #running is set to true in run() (line 1305) but never reset to false in disconnect() (line 2132 sets #disconnectStarted but not #running=false). This is a separate issue.
