storage: produce definite snapshot #17114

Merged · 6 commits into MaterializeInc:main · Jan 30, 2023

Conversation

@petrosagg (Contributor) commented Jan 12, 2023

Motivation

This PR fixes a long-standing issue with how postgres sources produced their initial snapshot. Because the snapshot was produced inside the same transaction that created the replication slot (so that the LSN of the slot and the snapshot would match up), we only had one chance to produce the snapshot. Usually this wasn't a problem because of a pair of fragile conditions:

  • The snapshot is transactionally committed to persist shards
  • We always restarted at the upper frontier of the output

Taken together, these meant that we could be smart and only produce the snapshot when we were requested to ingest from LSN 0; anything non-zero implied that the snapshot had already been committed, so we could skip it, saving ourselves the difficult task of reproducing it at the exact LSN of the slot.

It turns out that multi-output sources break the first condition, as it's possible for only a subset of subsources to commit their snapshots, in which case we have to recreate the snapshot on restart. The result is a pretty serious correctness issue: https://github.com/MaterializeInc/database-issues/issues/4877

This PR fixes the problem by always producing the snapshot (when a snapshot is needed, of course) at the same LSN. The issue with the naive approach is that a database snapshot happens at an LSN that we don't control, and it will almost always be greater than the LSN of the replication slot. The key observation is that, just like a differential collection can start from a snapshot and be rolled forward in time by adding diffs, we can also subtract updates and rewind it in time. And it just so happens that the updates we need to subtract are sitting in the replication slot!
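
To make the rewind idea concrete, here is a minimal, self-contained sketch. It is not the actual source code: the consolidate helper and the (row, lsn, diff) shape are illustrative stand-ins. It shows that negating the slot's updates up to snapshot_lsn turns a snapshot taken at snapshot_lsn into the collection as of slot_lsn:

```rust
use std::collections::BTreeMap;

type Row = &'static str;
type Lsn = u64;
type Diff = i64;

/// Sum the diffs per row and drop rows whose net diff is zero.
fn consolidate(updates: &[(Row, Diff)]) -> BTreeMap<Row, Diff> {
    let mut out = BTreeMap::new();
    for &(row, diff) in updates {
        *out.entry(row).or_insert(0) += diff;
    }
    out.retain(|_, d| *d != 0);
    out
}

fn main() {
    let (slot_lsn, snapshot_lsn): (Lsn, Lsn) = (10, 13);

    // A snapshot taken at snapshot_lsn already reflects the inserts of "b" and "c".
    let snapshot: Vec<(Row, Diff)> = vec![("a", 1), ("b", 1), ("c", 1)];

    // The replication slot holds exactly the updates with slot_lsn < lsn <= snapshot_lsn.
    let slot_updates: Vec<(Row, Lsn, Diff)> = vec![("b", 11, 1), ("c", 13, 1)];

    // Rewind: re-emit every slot update at or before snapshot_lsn with a negated diff.
    // (All of these updates are timestamped at slot_lsn, so timestamps are elided here.)
    let mut rewound = snapshot.clone();
    for (row, lsn, diff) in slot_updates {
        if lsn <= snapshot_lsn {
            rewound.push((row, -diff));
        }
    }

    // What remains is the collection as it was at slot_lsn: just "a".
    let collection: Vec<Row> = consolidate(&rewound).keys().copied().collect();
    assert_eq!(collection, vec!["a"]);
    println!("collection as of lsn {slot_lsn}: {collection:?}");
}
```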

At a high level the postgres source does the following (a small sketch of step 3 follows the list):

  1. Create the replication slot if it doesn't already exist and note down its slot_lsn
  2. Take a snapshot in a separate transaction which will happen at some snapshot_lsn but emit all its updates at timestamp slot_lsn
  3. If slot_lsn < snapshot_lsn, start streaming the replication slot and for each update:
    • If the update's lsn is <= snapshot_lsn, emit it at slot_lsn with a negated diff
    • Once the stream goes beyond snapshot_lsn, stop rewinding and mark the snapshot as complete
  4. Start the normal replication stream
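
As a rough illustration of step 3 (route_update and Routed are hypothetical names, not the real mz_storage types), each update pulled from the slot is either negated back at slot_lsn or marks the end of the rewind phase:

```rust
/// Hypothetical routing decision for one update read from the replication slot
/// during the rewind phase (step 3 above).
#[derive(Debug, PartialEq)]
enum Routed {
    /// The update is already reflected in the snapshot: re-emit it at slot_lsn
    /// with a negated diff so that the snapshot is rewound to slot_lsn.
    RewindAt { lsn: u64, diff: i64 },
    /// The update is past snapshot_lsn: stop rewinding, the snapshot is complete.
    DoneRewinding,
}

fn route_update(slot_lsn: u64, snapshot_lsn: u64, update_lsn: u64, diff: i64) -> Routed {
    if update_lsn <= snapshot_lsn {
        Routed::RewindAt { lsn: slot_lsn, diff: -diff }
    } else {
        Routed::DoneRewinding
    }
}

fn main() {
    let (slot_lsn, snapshot_lsn) = (10, 13);
    // An insert the snapshot already saw gets negated back at slot_lsn...
    assert_eq!(
        route_update(slot_lsn, snapshot_lsn, 12, 1),
        Routed::RewindAt { lsn: 10, diff: -1 }
    );
    // ...while anything beyond snapshot_lsn ends the rewind and starts normal replication.
    assert_eq!(route_update(slot_lsn, snapshot_lsn, 14, 1), Routed::DoneRewinding);
}
```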

In order to do the above, a few changes needed to be made to the structure of the postgres source:

  1. The produce_snapshot and produce_replication methods are now freestanding functions that only take a pg client and give back an async stream of updates without altering any state. This makes it easy to instantiate the stream multiple times when needed (see the sketch after this list).
  2. Getting in and out of the replication stream to consume potential lag has been made a lot more aggressive and is now based only on time.
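
For a sense of the shape of such a freestanding producer, here is a hedged sketch. It is not the actual produce_snapshot signature: it assumes the async-stream and futures crates, and the query and row layout are made up for illustration.

```rust
use async_stream::try_stream;
use futures::Stream;
use tokio_postgres::Client;

/// Illustrative stand-in for whatever update type the source emits.
pub struct SourceUpdate {
    pub row: String,
    pub diff: i64,
}

/// A freestanding producer: it borrows a client, carries no source state of its
/// own, and hands back a stream that can be re-instantiated whenever a fresh
/// snapshot pass is needed.
pub fn produce_snapshot<'a>(
    client: &'a Client,
    table: &'a str,
) -> impl Stream<Item = Result<SourceUpdate, tokio_postgres::Error>> + 'a {
    try_stream! {
        // NOTE: naive query and row layout, purely for illustration.
        let query = format!("SELECT data FROM {table}");
        let rows = client.query(query.as_str(), &[]).await?;
        for row in rows {
            yield SourceUpdate { row: row.get::<_, String>(0), diff: 1 };
        }
    }
}
```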

Finally, there was a bug in our tokio-postgres fork where dropping a replication stream failed to put the connection back in query mode. This has been fixed in the rust-postgres PR that is still pending upstream (). For this reason I updated our fork to contain that PR as-is, and the replication-specific code has been moved to mz_postgres_util. This is why this PR is so big.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.

  • This PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-proto label.

  • If this PR will require changes to cloud orchestration, there is a
    companion cloud PR to account for those changes that is tagged with
    the release-blocker label (example).

  • This PR includes the following user-facing behavior changes:

@petrosagg petrosagg force-pushed the pg-definite branch 4 times, most recently from e259b6a to 799037a on January 13, 2023 15:52
@petrosagg petrosagg changed the title from "pg refactor" to "storage: produce definite snapshot" on Jan 13, 2023
@petrosagg petrosagg force-pushed the pg-definite branch 2 times, most recently from a584487 to ce250d2 on January 13, 2023 21:44
@benesch (Contributor) commented Jan 14, 2023

The key observation is that just like a differential collection can start from a snapshot and by adding diffs be rolled forward in time we can also subtract updates and rewind it in time. And it just so happens that we have the updates we need to subtract in the replication slot!

This is such a cool idea. 🤯

@petrosagg petrosagg force-pushed the pg-definite branch 3 times, most recently from 87ce792 to 15adcc6 on January 14, 2023 20:40
@guswynn (Contributor) commented Jan 17, 2023

@aljoscha @petrosagg Updated with the repro test now passing; I now need to convince myself as to WHY it's passing.

Also, there is some amount of new flakiness in the tests, where something like this happens:

cluster-storage-1  | 2023-01-17T17:58:21.242428Z  INFO mz_storage::source::reclock::compat: remap(u5) 1/4 initializing PersistHandle since=Antichain { elements: [0] } as_of=Antichain { elements: [0] } upper=Antichain { elements: [1673978299001] }
cluster-storage-1  | 2023-01-17T17:58:31.070458Z  INFO mz_storage::source::resumption: resumption(u5) 1/4: calculated new resumption frontier resumption_frontier=Antichain { elements: [1673978297000] }
cluster-storage-1  | 2023-01-17T17:58:51.186858Z  INFO mz_storage::source::postgres: Found 0 changes in the wal. slot="materialize_171b7d496b074c63b0f91aa64c44b239" query_time=5.031625ms current_lsn=0/165FEE8
cluster-storage-1  | 2023-01-17T17:58:51.194159Z  WARN mz_storage::source::postgres: irrecoverable error for source u5: unexpected message from server
cluster-storage-1  | 2023-01-17T17:58:52.469990Z  WARN mz_storage::source::source_reader_pipeline: halting process: halting with status StalledWithError("unexpected message from server")

I added an error.cause() to the log line to see if we can glean more information; I imagine there is some bug in how we transfer the client between modes...

@guswynn guswynn force-pushed the pg-definite branch 3 times, most recently from 68ababf to dc722c7 on January 18, 2023 21:34
@guswynn (Contributor) commented Jan 18, 2023

Updated this PR to get it to a mergeable state; this amounts to 3 main changes:

  • Added my test that repros the original bug, and ensured that it passes
    • @aljoscha I was originally confused as to why this worked, because I imagined that the sub-sources that already committed the snapshot would receive additional negatively-diffed data and cause errors; my understanding from reading @petrosagg's code is that this doesn't happen because those negative diffs only negate the extra values produced by the newer, larger snapshot.
  • Removed all the changes related to updating our upstream tokio-postgres fork; this was causing transient failures in the connection handling in the tokio-postgres crate, so I opted to just aggressively rebuild postgres clients.
    • This is noticeably slower on startup, but I left a TODO to optimize this once we figure out what the underlying bug is.
      • I also made it so we reset the WAL_LAG_GRACE_PERIOD every time we peek ahead, as I believe the refactor introduced a slight logic bug here that made us busy-loop peeking WAL changes.
    • This also was very hard to get right (the tokio-postgres crate does NOT make this easy), so I commented around the fragile touch-points.
  • Critically, I had to change a core piece of logic: https://github.com/MaterializeInc/materialize/pull/17114/files#diff-6dedbd14e351ac8889cd651ca5d9fd465ff3aebc4d77ebb242b184d288378145R1296
    • In prose: I had to make it so that when we are fast-forwarding by peeking the WAL changes, we add 1 to the WAL end that we have. I don't know why the tests were able to pass before without my change, as in my testing it seemed required.
      • One theory I have is that the switch to re-creating clients resets the PrimaryKeepAlives in such a way that we don't actually receive an update at a later lsn, but this is completely unsubstantiated.
      • CI appears to pass fully, even with my change...
      • Reviewers: please do some light thinking about this

@guswynn guswynn marked this pull request as ready for review January 18, 2023 21:57
@guswynn guswynn requested a review from a team January 18, 2023 21:57
@petrosagg (Contributor, Author) commented:

I reviewed and rebased this PR on top of main and I think the logic is correct. I chased the implementation of the logical replication producer code in pg: when calling pg_logical_slot_peek_binary_changes without passing an upto parameter, it effectively loops while record.end < wal_end, which means that it is correct to set our upper to wal_end.
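
A tiny sketch of that reasoning (illustrative only, not the source code): because every peeked record satisfies record.end < wal_end, wal_end itself is a valid exclusive upper bound ("upper") for what has been observed.

```rust
/// Illustrative check of the claim above: peeking without an upto parameter
/// returns every record whose end LSN is strictly below wal_end, so wal_end
/// can be used directly as the exclusive upper for the data we have seen.
fn new_upper(peeked_record_ends: &[u64], wal_end: u64) -> u64 {
    // Mirrors the server-side loop `while record.end < wal_end`.
    assert!(peeked_record_ends.iter().all(|&end| end < wal_end));
    wal_end
}

fn main() {
    // Records peeked from the slot end at LSNs 100 and 180; the server reported
    // wal_end = 200, so everything strictly below 200 has been observed.
    assert_eq!(new_upper(&[100, 180], 200), 200);
}
```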


Review thread on:
/// Creates the replication slot and produces the initial snapshot of the data

Contributor (reviewer) commented:

I think all of this rustdoc is now outdated? Also: is it true that this "automagically" uses the temporary SLOT that we create right before calling this method, and it therefore produces the data as of snapshot_lsn?

Contributor Author (petrosagg) replied:

Updated! This method will run in whatever client context it is invoked with. Depending on the situation, we call it either in a transaction whose LSN was set by a temp slot or with the actual real slot. You can see the logic at the call site.

petrosagg and others added 5 commits January 30, 2023 17:49

@guswynn (Contributor) left a comment

I haven't closely re-reviewed the code, but I am confident all 3 of us have looked at it long enough to be happy with it.

@philip-stoev before the next release cut, we probably want to run RQ on this!

@petrosagg petrosagg enabled auto-merge January 30, 2023 17:14
@petrosagg petrosagg merged commit 683a5d2 into MaterializeInc:main Jan 30, 2023
@petrosagg petrosagg deleted the pg-definite branch January 30, 2023 17:47