
Commit a1428bf

maxi297 and octavia-squidington-iii authored
feat: support state migrations that affects parent streams (#770)
Co-authored-by: octavia-squidington-iii <contact@airbyte.com>
1 parent c67c556 commit a1428bf

File tree

3 files changed: +225 -168


airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 24 additions & 40 deletions
@@ -1264,22 +1264,12 @@ def create_concurrent_cursor_from_datetime_based_cursor(
         component_definition: ComponentDefinition,
         stream_name: str,
         stream_namespace: Optional[str],
+        stream_state: MutableMapping[str, Any],
         config: Config,
         message_repository: Optional[MessageRepository] = None,
         runtime_lookback_window: Optional[datetime.timedelta] = None,
-        stream_state_migrations: Optional[List[Any]] = None,
         **kwargs: Any,
     ) -> ConcurrentCursor:
-        # Per-partition incremental streams can dynamically create child cursors which will pass their current
-        # state via the stream_state keyword argument. Incremental syncs without parent streams use the
-        # incoming state and connector_state_manager that is initialized when the component factory is created
-        stream_state = (
-            self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
-            if "stream_state" not in kwargs
-            else kwargs["stream_state"]
-        )
-        stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
-
         component_type = component_definition.get("type")
         if component_definition.get("type") != model_type.__name__:
             raise ValueError(
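The signature change above shifts responsibility to the caller: instead of receiving a list of `stream_state_migrations` and resolving state from `kwargs` or the connector state manager, the method now expects an already-migrated `stream_state`. The following self-contained sketch restates the deleted resolution logic in isolation, to make clear what behavior the caller now has to provide explicitly (the function name and sample values are illustrative only):

```python
from typing import Any, Mapping, MutableMapping


def resolve_state_old_way(
    manager_state: Mapping[str, Any], **kwargs: Any
) -> MutableMapping[str, Any]:
    # Sketch of the deleted code path: prefer a stream_state passed through **kwargs
    # (per-partition child cursors), otherwise fall back to the connector state manager.
    # After this commit the caller resolves and migrates state before calling the factory.
    return dict(manager_state) if "stream_state" not in kwargs else dict(kwargs["stream_state"])


print(resolve_state_old_way({"updated_at": "2024-01-01"}))                    # fallback to manager state
print(resolve_state_old_way({}, stream_state={"updated_at": "2024-06-01"}))   # kwargs took precedence
```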
@@ -1498,21 +1488,11 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
         component_definition: ComponentDefinition,
         stream_name: str,
         stream_namespace: Optional[str],
+        stream_state: MutableMapping[str, Any],
         config: Config,
         message_repository: Optional[MessageRepository] = None,
-        stream_state_migrations: Optional[List[Any]] = None,
         **kwargs: Any,
     ) -> ConcurrentCursor:
-        # Per-partition incremental streams can dynamically create child cursors which will pass their current
-        # state via the stream_state keyword argument. Incremental syncs without parent streams use the
-        # incoming state and connector_state_manager that is initialized when the component factory is created
-        stream_state = (
-            self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
-            if "stream_state" not in kwargs
-            else kwargs["stream_state"]
-        )
-        stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
-
         component_type = component_definition.get("type")
         if component_definition.get("type") != model_type.__name__:
             raise ValueError(
@@ -1587,7 +1567,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
         config: Config,
         stream_state: MutableMapping[str, Any],
         partition_router: PartitionRouter,
-        stream_state_migrations: Optional[List[Any]] = None,
         attempt_to_create_cursor_if_not_provided: bool = False,
         **kwargs: Any,
     ) -> ConcurrentPerPartitionCursor:
@@ -1647,11 +1626,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
                 stream_namespace=stream_namespace,
                 config=config,
                 message_repository=NoopMessageRepository(),
-                # stream_state_migrations=stream_state_migrations, # FIXME is it expected to run migration on per partition state too?
             )
         )

-        stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
        # Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
        use_global_cursor = isinstance(
            partition_router, GroupingPartitionRouter
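The removed FIXME asked whether migrations should also run on per-partition state; with this commit the answer is that migrations run once on the full stream state before the per-partition cursor or its child cursors exist, so the cursor factory above no longer carries a migrations argument. A rough, self-contained sketch of the pre-binding pattern used for such child-cursor factories, with stand-in names rather than the CDK's classes:

```python
from functools import partial
from typing import Any, Mapping, Optional


def create_cursor(
    stream_name: str,
    stream_namespace: Optional[str],
    config: Mapping[str, Any],
    message_repository: Any,
    stream_state: Mapping[str, Any],
) -> dict:
    # Stand-in for a cursor factory method: it simply trusts the state it is given,
    # which is already migrated by the time the per-partition machinery calls it.
    return {"stream": stream_name, "state": dict(stream_state)}


create_child_cursor = partial(
    create_cursor,
    stream_name="orders",           # illustrative stream name
    stream_namespace=None,
    config={},
    message_repository=None,        # the real code passes NoopMessageRepository()
)

print(create_child_cursor(stream_state={"updated_at": "2024-01-01T00:00:00Z"}))
```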
@@ -1974,6 +1951,7 @@ def create_default_stream(
         self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
     ) -> AbstractStream:
         primary_key = model.primary_key.__root__ if model.primary_key else None
+        self._migrate_state(model, config)

         partition_router = self._build_stream_slicer_from_partition_router(
             model.retriever,
@@ -2135,6 +2113,23 @@ def create_default_stream(
             supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
         )

+    def _migrate_state(self, model: DeclarativeStreamModel, config: Config) -> None:
+        stream_name = model.name or ""
+        stream_state = self._connector_state_manager.get_stream_state(
+            stream_name=stream_name, namespace=None
+        )
+        if model.state_migrations:
+            state_transformations = [
+                self._create_component_from_model(state_migration, config, declarative_stream=model)
+                for state_migration in model.state_migrations
+            ]
+        else:
+            state_transformations = []
+        stream_state = self.apply_stream_state_migrations(state_transformations, stream_state)
+        self._connector_state_manager.update_state_for_stream(
+            stream_name=stream_name, namespace=None, value=stream_state
+        )
+
     def _is_stop_condition_on_cursor(self, model: DeclarativeStreamModel) -> bool:
         return bool(
             model.incremental_sync
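The new `_migrate_state` helper is the heart of the change: it runs once per stream as the stream is built (including parent streams created through `create_default_stream`), applies any declared `state_migrations`, and writes the result back through the connector state manager so every later read sees migrated state. Below is a self-contained sketch of what a migration and the `apply_stream_state_migrations` helper amount to, assuming the usual `should_migrate`/`migrate` state-migration interface; the migration class and the `updated_at` cursor field are made up for illustration:

```python
from datetime import datetime, timezone
from typing import Any, List, Mapping, MutableMapping


class EpochToIsoMigration:
    """Hypothetical migration: convert an epoch-seconds cursor value to ISO-8601."""

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        return isinstance(stream_state.get("updated_at"), int)

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        migrated = dict(stream_state)
        migrated["updated_at"] = datetime.fromtimestamp(
            stream_state["updated_at"], tz=timezone.utc
        ).isoformat()
        return migrated


def apply_stream_state_migrations(
    state_migrations: List[Any], stream_state: MutableMapping[str, Any]
) -> MutableMapping[str, Any]:
    # Rough equivalent of the helper referenced in _migrate_state: apply each migration
    # in order, but only when it declares the current state shape as migratable.
    for migration in state_migrations:
        if stream_state and migration.should_migrate(stream_state):
            stream_state = dict(migration.migrate(stream_state))
    return stream_state


state = {"updated_at": 1_700_000_000}
print(apply_stream_state_migrations([EpochToIsoMigration()], state))
# {'updated_at': '2023-11-14T22:13:20+00:00'}
```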
@@ -2206,17 +2201,7 @@ def _build_concurrent_cursor(
         config: Config,
     ) -> Cursor:
         stream_name = model.name or ""
-        stream_state = self._connector_state_manager.get_stream_state(
-            stream_name=stream_name, namespace=None
-        )
-
-        if model.state_migrations:
-            state_transformations = [
-                self._create_component_from_model(state_migration, config, declarative_stream=model)
-                for state_migration in model.state_migrations
-            ]
-        else:
-            state_transformations = []
+        stream_state = self._connector_state_manager.get_stream_state(stream_name, None)

         if (
             model.incremental_sync
@@ -2228,10 +2213,9 @@ def _build_concurrent_cursor(
                 model_type=DatetimeBasedCursorModel,
                 component_definition=model.incremental_sync.__dict__,
                 stream_name=stream_name,
+                stream_state=stream_state,
                 stream_namespace=None,
                 config=config or {},
-                stream_state=stream_state,
-                stream_state_migrations=state_transformations,
                 partition_router=stream_slicer,
                 attempt_to_create_cursor_if_not_provided=True,  # FIXME can we remove that now?
             )
@@ -2242,17 +2226,17 @@ def _build_concurrent_cursor(
                 component_definition=model.incremental_sync.__dict__,
                 stream_name=stream_name,
                 stream_namespace=None,
+                stream_state=stream_state,
                 config=config or {},
-                stream_state_migrations=state_transformations,
             )
         elif type(model.incremental_sync) == DatetimeBasedCursorModel:
             return self.create_concurrent_cursor_from_datetime_based_cursor(  # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
                 model_type=type(model.incremental_sync),
                 component_definition=model.incremental_sync.__dict__,
                 stream_name=stream_name,
                 stream_namespace=None,
+                stream_state=stream_state,
                 config=config or {},
-                stream_state_migrations=state_transformations,
                 attempt_to_create_cursor_if_not_provided=True,
             )
         else:
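Taken together, the flow after this commit is: `create_default_stream` calls `_migrate_state` for every stream it builds (parents included), the migrated state is persisted in the shared connector state manager, and `_build_concurrent_cursor` plus the cursor factory methods simply read and forward that state. The toy round trip below shows why parent streams now see migrated state too; the mini state manager is a stand-in for illustration, not the CDK's `ConnectorStateManager`:

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


class MiniStateManager:
    """Toy stand-in for the connector state manager used in the diff."""

    def __init__(self, initial: Dict[str, Dict[str, Any]]) -> None:
        self._state = {name: dict(value) for name, value in initial.items()}

    def get_stream_state(self, stream_name: str, namespace: Optional[str]) -> Dict[str, Any]:
        return dict(self._state.get(stream_name, {}))

    def update_state_for_stream(
        self, stream_name: str, namespace: Optional[str], value: Dict[str, Any]
    ) -> None:
        self._state[stream_name] = dict(value)


manager = MiniStateManager({"parent_stream": {"updated_at": 1_700_000_000}})

# Step 1 (what _migrate_state does while the parent stream is built): read, migrate, write back.
state = manager.get_stream_state("parent_stream", None)
if isinstance(state.get("updated_at"), int):  # hypothetical migration condition
    state["updated_at"] = datetime.fromtimestamp(state["updated_at"], tz=timezone.utc).isoformat()
manager.update_state_for_stream("parent_stream", None, state)

# Step 2 (what _build_concurrent_cursor and any dependent stream read later): migrated state.
print(manager.get_stream_state("parent_stream", None))
# {'updated_at': '2023-11-14T22:13:20+00:00'}
```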
