Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ void TTxCoordinator::Handle(TEvTxProxy::TEvAcquireReadStep::TPtr& ev, const TAct
return;
}

if (ReadOnlyLeaseEnabled()) {
// Note: when volatile state is preserved we don't want to update the last
// acquired step, because the new generation might miss that update and the
// invariant of the read step never going back would be violated. Run the code
// below using the normal tx, which will either fail (almost certainly, since
// the storage is supposed to be blocked already) or successfully persist the
// new read step.
if (ReadOnlyLeaseEnabled() && !VolatileState.Preserved) {
// We acquire read step using a read-only lease from executor
// It is guaranteed that any future generation was not running at
// the time ConfirmReadOnlyLease was called.
Expand Down
25 changes: 24 additions & 1 deletion ydb/core/tx/coordinator/coordinator__plan_step.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "coordinator_impl.h"
#include "coordinator_hooks.h"

#include <util/generic/hash_set.h>

Expand Down Expand Up @@ -42,7 +43,29 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
}

void Plan(TTransactionContext &txc, const TActorContext &ctx) {
Y_UNUSED(txc);
if (Self->VolatileState.Preserved) {
// A preserved state indicates a newer generation has been started
// already, and this coordinator will stop eventually. Decline
// all pending transactions.
for (auto& slot : Slots) {
for (auto& proposal : slot) {
Self->MonCounters.StepPlannedDeclinedTx->Inc();
ProxyPlanConfirmations.Queue.emplace_back(
proposal.TxId,
proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting,
0);
++DeclinedCounter;
}
}
Self->SendStepConfirmations(ProxyPlanConfirmations, ctx);
return;
}

if (auto* hooks = ICoordinatorHooks::Get(); Y_UNLIKELY(hooks)) {
hooks->BeginPlanStep(Self->TabletID(), Self->Executor()->Generation(), PlanOnStep);
}

NIceDb::TNiceDb db(txc.DB);
ExecStartMoment = ctx.Now();
const bool lowDiskSpace = Self->Executor()->GetStats().IsAnyChannelYellowStop;
Expand Down
20 changes: 16 additions & 4 deletions ydb/core/tx/coordinator/coordinator__restore_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
return true;
}

void RestoreVolatileSteps() {
TStepId RestoreVolatileSteps() {
TStepId maxStep = 0;
for (auto &pr : Self->VolatileTransactions) {
auto txId = pr.first;
auto &tx = pr.second;
maxStep = Max(maxStep, tx.PlanOnStep);
for (auto &prmed : tx.UnconfirmedAffectedSet) {
auto medId = prmed.first;
auto &medTx = GetMediatorTx(medId, tx.PlanOnStep, txId);
Expand All @@ -137,6 +139,7 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
}
}
}
return maxStep;
}

TTxType GetTxType() const override { return TXTYPE_INIT; }
Expand All @@ -146,15 +149,24 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
bool result = Restore(transactions, txc, ctx);
if (!result)
return false;
RestoreVolatileSteps();
TStepId maxVolatileStep = RestoreVolatileSteps();
i64 txCounter = transactions.size() + Self->VolatileTransactions.size();
Self->Transactions.swap(transactions);
*Self->MonCounters.TxInFly += txCounter;
Self->MonCounters.CurrentTxInFly = txCounter;

if (Self->PrevStateActorId) {
NIceDb::TNiceDb db(txc.DB);
NIceDb::TNiceDb db(txc.DB);

// Previous coordinator might have had transactions that were after
// its persistent blocked range, but before LastPlanned was updated.
// Since we pick them up as planned and send to mediators we also need
// to make sure LastPlanned reflects that.
if (Self->VolatileState.LastPlanned < maxVolatileStep) {
Self->VolatileState.LastPlanned = maxVolatileStep;
Schema::SaveState(db, Schema::State::KeyLastPlanned, maxVolatileStep);
}

if (Self->PrevStateActorId) {
ui64 volatileLeaseMs = Self->VolatilePlanLeaseMs;
if (volatileLeaseMs > 0) {
// Make sure we start and persist new state actor before allowing clients to acquire new read steps
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/coordinator/coordinator_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ namespace NKikimr::NFlatTxCoordinator {
return true;
}

// Base implementation of the plan-step hook: it ignores all three arguments
// and does nothing.
void ICoordinatorHooks::BeginPlanStep(ui64 /* tabletId */, ui64 /* generation */, ui64 /* planStep */) {
    // Intentionally empty.
}

// Returns the pointer currently stored in CoordinatorHooks, read with
// acquire ordering.
ICoordinatorHooks* ICoordinatorHooks::Get() {
    ICoordinatorHooks* const installed = CoordinatorHooks.load(std::memory_order_acquire);
    return installed;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/coordinator/coordinator_hooks.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace NKikimr::NFlatTxCoordinator {

public:
virtual bool PersistConfig(ui64 tabletId, const NKikimrSubDomains::TProcessingParams& config);
virtual void BeginPlanStep(ui64 tabletId, ui64 generation, ui64 planStep);

public:
static ICoordinatorHooks* Get();
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/tx/coordinator/coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ ui64 TTxCoordinator::AlignPlanStep(ui64 step) {
void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx) {
//LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " HANDLE EvPlanTick LastPlanned " << VolatileState.LastPlanned);

if (VolatileState.Preserved) {
// Avoid planning any new transactions, wait until we are stopped
return;
}

ui64 next = ev->Get()->Step;
while (!PendingPlanTicks.empty() && PendingPlanTicks.front() <= next) {
PendingPlanTicks.pop_front();
Expand Down Expand Up @@ -556,8 +561,14 @@ void TTxCoordinator::TryInitMonCounters(const TActorContext &ctx) {
}

void TTxCoordinator::SendMediatorStep(TMediator &mediator, const TActorContext &ctx) {
if (VolatileState.Preserved) {
// We don't want to send new steps when state has been preserved and
// potentially sent to newer generations.
return;
}

if (!mediator.Active) {
// We don't want to update LastSentStep when mediators are not empty
// We don't want to update LastSentStep when mediators are not connected
return;
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/coordinator/coordinator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,10 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
TVector<TAcquireReadStepRequest> AcquireReadStepPending;
bool AcquireReadStepFlushing = false;
bool AcquireReadStepStarting = false;

// When true, the state has been preserved by the state actor, and any
// further changes will not be migrated to newer generations.
bool Preserved = false;
};

public:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/coordinator/coordinator_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ void TCoordinatorStateActor::PreserveState() {
Y_ABORT_UNLESS(ok);
}

Owner->VolatileState.Preserved = true;
}

STFUNC(TCoordinatorStateActor::StateWork) {
Expand Down
Loading