Fix: LOG_BASED replication bookmark not advancing between syncs#3
Merged
Conversation
4 tasks
ksohail22
added a commit
that referenced
this pull request
Mar 12, 2026
…tanoLabs#747) ## Problem After setting `is_sorted = True` on `PostgresLogBasedStream`, LOG_BASED streams crash with `InvalidStreamSortException` on subsequent syncs: ``` singer_sdk.exceptions.InvalidStreamSortException: Unsorted data detected in stream. Latest value '3665872270224' is smaller than previous max '3665872270520'. ``` This happens because the previous `_increment_stream_state` delegated to the Singer SDK's `increment_state()` with `check_sorted=True`. When a stream resumes via `start_replication(start_lsn=bookmark_value)`, PostgreSQL can deliver WAL records with `data_start` slightly below the stored bookmark — particularly when the replication slot is shared across multiple tables and changes from interleaved transactions straddle the resume point. The SDK interprets this as unsorted data and raises the exception. ## Root Cause The SDK's `increment_state()` function, when called with `is_sorted=True` and `check_sorted=True` (the default), enforces that every incoming record's replication key value must be `>=` the previous maximum. It initializes the "previous maximum" from the bookmark's `replication_key_value`. If any WAL record arrives with an LSN even 1 byte below the bookmark, the stream crashes. This is a false positive: PostgreSQL's WAL is globally ordered, but when resuming from a specific LSN on a shared replication slot, records near the resume boundary can have `data_start` values marginally below the requested start position. ## Fix Replace the `_increment_stream_state` override with a self-contained max-forward-only implementation that: 1. Reads the current `replication_key_value` from state 2. Only updates the bookmark if the new value is `>=` the existing value 3. Silently skips records with lower LSN (they are still emitted to the target — only bookmark tracking is affected) 4. Removes the dependency on `singer_sdk.helpers._state.increment_state` This preserves the bookmark-advancing behavior introduced in PR #3 while eliminating the crash on minor LSN out-of-order conditions. ## Changes - **`tap_postgres/client.py`**: Rewrote `_increment_stream_state` to use direct max-forward-only comparison instead of delegating to `increment_state()`. Removed unused `from singer_sdk.helpers._state import increment_state` import. ## Why not just set `check_sorted = False`? Setting `check_sorted = False` on the class would disable the sort validation but still route through the SDK's `increment_state()` code path, which uses `progress_markers` when `is_sorted=False` and direct state updates when `is_sorted=True`. By implementing our own simple max-forward-only logic, we avoid any coupling to the SDK's internal state management semantics for this edge case and make the behavior explicit and predictable. ## Side Effects & Corner Cases - **Duplicate records to the target**: Records with LSN below the bookmark are still yielded to the target (this method only affects state tracking, not record emission). Targets using merge/upsert handle this idempotently. - **No data loss**: The bookmark only ever moves forward, so no WAL records are permanently skipped on future syncs. - **First sync (no prior bookmark)**: When `old_value is None`, the first record's LSN is always accepted, correctly bootstrapping the bookmark. - **`_sdc_lsn` is always an integer**: Simple `>=` comparison is type-safe; no string/datetime comparison edge cases. ## Test Plan - [ ] Verify that LOG_BASED streams no longer crash with `InvalidStreamSortException` on subsequent syncs - [ ] Verify that `replication_key_value` advances correctly after each sync - [ ] Verify that `send_feedback(flush_lsn=...)` reports the advanced LSN to PostgreSQL, preventing WAL growth - [ ] Run against production replication slots shared across multiple tables --------- Co-authored-by: Kashif Sohail <kashif.ezone@gmail.com> Co-authored-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
All
LOG_BASEDstreams intap-postgresfail to advance theirreplication_key_valuebookmark after a successful sync. This means:INCREMENTAL streams are unaffected — their bookmarks advance correctly.
Root Cause
PostgresLogBasedStreaminheritsis_sorted = Falsefrom the Singer SDK base class and does not override it.When
is_sortedisFalse, the SDK'sincrement_state()function writes bookmark updates to a temporaryprogress_markersbuffer rather than directly toreplication_key_valuein the stream state. These progress markers are supposed to be promoted to the main state at the end of the sync, but this promotion does not succeed — the bookmark remains frozen at its initial value.This is incorrect for WAL-based replication. PostgreSQL's logical replication protocol delivers messages in strict LSN order — the stream is inherently sorted.
Fix
Set
is_sorted = TrueonPostgresLogBasedStream:With this change,
increment_state()writesreplication_key_valuedirectly into the stream's main state dict after each record. No buffering, no promotion step, no risk of state mismatch.Impact
replication_key_valuewill correctly advance after every sync.send_feedback(flush_lsn=...)will report the new LSN to PostgreSQL, allowing it to discard consumed WAL segments and reclaim disk space.How to Verify
replication_key_valuein the state between the two runs — it should advance."Stream is assumed to be unsorted, progress is not resumable if interrupted"no longer appears.pg_replication_slots.confirmed_flush_lsnon source databases — it should advance after each sync.