## Description
Cross-topic blocking reported in #247 is still present in v1.8.2. When a single consumer group subscribes to multiple topics, slow message processing on one topic completely blocks consumption from other topics.
## Benchmark Results (v1.8.2, Kafka 3)

| Metric | Value |
|---|---|
| topic-fast completion (from publish) | 95.32s |
| topic-slow processed (at fast complete) | 1000/1000 |
| topic-slow progress | 100.0% |
| Blocked? | YES - cross-topic blocking detected |
| partitionsConsumedConcurrently | 2 |
**Environment:** confluent-kafka-javascript v1.8.2, Kafka 3, Node.js
## Reproduction Scenario

- Subscribe to 2 topics with a single consumer group: `topic-slow` (1 partition), `topic-fast` (1 partition)
- Publish 1000 messages to `topic-slow`, start consuming with a 100ms/msg handler delay
- After 50 slow messages have been processed, publish 10 messages to `topic-fast` with an instant handler
- Measure the time for `topic-fast` to complete all 10 messages
**Expected:** `topic-fast` completes in <1s (independent consumption with `partitionsConsumedConcurrently: 2`)

**Actual:** `topic-fast` took 95.32s, waiting for all 1000 slow messages to finish first
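The steps above can be sketched with the library's KafkaJS-style promisified API. This is a hypothetical harness, not the exact benchmark script: it assumes a local broker at `localhost:9092`, pre-created single-partition topics, and the `kafkaJS` config wrapper; the function, group, and variable names are illustrative.

```javascript
// Hypothetical reproduction sketch for the scenario described in this issue.
// Assumes a broker at localhost:9092 and that topic-slow / topic-fast exist
// with 1 partition each. Requires @confluentinc/kafka-javascript installed.
async function reproduceCrossTopicBlocking() {
  const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
  const kafka = new Kafka({ kafkaJS: { brokers: ['localhost:9092'] } });

  const producer = kafka.producer();
  await producer.connect();

  // Step 1-2: publish 1000 messages to topic-slow.
  await producer.send({
    topic: 'topic-slow',
    messages: Array.from({ length: 1000 }, (_, i) => ({ value: `slow-${i}` })),
  });

  let slowDone = 0;
  let fastDone = 0;
  let fastPublishedAt = 0;
  let resolveFast;
  const fastComplete = new Promise((res) => { resolveFast = res; });

  const consumer = kafka.consumer({ kafkaJS: { groupId: 'repro-group' } });
  await consumer.connect();
  await consumer.subscribe({ topics: ['topic-slow', 'topic-fast'] });
  await consumer.run({
    partitionsConsumedConcurrently: 2,
    eachMessage: async ({ topic }) => {
      if (topic === 'topic-slow') {
        await new Promise((r) => setTimeout(r, 100)); // 100ms/msg handler delay
        if (++slowDone === 50) {
          // Step 3: after 50 slow messages, publish 10 fast messages.
          fastPublishedAt = Date.now();
          await producer.send({
            topic: 'topic-fast',
            messages: Array.from({ length: 10 }, (_, i) => ({ value: `fast-${i}` })),
          });
        }
      } else if (++fastDone === 10) {
        resolveFast(Date.now() - fastPublishedAt);
      }
    },
  });

  const elapsedMs = await fastComplete;
  console.log(`topic-fast completed in ${(elapsedMs / 1000).toFixed(2)}s`);
  await consumer.disconnect();
  await producer.disconnect();
}
```

With the bug present, the logged completion time tracks the remaining ~950 slow messages (about 95s at 100ms each) rather than staying under 1s.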
## Root Cause Analysis
The blocking originates in `lib/kafkajs/_consumer.js`, specifically in `#consumeSingleCached` and `#consumeCachedN`:
```js
// lib/kafkajs/_consumer.js — #consumeSingleCached
async #consumeSingleCached(ppc) {
  const msg = this.#messageCache.next(ppc);
  if (msg) {
    return msg;
  }
  // THIS CONDITION CAUSES CROSS-TOPIC BLOCKING:
  if (!msg && this.#messageCache.assignedSize !== 0) {
    await this.#messageCache.availablePartitions();
    return null;
  }
  return this.#fetchAndResolveWith(() => this.#messageCache.next(),
    this.#messageCacheMaxSize);
}
```
**Why it blocks:** When `partitionsConsumedConcurrently >= 2`, each worker gets a PerPartitionCache (PPC). If WorkerA is processing `topic-slow` (holding its PPC), WorkerB finishes `topic-fast` quickly but then:

- `this.#messageCache.next(ppc)` returns `null` (no more cached messages)
- `this.#messageCache.assignedSize !== 0` is `true` (WorkerA still holds the slow PPC)
- WorkerB awaits `availablePartitions()`, blocked until WorkerA returns its PPC
- New `topic-fast` messages cannot be fetched until `topic-slow` processing completes

The same pattern exists in `#consumeCachedN`.
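The sequence above can be demonstrated with a minimal stand-in model. This is an illustrative mock, not the library's code: `MockMessageCache`, `takePPC`, `returnPPC`, and `demo` are invented names, and only `assignedSize` and `availablePartitions()` mirror the real internals.

```javascript
// Illustrative mock of the blocking behavior: availablePartitions() only
// resolves once a worker returns its PPC, so an idle worker stalls for as
// long as another worker is busy.
class MockMessageCache {
  constructor() {
    this.assignedSize = 0;
    this.waiters = [];
  }
  takePPC() { this.assignedSize++; }      // a worker takes a PPC
  returnPPC() {                           // a worker returns its PPC
    this.assignedSize--;
    this.waiters.splice(0).forEach((resolve) => resolve());
  }
  availablePartitions() {
    return new Promise((resolve) => this.waiters.push(resolve));
  }
}

async function demo() {
  const cache = new MockMessageCache();
  const events = [];

  cache.takePPC(); // WorkerA holds the topic-slow PPC

  const workerB = (async () => {
    // WorkerB finds the cache empty but assignedSize !== 0, so it waits:
    // the same branch flagged in #consumeSingleCached.
    if (cache.assignedSize !== 0) {
      events.push('B: waiting on availablePartitions()');
      await cache.availablePartitions();
    }
    events.push('B: unblocked, can fetch topic-fast');
  })();

  // WorkerA simulates slow topic-slow processing, then returns its PPC.
  await new Promise((r) => setTimeout(r, 50));
  events.push('A: done, returning PPC');
  cache.returnPPC();

  await workerB;
  return events;
}
```

Running `demo()` shows WorkerB unblocking only after WorkerA returns its PPC, which is the cross-topic stall scaled down to milliseconds.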
## Note on Fix Complexity
Simply removing the `assignedSize !== 0` condition causes rebalance test failures (`consumerCacheTests.spec.js`, "is cleared before rebalance"): the additional fetches let one consumer drain all messages from librdkafka before the rebalance, leaving none for the joining consumer.
A proper fix likely requires architectural changes such as:
- Per-topic or per-partition fetch scheduling
- Allowing idle workers to fetch without blocking, while preserving rebalance safety
- Timeout-based fallback with rebalance awareness
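As one concrete illustration of the last option, the indefinite wait could be raced against a short timeout so an idle worker may attempt a fetch. This is a hypothetical sketch only (`waitWithTimeout` is an invented helper, and a real fix would still need a rebalance-safety guard around the fetch path):

```javascript
// Hypothetical timeout-based fallback: instead of awaiting
// availablePartitions() indefinitely, race it against a timeout.
// Resolves to 'partition-available' or 'timeout' so the caller can
// decide whether to fetch. Illustrative only; not the library's API.
function waitWithTimeout(availablePartitions, timeoutMs) {
  let timer;
  const timeout = new Promise((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  return Promise.race([
    availablePartitions.then(() => 'partition-available'),
    timeout,
  ]).finally(() => clearTimeout(timer)); // avoid a dangling timer
}
```

On 'timeout' the worker could attempt a fetch only when no rebalance is pending, which is where the "rebalance awareness" part of the proposal would live.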
## Related

- #247 (original cross-topic blocking report)