Skip to content

Make sure out-of-order volatile commits are not visible on followers … #1344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/tablet/tablet_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,9 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
if (!(entry.KnownLeaderTablet == msg->CurrentLeaderTablet || !entry.KnownLeaderTablet)) {
DropEntry(tabletId, entry, ctx); // got info but not full, occurs on transitional cluster states
} else {
entry.KnownLeaderTablet = msg->CurrentLeaderTablet;
entry.State = TEntry::StProblemPing;
entry.KnownLeaderTablet = msg->CurrentLeaderTablet;
entry.KnownFollowers = std::move(msg->Followers);
SendPing(tabletId, entry, ctx);
}
} else {
Expand Down
121 changes: 121 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,127 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
UNIT_ASSERT(splitLatency < TDuration::Seconds(5));
}

// Scenario: a distributed write (keys 3 and 4) is held back by intercepting
// its readsets, then a second distributed write (keys 5 and 6) is executed
// while the first is still unresolved. Stale read-only queries must show only
// the initial rows until the captured readsets are delivered, after which all
// rows must be visible.
// NOTE(review): KqpSimpleStaleRoExec is presumably served by followers (each
// table is created with one follower) — confirm against the helper.
Y_UNIT_TEST(DistributedOutOfOrderFollowerConsistency) {
// Single-node test server with volatile datashard transactions enabled
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetNodeCount(1)
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

// Raise log verbosity for the services exercised by this test
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::TABLET_RESOLVER, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::STATESTORAGE, NLog::PRI_TRACE);

InitRoot(server, sender);

// Two tables, each with a single shard and a single follower
auto opts = TShardedTableOptions()
.Shards(1)
.Followers(1);
CreateShardedTable(server, sender, "/Root", "table-1", opts);
CreateShardedTable(server, sender, "/Root", "table-2", opts);

// Invalidate the tablet resolver cache for every shard of both tables
// NOTE(review): presumably done so that followers started after table
// creation are picked up by later reads — confirm.
runtime.SimulateSleep(TDuration::Seconds(1));
for (ui64 shard : GetTableShards(server, sender, "/Root/table-1")) {
InvalidateTabletResolverCache(runtime, shard);
}
for (ui64 shard : GetTableShards(server, sender, "/Root/table-2")) {
InvalidateTabletResolverCache(runtime, shard);
}

// Initial rows: these are the only rows expected to be visible while the
// first distributed write remains unresolved
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2);");

// Let followers catch up
runtime.SimulateSleep(TDuration::Seconds(1));

// Block readset exchange
// The observer takes ownership of every TEvReadSet event and stores it in
// readSets so it can be re-sent later
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
readSets.emplace_back(ev.Release());
});

// Start a distributed write to both tables
// The request is only sent here; upsertResult is not awaited anywhere in
// this test
TString sessionId = CreateSessionRPC(runtime, "/Root");
auto upsertResult = SendRequest(
runtime,
MakeSimpleRequestRPC(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 4);
)", sessionId, /* txId */ "", /* commitTx */ true),
"/Root");
// NOTE(review): the threshold of 4 presumably matches the number of
// readsets the two shards exchange for this transaction — confirm.
WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");

// Stop blocking further readsets
blockReadSets.Remove();

// Start another distributed write to both tables, it should succeed
ExecSQL(server, sender, R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5);
UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 6);
)");

// Let followers catch up
runtime.SimulateSleep(TDuration::Seconds(1));
for (ui64 shard : GetTableShards(server, sender, "/Root/table-1")) {
InvalidateTabletResolverCache(runtime, shard);
}
for (ui64 shard : GetTableShards(server, sender, "/Root/table-2")) {
InvalidateTabletResolverCache(runtime, shard);
}

// Check tables, they shouldn't see inconsistent results with the latest write
// Only the initial row is expected per table: neither the blocked write
// (keys 3, 4) nor the later write (keys 5, 6) may be visible yet
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-1`
ORDER BY key
)"), "/Root"),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }");
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-2`
ORDER BY key
)"), "/Root"),
"{ items { uint32_value: 2 } items { uint32_value: 2 } }");

// Unblock readsets
// Each captured event is re-sent on the node index derived from its
// recipient's node id relative to the id of node 0; release() hands
// ownership of the event to runtime.Send
for (auto& ev : readSets) {
ui32 nodeIndex = ev->GetRecipientRewrite().NodeId() - runtime.GetNodeId(0);
runtime.Send(ev.release(), nodeIndex, true);
}
readSets.clear();

// Let followers catch up
runtime.SimulateSleep(TDuration::Seconds(1));

// Check tables again, they should have all rows visible now
// NOTE(review): unlike the earlier checks, these calls omit the "/Root"
// argument — presumably it matches a default parameter; confirm.
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-1`
ORDER BY key
)")),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
"{ items { uint32_value: 5 } items { uint32_value: 5 } }");
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-2`
ORDER BY key
)")),
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 4 } }, "
"{ items { uint32_value: 6 } items { uint32_value: 6 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
13 changes: 12 additions & 1 deletion ydb/core/tx/datashard/follower_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,24 @@ std::tuple<TRowVersion, bool, ui64> TDataShard::CalculateFollowerReadEdge() cons
Y_ABORT_UNLESS(!IsFollower());
Y_DEBUG_ABORT_UNLESS(IsMvccEnabled());

TRowVersion volatileUncertain = VolatileTxManager.GetMinUncertainVersion();

for (auto order : TransQueue.GetPlan()) {
// When we have planned operations we assume the first one may be used
// for new writes, so we mark it as non-repeatable. We could skip
// readonly operations, but there's little benefit in that, and it's
// complicated to determine which is the first readable given we may
// have executed some out of order.
return { TRowVersion(order.Step, order.TxId), false, 0 };
return { Min(volatileUncertain, TRowVersion(order.Step, order.TxId)), false, 0 };
}

if (!volatileUncertain.IsMax()) {
// We have some uncertainty in an unresolved volatile commit
// Allow followers to read from it in non-repeatable snapshot modes
// FIXME: when at least one write is committed at this version, it
// should stop being non-repeatable, and followers need to resolve
// other possibly out-of-order commits.
return { volatileUncertain, false, 0 };
}

// This is the max version where we had any writes
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,18 @@ namespace NKikimr::NDataShard {

UnblockWaitingRemovalOperations(info);

TRowVersion prevUncertain = GetMinUncertainVersion();

for (ui64 commitTxId : info->CommitTxIds) {
VolatileTxByCommitTxId.erase(commitTxId);
}
VolatileTxByVersion.erase(info);
VolatileTxs.erase(txId);

if (prevUncertain < GetMinUncertainVersion()) {
Self->PromoteFollowerReadEdge();
}

if (!WaitingSnapshotEvents.empty()) {
TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr;
while (!WaitingSnapshotEvents.empty()) {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ namespace NKikimr::NDataShard {
return !VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot;
}

/**
 * Returns the version of the first entry in VolatileTxByVersion, or
 * TRowVersion::Max() when no entries are registered.
 *
 * NOTE(review): presumably VolatileTxByVersion is ordered by version, which
 * would make the first entry the minimum — confirm against its declaration.
 */
TRowVersion GetMinUncertainVersion() const {
    // Nothing is tracked: report the maximum version as the sentinel
    if (VolatileTxByVersion.empty()) {
        return TRowVersion::Max();
    }

    const auto first = VolatileTxByVersion.begin();
    return (*first)->Version;
}

void PersistAddVolatileTx(
ui64 txId, const TRowVersion& version,
TConstArrayRef<ui64> commitTxIds,
Expand Down