Skip to content

Make stopping result/notification sending dependent on operation type… #1835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 6 additions & 29 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,41 +226,18 @@ void TDataShard::OnStopGuardStarting(const TActorContext &ctx) {
// Handle immediate ops that have completed BuildAndWaitDependencies
for (const auto &kv : Pipeline.GetImmediateOps()) {
const auto &op = kv.second;
// Send reject result immediately, because we cannot control when
// a new datashard tablet may start and block us from committing
// anything new. The usual progress queue is too slow for that.
if (!op->Result() && !op->HasResultSentFlag()) {
auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(op->GetKind());
auto rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
TString rejectReason = TStringBuilder()
<< "Rejecting immediate tx "
<< op->GetTxId()
<< " because datashard "
<< TabletID()
<< " is restarting";
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
kind, TabletID(), op->GetTxId(), rejectStatus);
result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);

ctx.Send(op->GetTarget(), result.Release(), 0, op->GetCookie());

IncCounter(COUNTER_PREPARE_OVERLOADED);
IncCounter(COUNTER_PREPARE_COMPLETE);
op->SetResultSentFlag();
if (op->OnStopping(*this, ctx)) {
Pipeline.AddCandidateOp(op);
PlanQueue.Progress(ctx);
}
// Add op to candidates because IsReadyToExecute just became true
Pipeline.AddCandidateOp(op);
PlanQueue.Progress(ctx);
}

// Handle prepared ops by notifying about imminent shutdown
for (const auto &kv : TransQueue.GetTxsInFly()) {
const auto &op = kv.second;
if (op->GetTarget() && !op->HasCompletedFlag()) {
auto notify = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
TabletID(), op->GetTxId());
ctx.Send(op->GetTarget(), notify.Release(), 0, op->GetCookie());
if (op->OnStopping(*this, ctx)) {
Pipeline.AddCandidateOp(op);
PlanQueue.Progress(ctx);
}
}
}
Expand Down
41 changes: 41 additions & 0 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -937,4 +937,45 @@ void TActiveTransaction::UntrackMemory() const {
NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(TxBody.size());
}

/**
 * Reacts to the datashard entering its stopping phase.
 *
 * Immediate transactions that have neither the ResultSent flag nor a result
 * are rejected on the spot with an OVERLOADED status; the rejection is
 * logged, sent to the target and accounted in the prepare counters.
 * Any other transaction sends TEvProposeTransactionRestart to its target,
 * provided a target is set and the operation has not completed.
 *
 * Returns true for immediate transactions and false otherwise.
 */
bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx) {
    if (!IsImmediate()) {
        // Distributed operations send notification when proposed
        if (GetTarget() && !HasCompletedFlag()) {
            auto restartNotice = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
                self.TabletID(), GetTxId());
            ctx.Send(GetTarget(), restartNotice.Release(), 0, GetCookie());
        }

        // Distributed ops avoid doing new work when stopping
        return false;
    }

    // Send reject result immediately, because we cannot control when
    // a new datashard tablet may start and block us from committing
    // anything new. The usual progress queue is too slow for that.
    const bool alreadyAnswered = HasResultSentFlag() || Result();
    if (!alreadyAnswered) {
        const auto txKind = static_cast<NKikimrTxDataShard::ETransactionKind>(GetKind());
        const auto status = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
        const TString reason = TStringBuilder()
            << "Rejecting immediate tx " << GetTxId()
            << " because datashard " << self.TabletID()
            << " is restarting";

        auto reply = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
            txKind, self.TabletID(), GetTxId(), status);
        reply->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, reason);
        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, reason);

        ctx.Send(GetTarget(), reply.Release(), 0, GetCookie());

        self.IncCounter(COUNTER_PREPARE_OVERLOADED);
        self.IncCounter(COUNTER_PREPARE_COMPLETE);
        SetResultSentFlag();
    }

    // Immediate ops become ready when stopping flag is set
    return true;
}

}}
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/datashard_active_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ class TActiveTransaction : public TOperation {
return ++PageFaultCount;
}

// Stop-time hook for active transactions. Immediate transactions get a
// reject result sent to their target (unless a result already exists or
// was already sent) and report true; all other transactions send a restart
// notification to their target (when one is set and the operation has not
// completed) and report false.
bool OnStopping(TDataShard& self, const TActorContext& ctx) override;

private:
void TrackMemory() const;
void UntrackMemory() const;
Expand Down
66 changes: 66 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3386,6 +3386,72 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, StreamLookup) {
}
}

// Scenario: a distributed upsert into table-1 and table-2 is started while
// readset events are intercepted, a SELECT over table-1 is issued, and then
// the table-1 shard is gracefully restarted. The SELECT is expected to
// return rows with keys 1 and 3.
Y_UNIT_TEST(TestShardRestartDuringWaitingRead) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetAppConfig(app)
// We read from an unresolved volatile tx
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

InitRoot(server, sender);

// Two single-shard tables; only the shards of table-1 are looked up
// because that is the shard restarted below.
CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
auto table1shards = GetTableShards(server, sender, "/Root/table-1");

// Seed one committed row per table before readsets are intercepted
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10)"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20)"));

// Block readset exchange
// The observer takes ownership of every TEvReadSet it sees (ev.Release())
// and stores it in readSets; presumably this keeps the event from being
// delivered -- confirm against the test runtime's observer semantics.
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
readSets.emplace_back(ev.Release());
});

// Start a distributed write to both tables
// NOTE(review): upsertResult is never awaited in this test.
TString sessionId = CreateSessionRPC(runtime, "/Root");
auto upsertResult = SendRequest(
runtime,
MakeSimpleRequestRPC(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
)", sessionId, /* txId */ "", /* commitTx */ true),
"/Root");
// Proceed only once at least four readset events have been captured
WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");

// Start reading the first table
TString readSessionId = CreateSessionRPC(runtime, "/Root");
auto readResult = SendRequest(
runtime,
MakeSimpleRequestRPC(R"(
SELECT key, value FROM `/Root/table-1`
ORDER BY key;
)", readSessionId, /* txId */ "", /* commitTx */ true),
"/Root");

// Sleep to ensure read is properly waiting
runtime.SimulateSleep(TDuration::Seconds(1));

// Gracefully restart the first table shard
// The observer is removed first; the events already captured in readSets
// are not re-sent anywhere in this test.
blockReadSets.Remove();
GracefulRestartTablet(runtime, table1shards[0], sender);

// Read succeeds because it is automatically retried
// No assert should be triggered in debug builds
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(readResult))),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 30 } }");
}

Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/datashard/operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,5 +289,12 @@ void TOperation::SetFinishProposeTs() noexcept
SetFinishProposeTs(AppData()->MonotonicTimeProvider->Now());
}

/**
 * Default stop-time hook: sends no notification and no result.
 *
 * Reports true because the operation may have become ready, so it
 * should be added to the candidates.
 */
bool TOperation::OnStopping(TDataShard& /*self*/, const TActorContext& /*ctx*/) {
    constexpr bool addToCandidates = true;
    return addToCandidates;
}

} // namespace NDataShard
} // namespace NKikimr
12 changes: 12 additions & 0 deletions ydb/core/tx/datashard/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,18 @@ class TOperation
return OperationSpan.GetTraceId();
}

/**
 * Called when datashard is going to stop soon
 *
 * Operation may override this method to support sending notifications or
 * results signalling that the operation will never complete. When result
 * is sent operation is supposed to set its ResultSentFlag.
 *
 * When this method returns true the operation will be added to the
 * pipeline as a candidate for execution.
 *
 * The base implementation sends nothing and returns true.
 *
 * @param self the datashard that is stopping
 * @param ctx actor context an override can use to send events
 * @return true when the operation should be added to the pipeline
 *         candidates, false otherwise
 */
virtual bool OnStopping(TDataShard& self, const TActorContext& ctx);

protected:
TOperation()
: TOperation(TBasicOpInfo())
Expand Down