Description
Opened on Sep 16, 2022
UPDATE (from @aaronsteers) per #976 (comment):
Since this is high urgency, I took an initial pass at fixing the break here, in tap-snowflake:
That work can be migrated here, but I wanted to start with a 'real' implementation to test against and to prove out the design approach.
Details
Singer SDK Version: 0.10.0
Python Version: 3.8
Bug scope: Taps (catalog, state, stream maps, etc.)
Operating System: macOS
Description
When previous state is passed to a tap with batch mode enabled, that state is not made available through the documented Stream.get_starting_timestamp() and Stream.get_starting_replication_key_value() methods.
I believe this is because the starting_replication_value is never set up on the batch path, and the methods above depend on that value to retrieve state. This is easiest to see by comparing the Stream._sync_records() and Stream._sync_batches() methods on singer_sdk.streams.core.Stream (snippets below). I think the critical missing call is self._write_starting_replication_value(current_context).
https://github.com/meltano/sdk/blob/main/singer_sdk/streams/core.py#L1034
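A minimal, self-contained toy model of the dependency described above may help. It is not the SDK's code: the ToyStream class, its single-context state and its simplified bookmark logic are hypothetical. It only assumes, as described above, that the getters read a starting_replication_value entry that the setup call writes into state. The record path performs that setup; the batch path does not.

```python
from __future__ import annotations

from typing import Any


class ToyStream:
    """Hypothetical, heavily simplified stand-in for singer_sdk's Stream."""

    def __init__(self, state: dict[str, Any], replication_key: str) -> None:
        # Incoming state for a single, unpartitioned stream (e.g. from --state).
        self.state = state
        self.replication_key = replication_key

    def _write_starting_replication_value(self) -> None:
        # Toy setup step: copy the bookmark into the key the getters read.
        value = None
        if self.state.get("replication_key") == self.replication_key:
            value = self.state.get("replication_key_value")
        self.state["starting_replication_value"] = value

    def get_starting_replication_key_value(self) -> Any:
        # The getter only reads the value written by the setup step.
        return self.state.get("starting_replication_value")

    def sync_records(self) -> Any:
        self._write_starting_replication_value()  # present on the record path
        return self.get_starting_replication_key_value()

    def sync_batches(self) -> Any:
        # No setup call here, mirroring the reported _sync_batches().
        return self.get_starting_replication_key_value()


def make_state() -> dict[str, Any]:
    # Hypothetical incoming bookmark for an "updated_at" replication key.
    return {"replication_key": "updated_at", "replication_key_value": "2022-09-01"}


print(ToyStream(make_state(), "updated_at").sync_records())  # 2022-09-01
print(ToyStream(make_state(), "updated_at").sync_batches())  # None
```

The same incoming state yields the bookmark on the record path and None on the batch path, which matches the symptom reported above.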
Code
def _sync_records(
    self,
    context: dict | None = None,
    write_messages: bool = True,
) -> Generator[dict, Any, Any]:
    """Sync records, emitting RECORD and STATE messages.

    Args:
        context: Stream partition or context dictionary.
        write_messages: Whether to write Singer messages to stdout.

    Raises:
        InvalidStreamSortException: TODO

    Yields:
        Each record from the source.
    """
    record_count = 0
    current_context: dict | None
    context_list: list[dict] | None
    context_list = [context] if context is not None else self.partitions
    selected = self.selected

    for current_context in context_list or [{}]:
        partition_record_count = 0
        current_context = current_context or None
        state = self.get_context_state(current_context)
        state_partition_context = self._get_state_partition_context(current_context)
        self._write_starting_replication_value(current_context)
        child_context: dict | None = (
            None if current_context is None else copy.copy(current_context)
        )
        for record_result in self.get_records(current_context):
            ...


def _sync_batches(
    self,
    batch_config: BatchConfig,
    context: dict | None = None,
) -> None:
    """Sync batches, emitting BATCH messages.

    Args:
        batch_config: The batch configuration.
        context: Stream partition or context dictionary.
    """
    for encoding, manifest in self.get_batches(batch_config, context):
        self._write_batch_message(encoding=encoding, manifest=manifest)
        self._write_state_message()
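One possible shape of the fix, sketched on a hypothetical toy class rather than against singer_sdk itself: _sync_batches() loops over the stream's contexts the same way _sync_records() does, and calls _write_starting_replication_value() for each context before asking for batches. The partition handling, the message list standing in for stdout, and the manifest strings are simplifications invented for illustration, and batch_config is omitted.

```python
from __future__ import annotations

from typing import Any, Iterator


class ToyBatchStream:
    """Hypothetical stand-in for singer_sdk's Stream; not the real class."""

    replication_key = "updated_at"

    def __init__(self, partitions_state: dict[str, dict[str, Any]]) -> None:
        # Toy per-partition state, keyed by a simple partition id.
        self.partitions_state = partitions_state
        # Collected (message_type, payload) tuples instead of writing to stdout.
        self.messages: list[tuple[str, Any]] = []

    @property
    def partitions(self) -> list[dict[str, str]]:
        return [{"id": pid} for pid in self.partitions_state]

    def get_context_state(self, context: dict | None) -> dict[str, Any]:
        key = context["id"] if context else "__stream__"
        return self.partitions_state.setdefault(key, {})

    def _write_starting_replication_value(self, context: dict | None) -> None:
        # Toy setup step: expose the bookmark under starting_replication_value.
        state = self.get_context_state(context)
        value = None
        if state.get("replication_key") == self.replication_key:
            value = state.get("replication_key_value")
        state["starting_replication_value"] = value

    def get_starting_replication_key_value(self, context: dict | None) -> Any:
        return self.get_context_state(context).get("starting_replication_value")

    def get_batches(self, context: dict | None) -> Iterator[tuple[str, list[str]]]:
        # A real tap would filter its export on this starting value.
        start = self.get_starting_replication_key_value(context)
        label = context["id"] if context else "all"
        yield "jsonl", [f"file://batch-{label}-since-{start}"]

    def _sync_batches(self, context: dict | None = None) -> None:
        # Mirror _sync_records(): iterate contexts and set up the starting
        # value before requesting batches for each one.
        context_list = [context] if context is not None else self.partitions
        for current_context in context_list or [{}]:
            current_context = current_context or None
            self._write_starting_replication_value(current_context)
            for encoding, manifest in self.get_batches(current_context):
                self.messages.append(("BATCH", (encoding, manifest)))
                self.messages.append(("STATE", None))


stream = ToyBatchStream(
    {"a": {"replication_key": "updated_at", "replication_key_value": "2022-09-01"}}
)
stream._sync_batches()
print(stream.messages[0])  # ('BATCH', ('jsonl', ['file://batch-a-since-2022-09-01']))
```

With this ordering, a get_batches() implementation that calls get_starting_replication_key_value() sees the bookmark from the incoming state instead of None.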