storage: produce definite snapshot #17114
Conversation
This is such a cool idea. 🤯
@aljoscha @petrosagg Updated with the repro test now passing; I now need to convince myself as to WHY it's passing. Also, there is some amount of new flakiness in the tests, where something like this happens: […]
I added an […]
Updated this PR to get it to a mergeable state; this amounts to 3 main changes: […]
I reviewed and rebased this PR on top of main and I think the logic is correct. I chased the implementation of the logical replication producer code in pg, and when calling […]
src/storage/src/source/postgres.rs
Outdated
/// Creates the replication slot and produces the initial snapshot of the data
I think all of this rustdoc is now outdated? Also: is it true that this "automagically" uses the temporary slot that we create right before calling this method, and that it therefore produces the data as of snapshot_lsn?
Updated! This method will run in whatever client context it is invoked with. Depending on the situation, call it either in a transaction whose LSN was pinned by a temporary slot or in one pinned by the actual, real slot. You can see the logic at the call site.
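To make the mechanics concrete, here is a minimal sketch of how a transaction can be pinned to a slot's LSN, assuming a tokio-postgres connection opened in replication mode; the slot and table names are hypothetical and this is not the PR's actual code:

```rust
use tokio_postgres::{Client, Error};

/// Read a table at the exact LSN of a freshly created temporary slot.
/// CREATE_REPLICATION_SLOT ... USE_SNAPSHOT exports the slot's snapshot
/// to the current transaction, so every read below sees the database
/// exactly as of the slot's consistent point.
async fn snapshot_at_slot_lsn(client: &Client) -> Result<(), Error> {
    client
        .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ")
        .await?;
    // Must be the first command in the transaction; the returned row
    // includes the slot's consistent_point LSN.
    client
        .simple_query(
            "CREATE_REPLICATION_SLOT snap_tmp TEMPORARY LOGICAL pgoutput USE_SNAPSHOT",
        )
        .await?;
    // All reads in this transaction are consistent with the slot's LSN.
    client.simple_query("SELECT * FROM some_table").await?;
    client.simple_query("COMMIT").await?;
    Ok(())
}
```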
I haven't closely re-reviewed the code, but I am confident all 3 of us have looked at it long enough to be happy with it.
@philip-stoev before the next release cut, we probably want to run RQ on this!
Motivation
This PR fixes a long-standing issue with how postgres sources produced their initial snapshot. The problem was that the snapshot was produced inside the same transaction that created the replication slot, so in order for the LSN of the slot and the snapshot to match up we had exactly one chance to produce the snapshot. Usually this wasn't a problem because of a set of fragile conditions:
* The snapshot is transactionally committed to persist shards.
* We always restarted at the upper frontier of the output.
These two taken together meant that we could be smart and only produce the snapshot when we were requested to ingest from LSN 0; anything non-zero implied that the snapshot had already been committed, so we could skip it, saving ourselves the difficult task of reproducing it at the exact LSN of the slot.
It turns out that multi-output sources break the first condition: it is possible for only a subset of subsources to commit their snapshots, in which case the snapshot has to be recreated on restart. The result is a pretty serious correctness issue: https://github.com/MaterializeInc/database-issues/issues/4877
This PR fixes the problem by always producing the snapshot (when a snapshot is needed, of course) at the same LSN. The issue with the naive approach is that a database snapshot happens at an LSN that we don't control, and it will almost always be greater than the LSN of the replication slot. The key observation is that, just as a differential collection can start from a snapshot and be rolled forward in time by adding diffs, it can also be rewound in time by subtracting updates. And it just so happens that the updates we need to subtract are available in the replication slot!
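As a toy illustration of this observation (a self-contained sketch, not the PR's code): starting from the rows visible at snapshot_lsn and subtracting every replication update between slot_lsn and snapshot_lsn yields the collection as of slot_lsn once the diffs are consolidated:

```rust
use std::collections::HashMap;

/// Rewind a snapshot taken at `snapshot_lsn` back to `slot_lsn` by
/// subtracting the replication updates that happened in between.
/// `snapshot` holds (row, diff) pairs; `updates` holds (lsn, row, diff).
fn rewind(
    snapshot: Vec<(String, i64)>,
    updates: Vec<(u64, String, i64)>,
    slot_lsn: u64,
    snapshot_lsn: u64,
) -> Vec<(String, i64)> {
    let mut collection: HashMap<String, i64> = HashMap::new();
    for (row, diff) in snapshot {
        *collection.entry(row).or_insert(0) += diff;
    }
    for (lsn, row, diff) in updates {
        // Updates in (slot_lsn, snapshot_lsn] are already reflected in the
        // snapshot; subtracting them rewinds the collection to slot_lsn.
        if lsn > slot_lsn && lsn <= snapshot_lsn {
            *collection.entry(row).or_insert(0) -= diff;
        }
    }
    // Consolidate: rows whose diffs cancelled out disappear entirely.
    collection.into_iter().filter(|&(_, d)| d != 0).collect()
}
```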
At a high level the postgres source does the following (a sketch of the rewind pass in step 3 follows the list):
1. Create the replication slot if it doesn't already exist and note down its slot_lsn.
2. Take a snapshot in a separate transaction, which will happen at some snapshot_lsn, but emit all its updates at timestamp slot_lsn.
3. If slot_lsn < snapshot_lsn, start streaming the replication slot and, for each update:
   * If the update's LSN is <= snapshot_lsn, emit it at slot_lsn with a negated diff.
   * Once the stream goes beyond snapshot_lsn, stop rewinding and mark the snapshot as complete.
4. Start the normal replication stream.
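A minimal sketch of that rewind pass, with hypothetical types and an emit callback standing in for the real dataflow output (not the PR's actual code):

```rust
/// Replay the slot's updates, cancelling out of the snapshot everything
/// that is already reflected in it. `updates` yields (lsn, row, diff)
/// events in LSN order, starting just past `slot_lsn`.
fn rewind_pass(
    updates: impl Iterator<Item = (u64, String, i64)>,
    slot_lsn: u64,
    snapshot_lsn: u64,
    emit: &mut impl FnMut(u64, String, i64), // (timestamp, row, diff)
) {
    for (lsn, row, diff) in updates {
        if lsn <= snapshot_lsn {
            // Already contained in the snapshot: subtract it at slot_lsn.
            emit(slot_lsn, row, -diff);
        } else {
            // Past snapshot_lsn: the snapshot is now definite at slot_lsn;
            // normal replication takes over from here.
            break;
        }
    }
}
```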
In order to do the above, a few changes needed to be made to the structure of the postgres source:
1. The produce_snapshot and produce_replication methods are now freestanding functions that only get a pg client and give back an async stream with the updates, without altering any state. This makes it easy to instantiate the stream multiple times when needed (a sketch of this shape follows below).
2. Getting in and out of the replication stream to consume potential lag has been made a lot more aggressive and is now based only on time.
Finally, there was a bug with our tokio-postgres fork where dropping a replication stream failed to put the connection back in query mode. This has been fixed in the rust-postgres PR that is still pending upstream (). For this reason I updated our fork to contain that PR as-is, and the replication-specific code has been moved to mz_postgres_util. This is the reason why this PR is so big.
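A hedged sketch of that freestanding shape, using the async-stream crate; the names and types here are illustrative, not the PR's actual signatures:

```rust
use async_stream::stream;
use futures::Stream;

/// A single change event: (lsn, row, diff).
type Update = (u64, String, i64);

/// Illustrative stand-in for a postgres client handle.
struct PgClient;

impl PgClient {
    async fn fetch_chunk(&self) -> Option<Vec<Update>> {
        None // placeholder: a real client would read from the slot
    }
}

/// A freestanding producer: it holds no source state, so it can be
/// instantiated as many times as needed, e.g. once for the rewind pass
/// and once more for the normal replication stream.
fn produce_updates(client: PgClient) -> impl Stream<Item = Update> {
    stream! {
        while let Some(chunk) = client.fetch_chunk().await {
            for update in chunk {
                yield update;
            }
        }
    }
}
```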
Checklist
* This PR has adequate test coverage / QA involvement has been duly considered.
* This PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-proto label.
* If this PR will require changes to cloud orchestration, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
* This PR includes the following user-facing behavior changes: