Skip to content

Commit ef48b00

Browse files
authored
24-3: Abort volatile transactions during graceful restarts (#12808)
1 parent 9ea2252 commit ef48b00

File tree

7 files changed

+291
-21
lines changed

7 files changed

+291
-21
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
539539
LOG_E("Shard " << tabletId << " transaction lost during reconnect: " << record.GetStatus());
540540

541541
CancelProposal(tabletId);
542-
ReplyTxStateUnknown(tabletId);
542+
ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << tabletId);
543543
}
544544

545545
void HandlePrepare(TEvDqCompute::TEvState::TPtr& ev) {
@@ -581,7 +581,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
581581
return ReplyUnavailable(TStringBuilder() << "Could not prepare program on shard " << msg->TabletId);
582582
}
583583

584-
return ReplyTxStateUnknown(msg->TabletId);
584+
return ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << msg->TabletId);
585585
}
586586

587587
case TShardState::EState::Prepared: {
@@ -601,7 +601,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
601601
<< (msg->NotDelivered ? ", last message not delivered" : ""));
602602

603603
CancelProposal(0);
604-
return ReplyTxStateUnknown(msg->TabletId);
604+
return ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << msg->TabletId);
605605
}
606606

607607
case TShardState::EState::Initial:

ydb/core/tx/datashard/datashard.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEven
538538
delayedAcks.clear();
539539
}
540540

541-
void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
541+
void TDataShard::GetCleanupReplies(TOperation* op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
542542
if (!op->HasOutputData()) {
543543
// There are no replies
544544
return;
@@ -562,6 +562,10 @@ void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::
562562
expectedReadSets.clear();
563563
}
564564

565+
void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
566+
GetCleanupReplies(op.Get(), cleanupReplies);
567+
}
568+
565569
void TDataShard::SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies) {
566570
if (replies.empty()) {
567571
return;

ydb/core/tx/datashard/datashard_active_transaction.cpp

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -911,12 +911,12 @@ bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx)
911911
<< " because datashard "
912912
<< self.TabletID()
913913
<< " is restarting";
914-
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
914+
auto result = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(
915915
kind, self.TabletID(), GetTxId(), rejectStatus);
916916
result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
917917
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);
918918

919-
ctx.Send(GetTarget(), result.Release(), 0, GetCookie());
919+
ctx.Send(GetTarget(), result.release(), 0, GetCookie());
920920

921921
self.IncCounter(COUNTER_PREPARE_OVERLOADED);
922922
self.IncCounter(COUNTER_PREPARE_COMPLETE);
@@ -925,6 +925,35 @@ bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx)
925925

926926
// Immediate ops become ready when stopping flag is set
927927
return true;
928+
} else if (HasVolatilePrepareFlag()) {
929+
// Volatile transactions may be aborted at any time unless executed
930+
// Note: we need to send the result (and discard the transaction) as
931+
// soon as possible, because new transactions are unlikely to execute
932+
// and commits will even more likely fail.
933+
if (!HasResultSentFlag() && !Result() && !HasCompletedFlag()) {
934+
auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(GetKind());
935+
auto status = NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED;
936+
auto result = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(
937+
kind, self.TabletID(), GetTxId(), status);
938+
result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, TStringBuilder()
939+
<< "DataShard " << self.TabletID() << " is restarting");
940+
ctx.Send(GetTarget(), result.release(), 0, GetCookie());
941+
942+
// Make sure we also send acks and nodata readsets to expecting participants
943+
std::vector<std::unique_ptr<IEventHandle>> cleanupReplies;
944+
self.GetCleanupReplies(this, cleanupReplies);
945+
946+
for (auto& ev : cleanupReplies) {
947+
TActivationContext::Send(ev.release());
948+
}
949+
950+
SetResultSentFlag();
951+
return true;
952+
}
953+
954+
// Executed transactions will have to wait until committed
955+
// There is no way to hand-off committing volatile transactions for now
956+
return false;
928957
} else {
929958
// Distributed operations send notification when proposed
930959
if (GetTarget() && !HasCompletedFlag()) {

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,7 @@ class TDataShard
14931493
TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets);
14941494
void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
14951495
void SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const;
1496+
void GetCleanupReplies(TOperation* op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
14961497
void GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
14971498
void SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies);
14981499
void SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies);

ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -303,20 +303,26 @@ Y_UNIT_TEST(ProposeResultLost_RwTx) {
303303
TestProposeResultLost(*fixture.Runtime, fixture.Client,
304304
Q_(R"(
305305
upsert into `/Root/table-1` (key, value) VALUES
306-
(1, 1), (1073741823, 1073741823), (2147483647, 2147483647), (4294967295, 4294967295)
306+
(1, 11), (1073741823, 1073741823), (2147483647, 2147483647), (4294967295, 4294967295)
307307
)"),
308308
[](const NKikimrKqp::TEvQueryResponse& record) {
309-
UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNDETERMINED, record.DebugString());
310-
311-
TIssues issues;
312-
IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
313-
UNIT_ASSERT_C(HasIssue(issues, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
314-
"State of operation is unknown."), record.GetResponse().DebugString());
315-
316-
UNIT_ASSERT_C(HasIssue(issues, NKikimrIssues::TIssuesIds::TX_STATE_UNKNOWN, "", [] (const TIssue& issue) {
317-
return issue.GetMessage().StartsWith("Tx state unknown for shard ");
318-
}), record.GetResponse().DebugString());
309+
UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNAVAILABLE, record.DebugString());
310+
311+
TIssues issues;
312+
IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
313+
UNIT_ASSERT_C(
314+
HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
315+
"Kikimr cluster or one of its subsystems was unavailable."),
316+
record.GetResponse().DebugString());
319317
});
318+
319+
// Verify that the transaction didn't commit
320+
UNIT_ASSERT_VALUES_EQUAL(
321+
KqpSimpleExec(*fixture.Runtime,
322+
Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key")),
323+
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
324+
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
325+
"{ items { uint32_value: 3 } items { uint32_value: 3 } }");
320326
}
321327

322328
} // suite

ydb/core/tx/datashard/datashard_ut_volatile.cpp

Lines changed: 206 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3020,10 +3020,12 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
30203020

30213021
// We need to fill table with some data
30223022
Cerr << "========= Upserting initial values =========" << Endl;
3023-
KqpSimpleExec(runtime, R"(
3024-
UPSERT INTO `/Root/table` (key, subkey, value)
3025-
VALUES (1, 1), (11, 11)
3026-
)");
3023+
UNIT_ASSERT_VALUES_EQUAL(
3024+
KqpSimpleExec(runtime, R"(
3025+
UPSERT INTO `/Root/table` (key, value)
3026+
VALUES (1, 1), (11, 11)
3027+
)"),
3028+
"<empty>");
30273029

30283030
TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
30293031
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
@@ -3080,6 +3082,206 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
30803082
Cerr << "... split finished" << Endl;
30813083
}
30823084

3085+
Y_UNIT_TEST(DistributedUpsertRestartBeforePrepare) {
3086+
TPortManager pm;
3087+
TServerSettings serverSettings(pm.GetPort(2134));
3088+
serverSettings.SetDomainName("Root")
3089+
.SetUseRealThreads(false)
3090+
.SetEnableDataShardVolatileTransactions(true);
3091+
3092+
Tests::TServer::TPtr server = new TServer(serverSettings);
3093+
auto &runtime = *server->GetRuntime();
3094+
auto sender = runtime.AllocateEdgeActor();
3095+
3096+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3097+
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3098+
3099+
InitRoot(server, sender);
3100+
3101+
Cerr << "========= Creating the table =========" << Endl;
3102+
UNIT_ASSERT_VALUES_EQUAL(
3103+
KqpSchemeExec(runtime, R"(
3104+
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3105+
WITH (PARTITION_AT_KEYS = (10));
3106+
)"),
3107+
"SUCCESS");
3108+
3109+
const auto shards = GetTableShards(server, sender, "/Root/table");
3110+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
3111+
3112+
// We need to fill table with some data
3113+
Cerr << "========= Upserting initial values =========" << Endl;
3114+
UNIT_ASSERT_VALUES_EQUAL(
3115+
KqpSimpleExec(runtime, R"(
3116+
UPSERT INTO `/Root/table` (key, value)
3117+
VALUES (1, 1), (11, 11)
3118+
)"),
3119+
"<empty>");
3120+
3121+
TBlockEvents<TEvDataShard::TEvProposeTransaction> blockedPrepare(runtime);
3122+
3123+
Cerr << "========= Starting upsert 1 =========" << Endl;
3124+
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
3125+
UPSERT INTO `/Root/table` (key, value)
3126+
VALUES (2, 2), (12, 12);
3127+
)");
3128+
3129+
runtime.WaitFor("prepare requests", [&]{ return blockedPrepare.size() >= 2; });
3130+
UNIT_ASSERT_VALUES_EQUAL(blockedPrepare.size(), 2u);
3131+
3132+
blockedPrepare.Stop();
3133+
3134+
Cerr << "========= Restarting shard 1 =========" << Endl;
3135+
GracefulRestartTablet(runtime, shards.at(0), sender);
3136+
3137+
UNIT_ASSERT_VALUES_EQUAL(
3138+
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
3139+
"ERROR: UNAVAILABLE");
3140+
}
3141+
3142+
Y_UNIT_TEST(DistributedUpsertRestartAfterPrepare) {
3143+
TPortManager pm;
3144+
TServerSettings serverSettings(pm.GetPort(2134));
3145+
serverSettings.SetDomainName("Root")
3146+
.SetUseRealThreads(false)
3147+
.SetEnableDataShardVolatileTransactions(true);
3148+
3149+
Tests::TServer::TPtr server = new TServer(serverSettings);
3150+
auto &runtime = *server->GetRuntime();
3151+
auto sender = runtime.AllocateEdgeActor();
3152+
3153+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3154+
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3155+
3156+
InitRoot(server, sender);
3157+
3158+
Cerr << "========= Creating the table =========" << Endl;
3159+
UNIT_ASSERT_VALUES_EQUAL(
3160+
KqpSchemeExec(runtime, R"(
3161+
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3162+
WITH (PARTITION_AT_KEYS = (10));
3163+
)"),
3164+
"SUCCESS");
3165+
3166+
const auto shards = GetTableShards(server, sender, "/Root/table");
3167+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
3168+
3169+
// We need to fill table with some data
3170+
Cerr << "========= Upserting initial values =========" << Endl;
3171+
UNIT_ASSERT_VALUES_EQUAL(
3172+
KqpSimpleExec(runtime, R"(
3173+
UPSERT INTO `/Root/table` (key, value)
3174+
VALUES (1, 1), (11, 11)
3175+
)"),
3176+
"<empty>");
3177+
3178+
TBlockEvents<TEvDataShard::TEvProposeTransactionResult> blockedPrepare(runtime);
3179+
3180+
Cerr << "========= Starting upsert 1 =========" << Endl;
3181+
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
3182+
UPSERT INTO `/Root/table` (key, value)
3183+
VALUES (2, 2), (12, 12);
3184+
)");
3185+
3186+
runtime.WaitFor("prepare results", [&]{ return blockedPrepare.size() >= 2; });
3187+
UNIT_ASSERT_VALUES_EQUAL(blockedPrepare.size(), 2u);
3188+
3189+
for (auto& ev : blockedPrepare) {
3190+
auto* msg = ev->Get();
3191+
UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus(), NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED);
3192+
}
3193+
3194+
// Unblock prepare results and restart the first shard
3195+
blockedPrepare.Stop().Unblock();
3196+
3197+
Cerr << "========= Restarting shard 1 =========" << Endl;
3198+
GracefulRestartTablet(runtime, shards.at(0), sender);
3199+
3200+
UNIT_ASSERT_VALUES_EQUAL(
3201+
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
3202+
"ERROR: ABORTED");
3203+
}
3204+
3205+
Y_UNIT_TEST(DistributedUpsertRestartAfterPlan) {
3206+
TPortManager pm;
3207+
TServerSettings serverSettings(pm.GetPort(2134));
3208+
serverSettings.SetDomainName("Root")
3209+
.SetUseRealThreads(false)
3210+
.SetEnableDataShardVolatileTransactions(true);
3211+
3212+
Tests::TServer::TPtr server = new TServer(serverSettings);
3213+
auto &runtime = *server->GetRuntime();
3214+
auto sender = runtime.AllocateEdgeActor();
3215+
3216+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3217+
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3218+
3219+
InitRoot(server, sender);
3220+
3221+
Cerr << "========= Creating the table =========" << Endl;
3222+
UNIT_ASSERT_VALUES_EQUAL(
3223+
KqpSchemeExec(runtime, R"(
3224+
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3225+
WITH (PARTITION_AT_KEYS = (10));
3226+
)"),
3227+
"SUCCESS");
3228+
3229+
const auto shards = GetTableShards(server, sender, "/Root/table");
3230+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
3231+
3232+
// We need to fill table with some data
3233+
Cerr << "========= Upserting initial values =========" << Endl;
3234+
UNIT_ASSERT_VALUES_EQUAL(
3235+
KqpSimpleExec(runtime, R"(
3236+
UPSERT INTO `/Root/table` (key, value)
3237+
VALUES (1, 1), (11, 11)
3238+
)"),
3239+
"<empty>");
3240+
3241+
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime);
3242+
3243+
Cerr << "========= Starting upsert 1 =========" << Endl;
3244+
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
3245+
UPSERT INTO `/Root/table` (key, value)
3246+
VALUES (2, 2), (12, 12);
3247+
)");
3248+
3249+
runtime.WaitFor("shard plans", [&]{ return blockedPlan.size() >= 2; });
3250+
UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
3251+
3252+
// Block TEvPrivate::TEvProgressTransaction for shard1
3253+
auto shard1actor = ResolveTablet(runtime, shards.at(0));
3254+
TBlockEvents<IEventHandle> blockedProgress(runtime,
3255+
[&](const TAutoPtr<IEventHandle>& ev) {
3256+
return ev->GetRecipientRewrite() == shard1actor &&
3257+
ev->GetTypeRewrite() == EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0;
3258+
});
3259+
3260+
// Unblock prepare results and restart the first shard
3261+
blockedPlan.Stop().Unblock();
3262+
runtime.WaitFor("blocked progress", [&]{ return blockedProgress.size() >= 1; });
3263+
UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1u);
3264+
3265+
Cerr << "... sleeping for 1 second" << Endl;
3266+
runtime.SimulateSleep(TDuration::Seconds(1));
3267+
3268+
Cerr << "========= Restarting shard 1 =========" << Endl;
3269+
GracefulRestartTablet(runtime, shards.at(0), sender);
3270+
3271+
UNIT_ASSERT_VALUES_EQUAL(
3272+
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
3273+
"ERROR: ABORTED");
3274+
3275+
Cerr << "========= Checking table =========" << Endl;
3276+
UNIT_ASSERT_VALUES_EQUAL(
3277+
KqpSimpleExec(runtime, R"(
3278+
SELECT key, value FROM `/Root/table`
3279+
ORDER BY key;
3280+
)"),
3281+
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
3282+
"{ items { uint32_value: 11 } items { uint32_value: 11 } }");
3283+
}
3284+
30833285
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
30843286

30853287
} // namespace NKikimr

0 commit comments

Comments
 (0)