@@ -398,8 +398,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
398398 if (RegistrationSended)
399399 return ;
400400
401- if (!ProcessingParams)
401+ if (!ProcessingParams) {
402+ LOG_DEBUG_S (ctx, NKikimrServices::TX_DATASHARD, TabletID ()
403+ << " not sending time cast registration request in state "
404+ << DatashardStateName (State)
405+ << " : missing processing params" );
402406 return ;
407+ }
408+
409+ if (State == TShardState::WaitScheme ||
410+ State == TShardState::SplitDstReceivingSnapshot)
411+ {
412+ // We don't have all the necessary info yet
413+ LOG_DEBUG_S (ctx, NKikimrServices::TX_DATASHARD, TabletID ()
414+ << " not sending time cast registration request in state "
415+ << DatashardStateName (State));
416+ return ;
417+ }
403418
404419 LOG_INFO_S (ctx, NKikimrServices::TX_DATASHARD, " Send registration request to time cast "
405420 << DatashardStateName (State) << " tabletId " << TabletID ()
@@ -1961,68 +1976,81 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
19611976 }
19621977 }
19631978
1964- TRowVersion edge;
1965- TRowVersion readEdge = Max (
1966- SnapshotManager.GetCompleteEdge (),
1967- SnapshotManager.GetUnprotectedReadEdge ());
1968- TRowVersion writeEdge = Max (readEdge, SnapshotManager.GetIncompleteEdge ());
1969- switch (mode) {
1970- case EMvccTxMode::ReadOnly:
1971- // With read-only transactions we don't need reads to include
1972- // changes made at the incomplete edge, as that is a point where
1973- // distributed transactions performed some reads, not writes.
1974- // Since incomplete transactions are still inflight, the actual
1975- // version will stick to the first incomplete transaction is queue,
1976- // effectively reading non-repeatable state before that transaction.
1977- edge = readEdge;
1978- break ;
1979- case EMvccTxMode::ReadWrite:
1980- // With read-write transactions we must choose a point that is
1981- // greater than both complete and incomplete edges. The reason
1982- // is that incomplete transactions performed some reads at that
1983- // point and these snapshot points must be repeatable.
1984- // Note that as soon as the first write past the IncompleteEdge
1985- // happens it cements all distributed transactions up to that point
1986- // as complete, so all future reads and writes are guaranteed to
1987- // include that point as well.
1988- edge = writeEdge;
1989- break ;
1990- }
1979+ LOG_TRACE_S (*TlsActivationContext, NKikimrServices::TX_DATASHARD, " GetMvccTxVersion at " << TabletID ()
1980+ << " CompleteEdge# " << SnapshotManager.GetCompleteEdge ()
1981+ << " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge ()
1982+ << " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge ()
1983+ << " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge ()
1984+ << " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied ());
1985+
1986+ TRowVersion version = [&]() {
1987+ TRowVersion edge;
1988+ TRowVersion readEdge = Max (
1989+ SnapshotManager.GetCompleteEdge (),
1990+ SnapshotManager.GetUnprotectedReadEdge ());
1991+ TRowVersion writeEdge = Max (readEdge, SnapshotManager.GetIncompleteEdge ());
1992+ switch (mode) {
1993+ case EMvccTxMode::ReadOnly:
1994+ // With read-only transactions we don't need reads to include
1995+ // changes made at the incomplete edge, as that is a point where
1996+ // distributed transactions performed some reads, not writes.
1997+ // Since incomplete transactions are still inflight, the actual
1998+ // version will stick to the first incomplete transaction is queue,
1999+ // effectively reading non-repeatable state before that transaction.
2000+ edge = readEdge;
2001+ break ;
2002+ case EMvccTxMode::ReadWrite:
2003+ // With read-write transactions we must choose a point that is
2004+ // greater than both complete and incomplete edges. The reason
2005+ // is that incomplete transactions performed some reads at that
2006+ // point and these snapshot points must be repeatable.
2007+ // Note that as soon as the first write past the IncompleteEdge
2008+ // happens it cements all distributed transactions up to that point
2009+ // as complete, so all future reads and writes are guaranteed to
2010+ // include that point as well.
2011+ edge = writeEdge;
2012+ break ;
2013+ }
19912014
1992- // If there's any planned operation that is above our edge, it would be a
1993- // suitable version for a new immediate operation. We effectively try to
1994- // execute "before" that point if possible.
1995- if (auto nextOp = Pipeline.GetNextPlannedOp (edge.Step , edge.TxId ))
1996- return TRowVersion (nextOp->GetStep (), nextOp->GetTxId ());
2015+ // If there's any planned operation that is above our edge, it would be a
2016+ // suitable version for a new immediate operation. We effectively try to
2017+ // execute "before" that point if possible.
2018+ if (auto nextOp = Pipeline.GetNextPlannedOp (edge.Step , edge.TxId ))
2019+ return TRowVersion (nextOp->GetStep (), nextOp->GetTxId ());
2020+
2021+ // Normally we stick transactions to the end of the last known mediator step
2022+ // Note this calculations only happen when we don't have distributed
2023+ // transactions left in queue, and we won't have any more transactions
2024+ // up to the current mediator time. The mediator time itself may be stale,
2025+ // in which case we may have evidence of its higher value via complete and
2026+ // incomplete edges above.
2027+ const ui64 mediatorStep = Max (MediatorTimeCastEntry ? MediatorTimeCastEntry->Get (TabletID ()) : 0 , writeEdge.Step );
2028+ TRowVersion mediatorEdge (mediatorStep, ::Max<ui64>());
2029+
2030+ switch (mode) {
2031+ case EMvccTxMode::ReadOnly: {
2032+ // We read at the end of the current step
2033+ return mediatorEdge;
2034+ }
2035+
2036+ case EMvccTxMode::ReadWrite: {
2037+ // We write at the end of the current step, or the start of the next step when that's protected
2038+ return Max (mediatorEdge, writeEdge.Next ());
2039+ }
2040+ }
19972041
1998- // Normally we stick transactions to the end of the last known mediator step
1999- // Note this calculations only happen when we don't have distributed
2000- // transactions left in queue, and we won't have any more transactions
2001- // up to the current mediator time. The mediator time itself may be stale,
2002- // in which case we may have evidence of its higher value via complete and
2003- // incomplete edges above.
2004- const ui64 mediatorStep = Max (MediatorTimeCastEntry ? MediatorTimeCastEntry->Get (TabletID ()) : 0 , writeEdge.Step );
2005- TRowVersion mediatorEdge (mediatorStep, ::Max<ui64>());
2042+ Y_ABORT (" unreachable" );
2043+ }();
20062044
20072045 switch (mode) {
20082046 case EMvccTxMode::ReadOnly: {
2009- // We want to include everything that was potentially confirmed to
2010- // users, but we don't want to include anything that is not replied
2011- // at the start of this read.
2012- // Note it's only possible to have ImmediateWriteEdge > mediatorEdge
2013- // when ImmediateWriteEdge == mediatorEdge + 1
2014- return Max (mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied ());
2047+ // We must read all writes we have replied to already
2048+ return Max (version, SnapshotManager.GetImmediateWriteEdgeReplied ());
20152049 }
20162050
20172051 case EMvccTxMode::ReadWrite: {
2018- // We must use at least a previously used immediate write edge
2019- // But we must also avoid trumpling over any unprotected mvcc
2020- // snapshot reads that have occurred.
2021- // Note it's only possible to go past the last known mediator step
2022- // when we had an unprotected read, which itself happens at the
2023- // last mediator step. So we may only ever have a +1 step, never
2024- // anything more.
2025- return Max (mediatorEdge, writeEdge.Next (), SnapshotManager.GetImmediateWriteEdge ());
2052+ // We must never go backwards in our single-shard writes
2053+ return Max (version, SnapshotManager.GetImmediateWriteEdge ());
20262054 }
20272055 }
20282056
@@ -2075,6 +2103,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
20752103 // We need to wait for completion until the flag is committed
20762104 res.WaitCompletion = true ;
20772105 }
2106+ LOG_TRACE_S (*TlsActivationContext, NKikimrServices::TX_DATASHARD, " PromoteImmediatePostExecuteEdges at " << TabletID ()
2107+ << " promoting UnprotectedReadEdge to " << version);
20782108 SnapshotManager.PromoteUnprotectedReadEdge (version);
20792109
20802110 // We want to promote the complete edge when protected reads are
@@ -2237,6 +2267,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
22372267 for (auto it = MediatorDelayedReplies.begin (); it != MediatorDelayedReplies.end ();) {
22382268 const ui64 step = it->first .Step ;
22392269
2270+ if (SrcSplitDescription) {
2271+ if (State == TShardState::SplitSrcSendingSnapshot ||
2272+ State == TShardState::SplitSrcWaitForPartitioningChanged ||
2273+ State == TShardState::PreOffline ||
2274+ State == TShardState::Offline)
2275+ {
2276+ // We cannot send replies, since dst shard is now in charge
2277+ // of keeping track of acknowledged writes. So we expect
2278+ // split src logic to reboot this shard later.
2279+ break ;
2280+ }
2281+ }
2282+
22402283 if (step <= mediatorStep) {
22412284 SnapshotManager.PromoteImmediateWriteEdgeReplied (it->first );
22422285 Send (it->second .Target , it->second .Event .Release (), 0 , it->second .Cookie );
@@ -2304,13 +2347,16 @@ void TDataShard::CheckMediatorStateRestored() {
23042347 // HEAD reads must include that in their results.
23052348 const ui64 waitStep = CoordinatorPrevReadStepMax;
23062349 const ui64 readStep = CoordinatorPrevReadStepMax;
2307-
2308- LOG_DEBUG_S (*TlsActivationContext, NKikimrServices::TX_DATASHARD, " CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
2350+ const ui64 observedStep = GetMaxObservedStep ();
2351+ LOG_DEBUG_S (*TlsActivationContext, NKikimrServices::TX_DATASHARD, " CheckMediatorStateRestored at " << TabletID () << " :"
2352+ << " waitStep# " << waitStep
2353+ << " readStep# " << readStep
2354+ << " observedStep# " << observedStep);
23092355
23102356 // WARNING: we must perform this check BEFORE we update unprotected read edge
23112357 // We may enter this code path multiple times, and we expect that the above
23122358 // read step may be refined while we wait based on pessimistic backup step.
2313- if (GetMaxObservedStep () < waitStep) {
2359+ if (observedStep < waitStep) {
23142360 // We need to wait until we observe mediator step that is at least
23152361 // as large as the step we found.
23162362 if (MediatorTimeCastWaitingSteps.insert (waitStep).second ) {
@@ -2331,7 +2377,10 @@ void TDataShard::CheckMediatorStateRestored() {
23312377 SnapshotManager.GetImmediateWriteEdge ().Step > SnapshotManager.GetCompleteEdge ().Step
23322378 ? SnapshotManager.GetImmediateWriteEdge ().Prev ()
23332379 : TRowVersion::Min ();
2334- SnapshotManager.PromoteUnprotectedReadEdge (Max (lastReadEdge, preImmediateWriteEdge));
2380+ const TRowVersion edge = Max (lastReadEdge, preImmediateWriteEdge);
2381+ LOG_TRACE_S (*TlsActivationContext, NKikimrServices::TX_DATASHARD, " CheckMediatorStateRestored at " << TabletID ()
2382+ << " promoting UnprotectedReadEdge to " << edge);
2383+ SnapshotManager.PromoteUnprotectedReadEdge (edge);
23352384 }
23362385
23372386 // Promote the replied immediate write edge up to the currently observed step
@@ -2340,7 +2389,7 @@ void TDataShard::CheckMediatorStateRestored() {
23402389 // data that is definitely not replied yet.
23412390 if (SnapshotManager.GetImmediateWriteEdgeReplied () < SnapshotManager.GetImmediateWriteEdge ()) {
23422391 const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge ().Step ;
2343- const TRowVersion edge (GetMaxObservedStep () , Max<ui64>());
2392+ const TRowVersion edge (observedStep , Max<ui64>());
23442393 SnapshotManager.PromoteImmediateWriteEdgeReplied (
23452394 Min (edge, SnapshotManager.GetImmediateWriteEdge ()));
23462395 // Try to ensure writes become visible sooner rather than later
@@ -2477,6 +2526,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
24772526 rejectDescriptions.push_back (TStringBuilder ()
24782527 << " is in process of split opId " << DstSplitOpId
24792528 << " state " << DatashardStateName (State));
2529+ } else if (State == TShardState::WaitScheme) {
2530+ reject = true ;
2531+ rejectReasons |= ERejectReasons::WrongState;
2532+ rejectDescriptions.push_back (" is not created yet" );
24802533 } else if (State == TShardState::PreOffline || State == TShardState::Offline) {
24812534 reject = true ;
24822535 rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
@@ -2639,6 +2692,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
26392692 auto * msg = ev->Get ();
26402693 LWTRACK (ProposeTransactionRequest, msg->Orbit );
26412694
2695+ if (CheckDataTxRejectAndReply (ev, ctx)) {
2696+ IncCounter (COUNTER_PREPARE_REQUEST);
2697+ return ;
2698+ }
2699+
26422700 // Check if we need to delay an immediate transaction
26432701 if (MediatorStateWaiting &&
26442702 (ev->Get ()->GetFlags () & TTxFlags::Immediate) &&
@@ -2671,10 +2729,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
26712729
26722730 IncCounter (COUNTER_PREPARE_REQUEST);
26732731
2674- if (CheckDataTxRejectAndReply (ev, ctx)) {
2675- return ;
2676- }
2677-
26782732 switch (ev->Get ()->GetTxKind ()) {
26792733 case NKikimrTxDataShard::TX_KIND_DATA:
26802734 case NKikimrTxDataShard::TX_KIND_SCAN:
0 commit comments