fix: Initialize max_replication_key_value via SELECT max(<replication_key>) ... before starting a native BATCH sync #976

Description

UPDATE (from @aaronsteers) per #976 (comment):

Since this is high urgency, I took an initial pass over the break here on tap-snowflake:

That work can be migrated here but I wanted to start with a 'real' implementation for testing and to prove the design approach.
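As a rough illustration of the approach named in the title, the sketch below reads the current maximum of the replication key before any batch export starts, so the value can later be recorded as the bookmark without inspecting individual records. It uses the standard-library `sqlite3` module as a stand-in for Snowflake. The helper name `initial_max_replication_key_value` and the `orders` / `updated_at` names are hypothetical and not taken from tap-snowflake or the SDK.

```python
import sqlite3


def initial_max_replication_key_value(conn, table, replication_key):
    """Return max(<replication_key>) for the table, or None if it is empty.

    Hypothetical helper for illustration only; not an SDK or tap API.
    """
    # Identifiers cannot be bound as query parameters, so they are quoted
    # inline. A real tap should validate them against the catalog first.
    query = f'SELECT max("{replication_key}") FROM "{table}"'
    row = conn.execute(query).fetchone()
    return row[0]


# Tiny in-memory table standing in for the source being synced.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2022-01-01"), (2, "2022-03-15")],
)

# Capture the value before the batch export begins.
max_value = initial_max_replication_key_value(conn, "orders", "updated_at")
```

Reading the maximum up front means rows written during the export may be picked up again on the next run, which is usually preferable to skipping them.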

Details

Singer SDK Version

0.10.0

Python Version

3.8

Bug scope

Taps (catalog, state, stream maps, etc.)

Operating System

macOS

Description

When passing previous state to a tap with batch mode enabled, that state is not made available via the documented Stream.get_starting_timestamp() and Stream.get_starting_replication_key_value() methods.

I believe this is because of missing setup of a starting_replication_value, which the methods above depend on to retrieve state. This is easiest to see when comparing the Stream._sync_records() and Stream._sync_batches() methods on singer_sdk.streams.core.Stream (snippets below). I think the critical missing call is self._write_starting_replication_value(current_context).

https://github.com/meltano/sdk/blob/main/singer_sdk/streams/core.py#L1034

Code

    def _sync_records(
        self,
        context: dict | None = None,
        write_messages: bool = True,
    ) -> Generator[dict, Any, Any]:
        """Sync records, emitting RECORD and STATE messages.

        Args:
            context: Stream partition or context dictionary.
            write_messages: Whether to write Singer messages to stdout.

        Raises:
            InvalidStreamSortException: TODO

        Yields:
            Each record from the source.
        """
        record_count = 0
        current_context: dict | None
        context_list: list[dict] | None
        context_list = [context] if context is not None else self.partitions
        selected = self.selected

        for current_context in context_list or [{}]:
            partition_record_count = 0
            current_context = current_context or None
            state = self.get_context_state(current_context)
            state_partition_context = self._get_state_partition_context(current_context)
            self._write_starting_replication_value(current_context)
            child_context: dict | None = (
                None if current_context is None else copy.copy(current_context)
            )

            for record_result in self.get_records(current_context):
                ...

    def _sync_batches(
        self,
        batch_config: BatchConfig,
        context: dict | None = None,
    ) -> None:
        """Sync batches, emitting BATCH messages.

        Args:
            batch_config: The batch configuration.
            context: Stream partition or context dictionary.
        """
        for encoding, manifest in self.get_batches(batch_config, context):
            self._write_batch_message(encoding=encoding, manifest=manifest)
            self._write_state_message()
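The difference between the two methods can be reproduced with a toy model. The class below is a simplified stand-in written for this illustration, not `singer_sdk.streams.core.Stream`: the state layout, the `ToyStream` name, and the two `sync_batches_*` methods are assumptions. It only shows why the getter returns nothing when the setup call is skipped.

```python
from __future__ import annotations


class ToyStream:
    """Toy stand-in for a stream; NOT the SDK's Stream class."""

    def __init__(self, state: dict) -> None:
        # Bookmark carried over from a previous run.
        self.state = state

    def _write_starting_replication_value(self) -> None:
        # Copy the previous bookmark into the slot that the getter reads.
        value = self.state.get("replication_key_value")
        if value is not None:
            self.state["starting_replication_value"] = value

    def get_starting_replication_key_value(self) -> str | None:
        # Returns None unless the setup step above has already run.
        return self.state.get("starting_replication_value")

    def get_batches(self):
        # A real tap would filter its export by the starting value.
        yield {"start": self.get_starting_replication_key_value()}

    def sync_batches_without_setup(self) -> list[dict]:
        # Mirrors the reported behavior: no setup before get_batches().
        return list(self.get_batches())

    def sync_batches_with_setup(self) -> list[dict]:
        # Mirrors the record path: setup first, then get_batches().
        self._write_starting_replication_value()
        return list(self.get_batches())


stream = ToyStream({"replication_key_value": "2022-01-01"})
before = stream.sync_batches_without_setup()
after = stream.sync_batches_with_setup()
```

In this model `before` carries no starting value while `after` carries the previous bookmark, which matches the suspicion that the batch path needs the same setup call as the record path.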
