Skip to content

Commit 56c87b8

Browse files
authored
Merge 2ad2c3d into 72d2d86
2 parents 72d2d86 + 2ad2c3d commit 56c87b8

File tree

6 files changed

+134
-29
lines changed

6 files changed

+134
-29
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 6 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -226,41 +226,18 @@ void TDataShard::OnStopGuardStarting(const TActorContext &ctx) {
226226
// Handle immediate ops that have completed BuildAndWaitDependencies
227227
for (const auto &kv : Pipeline.GetImmediateOps()) {
228228
const auto &op = kv.second;
229-
// Send reject result immediately, because we cannot control when
230-
// a new datashard tablet may start and block us from committing
231-
// anything new. The usual progress queue is too slow for that.
232-
if (!op->Result() && !op->HasResultSentFlag()) {
233-
auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(op->GetKind());
234-
auto rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
235-
TString rejectReason = TStringBuilder()
236-
<< "Rejecting immediate tx "
237-
<< op->GetTxId()
238-
<< " because datashard "
239-
<< TabletID()
240-
<< " is restarting";
241-
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
242-
kind, TabletID(), op->GetTxId(), rejectStatus);
243-
result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
244-
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);
245-
246-
ctx.Send(op->GetTarget(), result.Release(), 0, op->GetCookie());
247-
248-
IncCounter(COUNTER_PREPARE_OVERLOADED);
249-
IncCounter(COUNTER_PREPARE_COMPLETE);
250-
op->SetResultSentFlag();
229+
if (op->OnStopping(*this, ctx)) {
230+
Pipeline.AddCandidateOp(op);
231+
PlanQueue.Progress(ctx);
251232
}
252-
// Add op to candidates because IsReadyToExecute just became true
253-
Pipeline.AddCandidateOp(op);
254-
PlanQueue.Progress(ctx);
255233
}
256234

257235
// Handle prepared ops by notifying about imminent shutdown
258236
for (const auto &kv : TransQueue.GetTxsInFly()) {
259237
const auto &op = kv.second;
260-
if (op->GetTarget() && !op->HasCompletedFlag()) {
261-
auto notify = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
262-
TabletID(), op->GetTxId());
263-
ctx.Send(op->GetTarget(), notify.Release(), 0, op->GetCookie());
238+
if (op->OnStopping(*this, ctx)) {
239+
Pipeline.AddCandidateOp(op);
240+
PlanQueue.Progress(ctx);
264241
}
265242
}
266243
}

ydb/core/tx/datashard/datashard_active_transaction.cpp

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -944,4 +944,45 @@ void TActiveTransaction::UntrackMemory() const {
944944
NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(TxBody.size());
945945
}
946946

947+
bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx) {
948+
if (IsImmediate()) {
949+
// Send reject result immediately, because we cannot control when
950+
// a new datashard tablet may start and block us from committing
951+
// anything new. The usual progress queue is too slow for that.
952+
if (!HasResultSentFlag() && !Result()) {
953+
auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(GetKind());
954+
auto rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
955+
TString rejectReason = TStringBuilder()
956+
<< "Rejecting immediate tx "
957+
<< GetTxId()
958+
<< " because datashard "
959+
<< self.TabletID()
960+
<< " is restarting";
961+
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
962+
kind, self.TabletID(), GetTxId(), rejectStatus);
963+
result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
964+
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);
965+
966+
ctx.Send(GetTarget(), result.Release(), 0, GetCookie());
967+
968+
self.IncCounter(COUNTER_PREPARE_OVERLOADED);
969+
self.IncCounter(COUNTER_PREPARE_COMPLETE);
970+
SetResultSentFlag();
971+
}
972+
973+
// Immediate ops become ready when stopping flag is set
974+
return true;
975+
} else {
976+
// Distributed operations send notification when proposed
977+
if (GetTarget() && !HasCompletedFlag()) {
978+
auto notify = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
979+
self.TabletID(), GetTxId());
980+
ctx.Send(GetTarget(), notify.Release(), 0, GetCookie());
981+
}
982+
983+
// Distributed ops avoid doing new work when stopping
984+
return false;
985+
}
986+
}
987+
947988
}}

ydb/core/tx/datashard/datashard_active_transaction.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -603,6 +603,8 @@ class TActiveTransaction : public TOperation {
603603
return ++PageFaultCount;
604604
}
605605

606+
bool OnStopping(TDataShard& self, const TActorContext& ctx) override;
607+
606608
private:
607609
void TrackMemory() const;
608610
void UntrackMemory() const;

ydb/core/tx/datashard/datashard_ut_order.cpp

Lines changed: 66 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3386,6 +3386,72 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, StreamLookup) {
33863386
}
33873387
}
33883388

3389+
Y_UNIT_TEST(TestShardRestartDuringWaitingRead) {
3390+
TPortManager pm;
3391+
NKikimrConfig::TAppConfig app;
3392+
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
3393+
TServerSettings serverSettings(pm.GetPort(2134));
3394+
serverSettings.SetDomainName("Root")
3395+
.SetUseRealThreads(false)
3396+
.SetAppConfig(app)
3397+
// We read from an unresolved volatile tx
3398+
.SetEnableDataShardVolatileTransactions(true);
3399+
3400+
Tests::TServer::TPtr server = new TServer(serverSettings);
3401+
auto &runtime = *server->GetRuntime();
3402+
auto sender = runtime.AllocateEdgeActor();
3403+
3404+
InitRoot(server, sender);
3405+
3406+
CreateShardedTable(server, sender, "/Root", "table-1", 1);
3407+
CreateShardedTable(server, sender, "/Root", "table-2", 1);
3408+
auto table1shards = GetTableShards(server, sender, "/Root/table-1");
3409+
3410+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10)"));
3411+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20)"));
3412+
3413+
// Block readset exchange
3414+
std::vector<std::unique_ptr<IEventHandle>> readSets;
3415+
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
3416+
readSets.emplace_back(ev.Release());
3417+
});
3418+
3419+
// Start a distributed write to both tables
3420+
TString sessionId = CreateSessionRPC(runtime, "/Root");
3421+
auto upsertResult = SendRequest(
3422+
runtime,
3423+
MakeSimpleRequestRPC(R"(
3424+
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
3425+
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
3426+
)", sessionId, /* txId */ "", /* commitTx */ true),
3427+
"/Root");
3428+
WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");
3429+
3430+
// Start reading the first table
3431+
TString readSessionId = CreateSessionRPC(runtime, "/Root");
3432+
auto readResult = SendRequest(
3433+
runtime,
3434+
MakeSimpleRequestRPC(R"(
3435+
SELECT key, value FROM `/Root/table-1`
3436+
ORDER BY key;
3437+
)", readSessionId, /* txId */ "", /* commitTx */ true),
3438+
"/Root");
3439+
3440+
// Sleep to ensure read is properly waiting
3441+
runtime.SimulateSleep(TDuration::Seconds(1));
3442+
3443+
// Gracefully restart the first table shard
3444+
blockReadSets.Remove();
3445+
GracefulRestartTablet(runtime, table1shards[0], sender);
3446+
3447+
// Read succeeds because it is automatically retried
3448+
// No assert should be triggered in debug builds
3449+
UNIT_ASSERT_VALUES_EQUAL(
3450+
FormatResult(AwaitResponse(runtime, std::move(readResult))),
3451+
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
3452+
"{ items { uint32_value: 3 } items { uint32_value: 30 } }");
3453+
}
3454+
33893455
Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
33903456
TPortManager pm;
33913457
TServerSettings serverSettings(pm.GetPort(2134));

ydb/core/tx/datashard/operation.cpp

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -289,5 +289,12 @@ void TOperation::SetFinishProposeTs() noexcept
289289
SetFinishProposeTs(AppData()->MonotonicTimeProvider->Now());
290290
}
291291

292+
bool TOperation::OnStopping(TDataShard&, const TActorContext&)
293+
{
294+
// By default, operations don't do anything when stopping.
295+
// However, they may become ready, so add them to candidates.
296+
return true;
297+
}
298+
292299
} // namespace NDataShard
293300
} // namespace NKikimr

ydb/core/tx/datashard/operation.h

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -812,6 +812,18 @@ class TOperation
812812
return OperationSpan.GetTraceId();
813813
}
814814

815+
/**
816+
* Called when datashard is going to stop soon
817+
*
818+
* Operation may override this method to support sending notifications or
819+
* results signalling that the operation will never complete. When a result
820+
* is sent, the operation is supposed to set its ResultSentFlag.
821+
*
822+
* When this method returns true the operation will be added to the
823+
* pipeline as a candidate for execution.
824+
*/
825+
virtual bool OnStopping(TDataShard& self, const TActorContext& ctx);
826+
815827
protected:
816828
TOperation()
817829
: TOperation(TBasicOpInfo())

0 commit comments

Comments
 (0)