
Cross-topic blocking still present in v1.8.2 (related to #247) #468

@beygee

Description


Cross-topic blocking reported in #247 is still present in v1.8.2. When a single consumer group subscribes to multiple topics, slow message processing on one topic completely blocks consumption from other topics.

Benchmark Results (v1.8.2, Kafka 3)

  Metric                                   Value
  topic-fast completion (from publish)     95.32s
  topic-slow processed (at fast complete)  1000/1000
  topic-slow progress                      100.0%
  Blocked?                                 YES - cross-topic blocking detected
  partitionsConsumedConcurrently           2

Environment: confluent-kafka-javascript v1.8.2, Kafka 3, Node.js

Reproduction Scenario

  1. Subscribe to 2 topics with a single consumer group: topic-slow (1 partition), topic-fast (1 partition)
  2. Publish 1000 messages to topic-slow, start consuming with 100ms/msg handler delay
  3. After 50 slow messages processed, publish 10 messages to topic-fast with instant handler
  4. Measure time for topic-fast to complete all 10 messages

Expected: topic-fast completes in <1s (independent consumption with partitionsConsumedConcurrently: 2)
Actual: topic-fast took 95.32s, waiting for all 1000 slow messages to finish first
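The measured 95.32s matches the handler delay arithmetic almost exactly, which supports the claim that topic-fast waited for the remaining slow messages. A quick back-of-envelope check, using only the numbers from the scenario above:

```javascript
// After 50 of the 1000 slow messages are processed, the remaining slow
// messages alone account for ~95s of handler time, which lines up with the
// observed 95.32s topic-fast completion (the extra ~0.3s being overhead).
const totalSlow = 1000;
const processedBeforeFastPublish = 50;
const handlerDelayMs = 100;

const remainingSlow = totalSlow - processedBeforeFastPublish; // 950
const minBlockSeconds = (remainingSlow * handlerDelayMs) / 1000;
console.log(minBlockSeconds); // 95
```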

Root Cause Analysis

The blocking originates in lib/kafkajs/_consumer.js, specifically in #consumeSingleCached and #consumeCachedN:

// lib/kafkajs/_consumer.js — #consumeSingleCached
async #consumeSingleCached(ppc) {
  const msg = this.#messageCache.next(ppc);
  if (msg) {
    return msg;
  }

  // THIS CONDITION CAUSES CROSS-TOPIC BLOCKING:
  if (!msg && this.#messageCache.assignedSize !== 0) {
    await this.#messageCache.availablePartitions();
    return null;
  }

  return this.#fetchAndResolveWith(() => this.#messageCache.next(),
    this.#messageCacheMaxSize);
}

Why it blocks: When partitionsConsumedConcurrently >= 2, each worker gets a PerPartitionCache (PPC). If WorkerA is processing topic-slow (holding its PPC), WorkerB finishes topic-fast quickly but then:

  1. this.#messageCache.next(ppc) returns null (no more cached messages)
  2. this.#messageCache.assignedSize !== 0 is true (WorkerA still holds slow PPC)
  3. WorkerB awaits availablePartitions(), and stays blocked until WorkerA returns its PPC
  4. New topic-fast messages cannot be fetched until topic-slow processing completes

The same pattern exists in #consumeCachedN.
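The four-step sequence above can be reproduced with a toy model. ToyMessageCache below is an illustrative stand-in, not the library's real MessageCache; it only mimics the assignedSize check and the availablePartitions() wait:

```javascript
// Toy model of the blocking pattern (ToyMessageCache is an illustrative
// stand-in, not the library's real MessageCache implementation).
class ToyMessageCache {
  constructor() {
    this.assignedSize = 0; // number of PPCs currently held by workers
    this.waiters = [];     // workers parked in availablePartitions()
  }
  take() { this.assignedSize++; }
  release() {
    this.assignedSize--;
    // Wake every parked worker, as returning a PPC does in the real consumer.
    this.waiters.splice(0).forEach((resolve) => resolve());
  }
  availablePartitions() {
    return new Promise((resolve) => this.waiters.push(resolve));
  }
}

const sleep = (ms) => new Promise((r) => setTimeout(r, ms));

async function demo() {
  const cache = new ToyMessageCache();
  const events = [];

  // WorkerA holds its PPC while slowly processing topic-slow.
  const workerA = (async () => {
    cache.take();
    await sleep(50); // stands in for 1000 x 100ms of slow handling
    events.push('slow done');
    cache.release();
  })();

  // WorkerB has no cached messages, but assignedSize !== 0, so instead of
  // fetching new topic-fast messages it parks in availablePartitions().
  const workerB = (async () => {
    if (cache.assignedSize !== 0) {
      await cache.availablePartitions(); // blocked until WorkerA releases
    }
    events.push('fast done');
  })();

  await Promise.all([workerA, workerB]);
  return events;
}

demo().then((events) => console.log(events.join(', ')));
// prints "slow done, fast done" - the fast work finishes last
```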

Note on Fix Complexity

Simply removing the assignedSize !== 0 condition breaks rebalance tests (consumerCacheTests.spec.js, "is cleared before rebalance"): the additional fetches let one consumer drain all messages from librdkafka before the rebalance, leaving none for the joining consumer.

A proper fix likely requires architectural changes such as:

  • Per-topic or per-partition fetch scheduling
  • Allowing idle workers to fetch without blocking, while preserving rebalance safety
  • Timeout-based fallback with rebalance awareness
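As a rough illustration of the third option only: an idle worker could race the "a PPC was returned" signal against a short timeout instead of parking indefinitely. Everything here is an illustrative sketch, not the library's API, and a real fix would also have to consult rebalance state before attempting a fetch:

```javascript
// Sketch of the timeout-based fallback idea: race availablePartitions()
// against a short timer so an idle worker is never blocked forever.
// waitOrTimeout and its arguments are hypothetical names.
async function waitOrTimeout(availablePartitions, timeoutMs) {
  let timer;
  const timedOut = new Promise((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  const result = await Promise.race([
    availablePartitions().then(() => 'released'),
    timedOut,
  ]);
  clearTimeout(timer);
  return result;
}

// A slow worker never returns its PPC within 20ms, so the idle worker falls
// through and could then attempt a fetch (rebalance checks omitted here).
waitOrTimeout(() => new Promise(() => {}), 20)
  .then((r) => console.log(r)); // prints "timeout"
```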

Related

  • #247
