Skip to content

Don't ack readsets too early on volatile tx abort #14495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3672,6 +3672,161 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
UNIT_ASSERT(!readFuture.HasValue());
}

Y_UNIT_TEST(GracefulShardRestartNoEarlyReadSetAck) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

struct TBootInfo {
ui32 Generation;
TActorId Launcher;
};
THashMap<ui64, TBootInfo> bootInfo;
auto observeBootInfo = runtime.AddObserver<TEvTablet::TEvBoot>([&](auto& ev) {
auto* msg = ev->Get();
auto& info = bootInfo[msg->TabletID];
info.Generation = msg->Generation;
info.Launcher = msg->Launcher;
});

Cerr << "========= Creating table =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10, 20));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 3u);
// const auto tableId = ResolveTableId(server, sender, "/Root/table");

// We need to fill table with some data
Cerr << "========= Upserting initial values =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (1, 1), (11, 11), (21, 21);
)"),
"<empty>");

Cerr << "========= Starting a transaction =========" << Endl;
TString sessionId, txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
SELECT key, value FROM `/Root/table` ORDER BY key;
)"),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
"{ items { uint32_value: 11 } items { uint32_value: 11 } }, "
"{ items { uint32_value: 21 } items { uint32_value: 21 } }");

Cerr << "========= Upserting a row to shard 2 to break the lock =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (11, 111);
)"),
"<empty>");

runtime.SimulateSleep(TDuration::MilliSeconds(1));

// Block channel 0 commits at shard 3
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime,
[shard3 = shards.at(2)](const auto& ev) {
auto* msg = ev->Get();
if (msg->Id.TabletID() == shard3 && msg->Id.Channel() == 0) {
return true;
}
return false;
});

// Block readsets at shard 3
TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets(runtime,
[shard3actor = ResolveTablet(runtime, shards.at(2))](const auto& ev) {
return ev->GetRecipientRewrite() == shard3actor;
});

// Force shard 1 to be the arbiter
TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));

Cerr << "========= Starting to commit =========" << Endl;
auto commitFuture = KqpSimpleSendCommit(runtime, sessionId, txId, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (2, 2), (12, 12), (22, 22);
)");

// After a short while shard 3 must have 2 blocked commits and 2 blocked readsets (expectation + abort):
// - Sys update on the new PlanStep
// - Execute and persist volatile tx
// - Processed abort tx persistence
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT_VALUES_EQUAL(blockedCommits.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(blockedReadSets.size(), 2u);

// Unblock readsets and wait for one more commit attempt (abort persistence)
blockedReadSets.Stop().Unblock();
runtime.WaitFor("1 more commit", [&]{ return blockedCommits.size() >= 3; });

// Block channel 0 commit responses at shard 3
TBlockEvents<TEvBlobStorage::TEvPutResult> blockedCommitResults(runtime,
[shard3 = shards.at(2)](const auto& ev) {
auto* msg = ev->Get();
if (msg->Id.TabletID() == shard3 && msg->Id.Channel() == 0) {
return true;
}
return false;
});

// Unblock the first two commits and wait for their blocked responses
blockedCommits.Unblock(2);
runtime.WaitFor("2 commit results", [&]{ return blockedCommitResults.size() >= 2; });
blockedCommits.Stop();
blockedCommitResults.Stop();

Cerr << "========= Starting new shard3 generation =========" << Endl;
UNIT_ASSERT(bootInfo.contains(shards.at(2)));
auto shard3sys = ResolveTablet(runtime, shards.at(2), 0, true);
auto shard3info = bootInfo.at(shards.at(2));
runtime.Send(
new IEventHandle(shard3info.Launcher, shard3sys,
new TEvTablet::TEvTabletDead(shards.at(2), TEvTablet::TEvTabletDead::ReasonPill, shard3info.Generation)),
0, true);
runtime.SimulateSleep(TDuration::MilliSeconds(1));
InvalidateTabletResolverCache(runtime, shards.at(2), 0);

Cerr << "========= Unblocking old shard3 generation =========" << Endl;
blockedCommitResults.Unblock();
blockedCommits.Unblock();

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(commitFuture))),
"ERROR: ABORTED");

Cerr << "========= Final read (must not hang) =========" << Endl;
auto readFuture = KqpSimpleSend(runtime, R"(
SELECT key, value FROM `/Root/table` ORDER BY key;
)");
runtime.SimulateSleep(TDuration::MilliSeconds(100));
UNIT_ASSERT_C(readFuture.HasValue(), "Read didn't finish in 100ms of simulated time");

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(readFuture.ExtractValueSync()),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
"{ items { uint32_value: 11 } items { uint32_value: 111 } }, "
"{ items { uint32_value: 21 } items { uint32_value: 21 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
11 changes: 6 additions & 5 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -833,10 +833,9 @@ namespace NKikimr::NDataShard {
return true;

case EVolatileTxState::Aborting:
// Aborting state will not change as long as we're still leader
return true;
// Ack readset normally as long as we're still a leader
return true;
// We need to wait until volatile tx abort is committed to send rs acks
info->DelayedAcks.push_back(std::move(ack));
return false;
}

ui64 srcTabletId = record.GetTabletSource();
Expand Down Expand Up @@ -891,8 +890,10 @@ namespace NKikimr::NDataShard {
}();

if (!committed) {
// We need to wait until volatile tx abort is committed to send rs acks
info->DelayedAcks.push_back(std::move(ack));
AbortWaitingTransaction(info);
return true;
return false;
}

NIceDb::TNiceDb db(txc.DB);
Expand Down
Loading