|
46 | 46 | from airbyte_cdk.sources.source import TState
|
47 | 47 | from airbyte_cdk.sources.streams import Stream
|
48 | 48 | from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
|
| 49 | +from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade |
49 | 50 | from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
|
50 | 51 | AlwaysAvailableAvailabilityStrategy,
|
51 | 52 | )
|
52 | 53 | from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
|
53 | 54 | from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
|
54 | 55 | from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
|
55 |
| -from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade |
56 | 56 |
|
57 | 57 |
|
58 | 58 | class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
|
@@ -379,7 +379,10 @@ def _group_streams(
|
379 | 379 | synchronous_streams.append(declarative_stream)
|
380 | 380 | # Condition below needs to ensure that concurrent support is not lost for sources that already support
|
381 | 381 | # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
|
382 |
| - elif isinstance(declarative_stream, AbstractStreamFacade) and self.is_partially_declarative: |
| 382 | + elif ( |
| 383 | + isinstance(declarative_stream, AbstractStreamFacade) |
| 384 | + and self.is_partially_declarative |
| 385 | + ): |
383 | 386 | concurrent_streams.append(declarative_stream.get_underlying_stream())
|
384 | 387 | else:
|
385 | 388 | synchronous_streams.append(declarative_stream)
|
|
0 commit comments