Hi Airbyte team,
I’m hitting a performance bottleneck with PartitionEnqueuer in scenarios with a very large number of partitions.
Context:
- I have a stream generating 30,000+ partitions.
- Each sync runs 6 repeated partitions.
- The current `while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit(): time.sleep(...)` logic in `partition_enqueuer.py` (line 59) throttles partition enqueuing by continuously sleeping while the thread pool is full.
Problem:
- With this many partitions, the partition generation process is extremely slow because every partition is throttled individually.
- In my case, the total waiting time for partition enqueueing exceeds 6 hours.
- The issue is primarily due to the blocking sleep loop for each partition, which effectively serializes partition generation when the thread pool is at capacity.
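To make the bottleneck easier to follow, here is my rough sketch of the pattern described above. It is not the actual CDK code; apart from `prune_to_validate_has_reached_futures_limit()` and the sleep loop, which are quoted from `partition_enqueuer.py`, the surrounding names (`stream`, `queue`, `sleep_seconds`) are just placeholders:

```python
import time

def generate_partitions(stream, thread_pool_manager, queue, sleep_seconds=0.1):
    """Approximation of the throttling behaviour around partition_enqueuer.py line 59."""
    for partition in stream.generate_partitions():
        # While the thread pool has reached its futures limit, the generator
        # polls and sleeps instead of blocking on a synchronization primitive.
        while thread_pool_manager.prune_to_validate_has_reached_futures_limit():
            time.sleep(sleep_seconds)
        # Only after the pool frees up is the next partition enqueued, so with
        # 30,000+ partitions these per-partition waits add up to hours.
        queue.put(partition)
```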
Thread state observed with py-spy:

```
Thread 17 (active): "Thread-1 (_leak_task_sync)"
    _leak_task_sync (pyrate_limiter/abstracts/bucket.py:157)
    run (threading.py:994)
    _bootstrap_inner (threading.py:1043)
    _bootstrap (threading.py:1014)

Thread 18 (active): "workerpool_0"
    generate_partitions (airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py:59)
    run (concurrent/futures/thread.py:59)
    _worker (concurrent/futures/thread.py:93)
    run (threading.py:994)
    _bootstrap_inner (threading.py:1043)
    _bootstrap (threading.py:1014)
```
- This shows the partition generation thread is effectively stuck on line 59 in `partition_enqueuer.py`, waiting for the thread pool to free up futures.
Impact:
- Very high latency for large-scale partitioned streams.
- CPU overhead due to repeated sleep-wake cycles.
- Makes concurrent ingestion impractical for large partition counts.
Would love to hear your thoughts or recommended solutions for handling extremely high partition counts efficiently.
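To make the question concrete, here is the kind of direction I have in mind, purely as an illustration for discussion rather than a concrete proposal: let the producer block on a semaphore sized to the futures limit instead of polling with `time.sleep()`. All names here (`enqueue_with_backpressure`, `futures_limit`, `process_partition`) are hypothetical and not part of the CDK:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def enqueue_with_backpressure(partitions, process_partition, futures_limit=100, max_workers=10):
    slots = threading.Semaphore(futures_limit)  # one slot per in-flight future

    def release_slot(_future):
        slots.release()  # free a slot as soon as a partition finishes (or fails)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for partition in partitions:
            # Blocks cheaply when the pool is saturated, without a sleep-wake loop.
            slots.acquire()
            future = executor.submit(process_partition, partition)
            future.add_done_callback(release_slot)
```

With a shape like this the generator wakes up once per completed future instead of once per sleep interval, which seems to be where most of the waiting time goes in my case.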
Thanks!