File tree Expand file tree Collapse file tree 1 file changed +3
-3
lines changed
airbyte_cdk/sources/declarative Expand file tree Collapse file tree 1 file changed +3
-3
lines changed Original file line number Diff line number Diff line change 58
58
class ConcurrentDeclarativeSource (ManifestDeclarativeSource , Generic [TState ]):
59
59
# By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock
60
60
# because it has hit the limit of futures but not partition reader is consuming them.
61
- SINGLE_THREADED_CONCURRENCY_LEVEL = 2
61
+ _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
62
62
63
63
def __init__ (
64
64
self ,
@@ -108,8 +108,8 @@ def __init__(
108
108
concurrency_level // 2 , 1
109
109
) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up
110
110
else :
111
- concurrency_level = self .SINGLE_THREADED_CONCURRENCY_LEVEL
112
- initial_number_of_partitions_to_generate = self .SINGLE_THREADED_CONCURRENCY_LEVEL // 2
111
+ concurrency_level = self ._LOWEST_SAFE_CONCURRENCY_LEVEL
112
+ initial_number_of_partitions_to_generate = self ._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
113
113
114
114
self ._concurrent_source = ConcurrentSource .create (
115
115
num_workers = concurrency_level ,
You can’t perform that action at this time.
0 commit comments