Skip to content

Commit ef0ca58

Browse files
lazebnyioctavia-squidington-iii
andauthored
feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source (#347)
Co-authored-by: octavia-squidington-iii <contact@airbyte.com>
1 parent 6ef3153 commit ef0ca58

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from airbyte_cdk.sources.source import TState
4545
from airbyte_cdk.sources.streams import Stream
4646
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
47+
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
4748
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
4849
AlwaysAvailableAvailabilityStrategy,
4950
)
@@ -118,6 +119,12 @@ def __init__(
118119
message_repository=self.message_repository,
119120
)
120121

122+
# TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
123+
@property
124+
def is_partially_declarative(self) -> bool:
125+
"""This flag used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
126+
return False
127+
121128
def read(
122129
self,
123130
logger: logging.Logger,
@@ -369,6 +376,14 @@ def _group_streams(
369376
)
370377
else:
371378
synchronous_streams.append(declarative_stream)
379+
# TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
380+
# Condition below needs to ensure that concurrent support is not lost for sources that already support
381+
# it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
382+
elif (
383+
isinstance(declarative_stream, AbstractStreamFacade)
384+
and self.is_partially_declarative
385+
):
386+
concurrent_streams.append(declarative_stream.get_underlying_stream())
372387
else:
373388
synchronous_streams.append(declarative_stream)
374389

0 commit comments

Comments
 (0)