
Fix: LOG_BASED replication bookmark not advancing between syncs #3

Merged
ksohail22 merged 1 commit into main from fix/wal-log-flush on Mar 10, 2026


@ksohail22
Owner

Problem

All LOG_BASED streams in tap-postgres fail to advance their replication_key_value bookmark after a successful sync. This means:

  • Every sync re-reads the same WAL data from the starting LSN, regardless of how many records were processed.
  • The PostgreSQL replication slot is never flushed past the original LSN, causing unbounded WAL growth on source databases.
  • Sync durations grow over time as the accumulated WAL backlog increases.
  • Increased risk of disk exhaustion on production PostgreSQL instances.

INCREMENTAL streams are unaffected — their bookmarks advance correctly.

Root Cause

PostgresLogBasedStream inherits is_sorted = False from the Singer SDK base class and does not override it.

When is_sorted is False, the SDK's increment_state() function writes bookmark updates to a temporary progress_markers buffer rather than directly to replication_key_value in the stream state. These progress markers are supposed to be promoted to the main state at the end of the sync, but this promotion does not succeed — the bookmark remains frozen at its initial value.

This is incorrect for WAL-based replication. PostgreSQL's logical replication protocol delivers messages in strict LSN order — the stream is inherently sorted.
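The difference between the two code paths can be illustrated with a toy model. This is a simplified sketch, not the SDK's implementation: `write_bookmark` and the flat dict layout are assumptions made for illustration, and the promotion step is deliberately left out to show what the state looks like when it never takes effect.

```python
def write_bookmark(state: dict, lsn: int, *, is_sorted: bool) -> None:
    """Toy stand-in for the SDK's bookmark update (hypothetical, simplified)."""
    if is_sorted:
        # Sorted streams: the bookmark is written straight into the main state.
        state["replication_key_value"] = lsn
    else:
        # Unsorted streams: the value is parked in a side buffer until finalization.
        markers = state.setdefault("progress_markers", {})
        markers["replication_key_value"] = lsn


# Both streams start from the same bookmark and see the same three LSNs.
unsorted_state = {"replication_key_value": 100}
sorted_state = {"replication_key_value": 100}
for lsn in (110, 120, 130):
    write_bookmark(unsorted_state, lsn, is_sorted=False)
    write_bookmark(sorted_state, lsn, is_sorted=True)

print(unsorted_state)  # main bookmark untouched; progress lives only in the buffer
print(sorted_state)    # main bookmark reflects the last LSN seen
```

In this model the unsorted stream's main `replication_key_value` only changes if something later copies the buffered value across, which is the step the description above reports as failing.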

Fix

Set is_sorted = True on PostgresLogBasedStream:

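class PostgresLogBasedStream(SQLStream):
    replication_key = "_sdc_lsn"
    # Logical replication delivers messages in LSN order, so the SDK can
    # write bookmarks directly to the stream state.
    is_sorted = True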

With this change, increment_state() writes replication_key_value directly into the stream's main state dict after each record. No buffering, no promotion step, no risk of state mismatch.

Impact

  • Bookmark advancement: replication_key_value will correctly advance after every sync.
  • WAL flush: send_feedback(flush_lsn=...) will report the new LSN to PostgreSQL, allowing it to discard consumed WAL segments and reclaim disk space.
  • First run after deploy: Will process the backlog of WAL accumulated since the bookmark was last truly updated. May take longer than usual.
  • Subsequent runs: Will only process new WAL records since the last sync — fast and efficient.
  • Backward compatible: No state format changes. Existing bookmarks continue to work; they will simply start advancing from their current position.

How to Verify

  1. Run any LOG_BASED stream twice after deploying this change.
  2. Compare the replication_key_value in the state between the two runs — it should advance.
  3. Confirm the log message "Stream is assumed to be unsorted, progress is not resumable if interrupted" no longer appears.
  4. Monitor pg_replication_slots.confirmed_flush_lsn on source databases — it should advance after each sync.
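Steps 2 and 4 can be scripted. The following is a minimal sketch under stated assumptions: the state payloads use the Singer `bookmarks` layout, the stream name `public-orders` and the LSN values are hypothetical, and the integer bookmark is assumed to be the same 64-bit WAL position that PostgreSQL prints in its `HI/LO` hexadecimal text form (as in `confirmed_flush_lsn`).

```python
def lsn_to_int(lsn_text: str) -> int:
    """Convert PostgreSQL's 'HI/LO' hexadecimal LSN text form to an integer."""
    high, low = lsn_text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def bookmark_advanced(before: dict, after: dict, stream: str) -> bool:
    """Return True if the stream's replication_key_value grew between two states."""
    old = before["bookmarks"][stream]["replication_key_value"]
    new = after["bookmarks"][stream]["replication_key_value"]
    return new > old


# Hypothetical state payloads captured after two consecutive runs.
state_run_1 = {
    "bookmarks": {
        "public-orders": {
            "replication_key": "_sdc_lsn",
            "replication_key_value": 3665872270224,
        }
    }
}
state_run_2 = {
    "bookmarks": {
        "public-orders": {
            "replication_key": "_sdc_lsn",
            "replication_key_value": 3665872270520,
        }
    }
}

print(bookmark_advanced(state_run_1, state_run_2, "public-orders"))
print(lsn_to_int("1/0"))  # one full 32-bit segment: 2**32
```

Converting `confirmed_flush_lsn` with `lsn_to_int` makes it directly comparable to the integer bookmark in the state file.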

ksohail22 merged commit 2878bfd into main on Mar 10, 2026
ksohail22 added a commit that referenced this pull request Mar 12, 2026
…tanoLabs#747)

## Problem

After setting `is_sorted = True` on `PostgresLogBasedStream`, LOG_BASED
streams crash with `InvalidStreamSortException` on subsequent syncs:

```
singer_sdk.exceptions.InvalidStreamSortException: Unsorted data detected in stream.
Latest value '3665872270224' is smaller than previous max '3665872270520'.
```

This happens because the previous `_increment_stream_state` delegated to
the Singer SDK's `increment_state()` with `check_sorted=True`. When a
stream resumes via `start_replication(start_lsn=bookmark_value)`,
PostgreSQL can deliver WAL records with `data_start` slightly below the
stored bookmark — particularly when the replication slot is shared
across multiple tables and changes from interleaved transactions
straddle the resume point. The SDK interprets this as unsorted data and
raises the exception.

## Root Cause

The SDK's `increment_state()` function, when called with
`is_sorted=True` and `check_sorted=True` (the default), enforces that
every incoming record's replication key value must be `>=` the previous
maximum. It initializes the "previous maximum" from the bookmark's
`replication_key_value`. If any WAL record arrives with an LSN even 1
byte below the bookmark, the stream crashes.

This is a false positive: PostgreSQL's WAL is globally ordered, but when
resuming from a specific LSN on a shared replication slot, records near
the resume boundary can have `data_start` values marginally below the
requested start position.
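
The failure mode can be reproduced with a simplified model of a strict
sort check. This is a sketch, not the SDK's code: `strict_increment` and
`UnsortedStreamError` are hypothetical stand-ins, and the two LSN values
are the ones from the traceback above (the incoming record is 296 below
the stored bookmark).

```python
class UnsortedStreamError(Exception):
    """Hypothetical stand-in for the SDK's InvalidStreamSortException."""


def strict_increment(state: dict, lsn: int) -> None:
    """Simplified model of a sorted-stream bookmark update with sort checking."""
    previous = state.get("replication_key_value")
    if previous is not None and lsn < previous:
        # Any value below the stored maximum is treated as a sort violation.
        raise UnsortedStreamError(
            f"Latest value '{lsn}' is smaller than previous max '{previous}'."
        )
    state["replication_key_value"] = lsn


state = {"replication_key_value": 3665872270520}  # bookmark from the prior sync
try:
    strict_increment(state, 3665872270224)  # first record seen after resuming
    crashed = False
    message = ""
except UnsortedStreamError as exc:
    crashed = True
    message = str(exc)

print(crashed, message)
```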

## Fix

Replace the `_increment_stream_state` override with a self-contained
max-forward-only implementation that:

1. Reads the current `replication_key_value` from state
2. Only updates the bookmark if the new value is `>=` the existing value
3. Silently skips the bookmark update for records with a lower LSN (the
records are still emitted to the target; only bookmark tracking is
affected)
4. Removes the dependency on `singer_sdk.helpers._state.increment_state`

This preserves the bookmark-advancing behavior introduced in PR #3 while
eliminating the crash on minor LSN out-of-order conditions.
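
The steps above can be sketched as a standalone function. This is an
illustration of the max-forward-only idea, not the code in
`tap_postgres/client.py`: the name `advance_bookmark`, its signature,
and the flat state dict are assumptions.

```python
def advance_bookmark(
    state: dict, latest_record: dict, replication_key: str = "_sdc_lsn"
) -> bool:
    """Move the bookmark forward only; return True if it was updated."""
    new_value = latest_record[replication_key]
    old_value = state.get("replication_key_value")
    # Accept the first value unconditionally, then only equal or higher LSNs.
    if old_value is None or new_value >= old_value:
        state["replication_key"] = replication_key
        state["replication_key_value"] = new_value
        return True
    # Lower LSN: leave the bookmark alone; the record itself is not dropped here.
    return False


state: dict = {}
results = [
    advance_bookmark(state, {"_sdc_lsn": lsn})
    for lsn in (3665872270520, 3665872270224, 3665872270600)
]
print(results, state["replication_key_value"])
```

The middle record, whose LSN is below the current bookmark, leaves the
state untouched instead of raising, and the bookmark ends at the highest
LSN seen.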

## Changes

- **`tap_postgres/client.py`**: Rewrote `_increment_stream_state` to use
direct max-forward-only comparison instead of delegating to
`increment_state()`. Removed unused `from singer_sdk.helpers._state
import increment_state` import.

## Why not just set `check_sorted = False`?

Setting `check_sorted = False` on the class would disable the sort
validation, but bookmark updates would still go through the SDK's
`increment_state()`, which writes to `progress_markers` when
`is_sorted=False` and directly to the stream state when
`is_sorted=True`. Implementing our own max-forward-only logic removes
that coupling to the SDK's internal state management for this edge case
and makes the behavior explicit and predictable.

## Side Effects & Corner Cases

- **Duplicate records to the target**: Records with LSN below the
bookmark are still yielded to the target (this method only affects state
tracking, not record emission). Targets using merge/upsert handle this
idempotently.
- **No data loss**: The bookmark only ever moves forward, so no WAL
records are permanently skipped on future syncs.
- **First sync (no prior bookmark)**: When `old_value is None`, the
first record's LSN is always accepted, correctly bootstrapping the
bookmark.
- **`_sdc_lsn` is always an integer**: Simple `>=` comparison is
type-safe; no string/datetime comparison edge cases.

## Test Plan

- [ ] Verify that LOG_BASED streams no longer crash with
`InvalidStreamSortException` on subsequent syncs
- [ ] Verify that `replication_key_value` advances correctly after each
sync
- [ ] Verify that `send_feedback(flush_lsn=...)` reports the advanced
LSN to PostgreSQL, preventing WAL growth
- [ ] Run against production replication slots shared across multiple
tables

---------

Co-authored-by: Kashif Sohail <kashif.ezone@gmail.com>
Co-authored-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>