Skip to content

Better handling for mediator time jumps in datashard #2342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 60 additions & 54 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2050,68 +2050,74 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());

TRowVersion edge;
TRowVersion readEdge = Max(
SnapshotManager.GetCompleteEdge(),
SnapshotManager.GetUnprotectedReadEdge());
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
switch (mode) {
case EMvccTxMode::ReadOnly:
// With read-only transactions we don't need reads to include
// changes made at the incomplete edge, as that is a point where
// distributed transactions performed some reads, not writes.
// Since incomplete transactions are still inflight, the actual
// version will stick to the first incomplete transaction is queue,
// effectively reading non-repeatable state before that transaction.
edge = readEdge;
break;
case EMvccTxMode::ReadWrite:
// With read-write transactions we must choose a point that is
// greater than both complete and incomplete edges. The reason
// is that incomplete transactions performed some reads at that
// point and these snapshot points must be repeatable.
// Note that as soon as the first write past the IncompleteEdge
// happens it cements all distributed transactions up to that point
// as complete, so all future reads and writes are guaranteed to
// include that point as well.
edge = writeEdge;
break;
}
TRowVersion version = [&]() {
TRowVersion edge;
TRowVersion readEdge = Max(
SnapshotManager.GetCompleteEdge(),
SnapshotManager.GetUnprotectedReadEdge());
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
switch (mode) {
case EMvccTxMode::ReadOnly:
// With read-only transactions we don't need reads to include
// changes made at the incomplete edge, as that is a point where
// distributed transactions performed some reads, not writes.
// Since incomplete transactions are still inflight, the actual
// version will stick to the first incomplete transaction is queue,
// effectively reading non-repeatable state before that transaction.
edge = readEdge;
break;
case EMvccTxMode::ReadWrite:
// With read-write transactions we must choose a point that is
// greater than both complete and incomplete edges. The reason
// is that incomplete transactions performed some reads at that
// point and these snapshot points must be repeatable.
// Note that as soon as the first write past the IncompleteEdge
// happens it cements all distributed transactions up to that point
// as complete, so all future reads and writes are guaranteed to
// include that point as well.
edge = writeEdge;
break;
}

// If there's any planned operation that is above our edge, it would be a
// suitable version for a new immediate operation. We effectively try to
// execute "before" that point if possible.
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
// If there's any planned operation that is above our edge, it would be a
// suitable version for a new immediate operation. We effectively try to
// execute "before" that point if possible.
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());

// Normally we stick transactions to the end of the last known mediator step
// Note this calculations only happen when we don't have distributed
// transactions left in queue, and we won't have any more transactions
// up to the current mediator time. The mediator time itself may be stale,
// in which case we may have evidence of its higher value via complete and
// incomplete edges above.
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());

switch (mode) {
case EMvccTxMode::ReadOnly: {
// We read at the end of the current step
return mediatorEdge;
}

case EMvccTxMode::ReadWrite: {
// We write at the end of the current step, or the start of the next step when that's protected
return Max(mediatorEdge, writeEdge.Next());
}
}

// Normally we stick transactions to the end of the last known mediator step
// Note this calculations only happen when we don't have distributed
// transactions left in queue, and we won't have any more transactions
// up to the current mediator time. The mediator time itself may be stale,
// in which case we may have evidence of its higher value via complete and
// incomplete edges above.
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
Y_ABORT("unreachable");
}();

switch (mode) {
case EMvccTxMode::ReadOnly: {
// We want to include everything that was potentially confirmed to
// users, but we don't want to include anything that is not replied
// at the start of this read.
// Note it's only possible to have ImmediateWriteEdge > mediatorEdge
// when ImmediateWriteEdge == mediatorEdge + 1
return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied());
// We must read all writes we have replied to already
return Max(version, SnapshotManager.GetImmediateWriteEdgeReplied());
}

case EMvccTxMode::ReadWrite: {
// We must use at least a previously used immediate write edge
// But we must also avoid trumpling over any unprotected mvcc
// snapshot reads that have occurred.
// Note it's only possible to go past the last known mediator step
// when we had an unprotected read, which itself happens at the
// last mediator step. So we may only ever have a +1 step, never
// anything more.
return Max(mediatorEdge, writeEdge.Next(), SnapshotManager.GetImmediateWriteEdge());
// We must never go backwards in our single-shard writes
return Max(version, SnapshotManager.GetImmediateWriteEdge());
}
}

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/time_cast/time_cast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ void TMediatorTimecastEntry::Update(ui64 step, ui64 *exemption, ui64 exsz) {
Y_UNUSED(exemption);
Y_UNUSED(exsz);

AtomicSet(Step, step);
// Mediator time shouldn't go back while shards are running
if (Get(0) < step) {
AtomicSet(Step, step);
}
}

class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
Expand Down