Skip to content

Commit 7b61b05

Browse files
committed
Fix planning of volatile transactions with uncommitted effects (#2505)
1 parent f24b02a commit 7b61b05

File tree

5 files changed

+190
-87
lines changed

5 files changed

+190
-87
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 60 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2006,23 +2006,68 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20062006
TTopicTabletTxs& topicTxs) {
20072007
TDatashardTxs datashardTxs;
20082008

2009-
std::vector<ui64> affectedShardsSet;
2010-
affectedShardsSet.reserve(datashardTasks.size());
2011-
20122009
for (auto& [shardId, tasks]: datashardTasks) {
20132010
auto [it, success] = datashardTxs.emplace(
20142011
shardId,
20152012
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());
20162013

20172014
YQL_ENSURE(success, "unexpected duplicates in datashard transactions");
2018-
affectedShardsSet.emplace_back(shardId);
20192015
NKikimrTxDataShard::TKqpTransaction* dsTxs = it->second;
20202016
dsTxs->MutableTasks()->Reserve(tasks.size());
20212017
for (auto& task: tasks) {
20222018
dsTxs->AddTasks()->Swap(task);
20232019
}
20242020
}
20252021

2022+
// Note: when locks map is present it will be mutated to avoid copying data
2023+
auto& locksMap = Request.DataShardLocks;
2024+
if (!locksMap.empty()) {
2025+
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback);
2026+
}
2027+
2028+
// Materialize (possibly empty) txs for all shards with locks (either commit or rollback)
2029+
for (auto& [shardId, locksList] : locksMap) {
2030+
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");
2031+
2032+
auto it = datashardTxs.find(shardId);
2033+
if (it == datashardTxs.end()) {
2034+
auto [emplaced, success] = datashardTxs.emplace(
2035+
shardId,
2036+
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());
2037+
2038+
YQL_ENSURE(success, "unexpected failure to emplace a datashard transaction");
2039+
it = emplaced;
2040+
}
2041+
2042+
NKikimrTxDataShard::TKqpTransaction* tx = it->second;
2043+
switch (Request.LocksOp) {
2044+
case ELocksOp::Commit:
2045+
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
2046+
break;
2047+
case ELocksOp::Rollback:
2048+
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Rollback);
2049+
break;
2050+
case ELocksOp::Unspecified:
2051+
break;
2052+
}
2053+
2054+
// Move lock descriptions to the datashard tx
2055+
auto* protoLocks = tx->MutableLocks()->MutableLocks();
2056+
protoLocks->Reserve(locksList.size());
2057+
bool hasWrites = false;
2058+
for (auto& lock : locksList) {
2059+
hasWrites = hasWrites || lock.GetHasWrites();
2060+
protoLocks->Add(std::move(lock));
2061+
}
2062+
locksList.clear();
2063+
2064+
// When locks with writes are committed this commits accumulated effects
2065+
if (Request.LocksOp == ELocksOp::Commit && hasWrites) {
2066+
ShardsWithEffects.insert(shardId);
2067+
YQL_ENSURE(!ReadOnlyTx);
2068+
}
2069+
}
2070+
20262071
Request.TopicOperations.BuildTopicTxs(topicTxs);
20272072

20282073
const bool needRollback = Request.LocksOp == ELocksOp::Rollback;
@@ -2042,7 +2087,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20422087
// TODO: add support in the future
20432088
topicTxs.empty() &&
20442089
// We only want to use volatile transactions for multiple shards
2045-
(affectedShardsSet.size() + topicTxs.size()) > 1 &&
2090+
(datashardTxs.size() + topicTxs.size()) > 1 &&
20462091
// We cannot use volatile transactions with persistent channels
20472092
// Note: currently persistent channels are never used
20482093
!HasPersistentChannels);
@@ -2055,30 +2100,29 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20552100
// Transactions with topics must always use generic readsets
20562101
!topicTxs.empty());
20572102

2058-
if (auto locksMap = Request.DataShardLocks;
2059-
!locksMap.empty() ||
2060-
VolatileTx ||
2103+
if (!locksMap.empty() || VolatileTx ||
20612104
Request.TopicOperations.HasReadOperations())
20622105
{
20632106
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);
20642107

20652108
bool needCommit = Request.LocksOp == ELocksOp::Commit || VolatileTx;
20662109

2067-
auto locksOp = needCommit
2068-
? NKikimrDataEvents::TKqpLocks::Commit
2069-
: NKikimrDataEvents::TKqpLocks::Rollback;
2070-
20712110
absl::flat_hash_set<ui64> sendingShardsSet;
20722111
absl::flat_hash_set<ui64> receivingShardsSet;
20732112

20742113
// Gather shards that need to send/receive readsets (shards with effects)
20752114
if (needCommit) {
2076-
for (auto& shardId: affectedShardsSet) {
2115+
for (auto& [shardId, tx] : datashardTxs) {
2116+
if (tx->HasLocks()) {
2117+
// Locks may be broken so shards with locks need to send readsets
2118+
sendingShardsSet.insert(shardId);
2119+
}
20772120
if (ShardsWithEffects.contains(shardId)) {
20782121
// Volatile transactions may abort effects, so they send readsets
20792122
if (VolatileTx) {
20802123
sendingShardsSet.insert(shardId);
20812124
}
2125+
// Effects are only applied when all locks are valid
20822126
receivingShardsSet.insert(shardId);
20832127
}
20842128
}
@@ -2093,44 +2137,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20932137
}
20942138
}
20952139

2096-
// Gather locks that need to be committed or erased
2097-
for (auto& [shardId, locksList] : locksMap) {
2098-
NKikimrTxDataShard::TKqpTransaction* tx = nullptr;
2099-
auto it = datashardTxs.find(shardId);
2100-
if (it != datashardTxs.end()) {
2101-
tx = it->second;
2102-
} else {
2103-
auto [eIt, success] = datashardTxs.emplace(
2104-
shardId,
2105-
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());
2106-
tx = eIt->second;
2107-
}
2108-
2109-
tx->MutableLocks()->SetOp(locksOp);
2110-
2111-
if (!locksList.empty()) {
2112-
auto* protoLocks = tx->MutableLocks()->MutableLocks();
2113-
protoLocks->Reserve(locksList.size());
2114-
bool hasWrites = false;
2115-
for (auto& lock : locksList) {
2116-
hasWrites = hasWrites || lock.GetHasWrites();
2117-
protoLocks->Add()->Swap(&lock);
2118-
}
2119-
2120-
if (needCommit) {
2121-
// We also send the result on commit
2122-
sendingShardsSet.insert(shardId);
2123-
2124-
if (hasWrites) {
2125-
// Tx with uncommitted changes can be aborted due to conflicts,
2126-
// so shards with write locks should receive readsets
2127-
receivingShardsSet.insert(shardId);
2128-
YQL_ENSURE(!ReadOnlyTx);
2129-
}
2130-
}
2131-
}
2132-
}
2133-
2140+
// Encode sending/receiving shards in tx bodies
21342141
if (needCommit) {
21352142
NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end());
21362143
NProtoBuf::RepeatedField<ui64> receivingShards(receivingShardsSet.begin(), receivingShardsSet.end());
@@ -2139,23 +2146,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
21392146
std::sort(receivingShards.begin(), receivingShards.end());
21402147

21412148
for (auto& [shardId, shardTx] : datashardTxs) {
2142-
shardTx->MutableLocks()->SetOp(locksOp);
2149+
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
21432150
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
21442151
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
21452152
}
21462153

21472154
for (auto& [_, tx] : topicTxs) {
2148-
switch (locksOp) {
2149-
case NKikimrDataEvents::TKqpLocks::Commit:
2150-
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
2151-
break;
2152-
case NKikimrDataEvents::TKqpLocks::Rollback:
2153-
tx.SetOp(NKikimrPQ::TDataTransaction::Rollback);
2154-
break;
2155-
case NKikimrDataEvents::TKqpLocks::Unspecified:
2156-
break;
2157-
}
2158-
2155+
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
21592156
*tx.MutableSendingShards() = sendingShards;
21602157
*tx.MutableReceivingShards() = receivingShards;
21612158
}

ydb/core/tx/datashard/datashard_ut_order.cpp

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4279,6 +4279,7 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
42794279

42804280
bool capturePlanSteps = true;
42814281
TVector<THolder<IEventHandle>> capturedPlanSteps;
4282+
TVector<ui64> capturedPlanTxIds;
42824283
THashSet<ui64> passReadSetTxIds;
42834284
ui64 observedReadSets = 0;
42844285
TVector<THolder<IEventHandle>> capturedReadSets;
@@ -4294,6 +4295,12 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
42944295
case TEvTxProcessing::TEvPlanStep::EventType: {
42954296
if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor && capturePlanSteps) {
42964297
Cerr << "... captured plan step for table-3" << Endl;
4298+
auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
4299+
for (const auto& tx : msg->Record.GetTransactions()) {
4300+
ui64 txId = tx.GetTxId();
4301+
capturedPlanTxIds.push_back(txId);
4302+
Cerr << "... captured plan step tx " << txId << " for table-3" << Endl;
4303+
}
42974304
capturedPlanSteps.emplace_back(ev.Release());
42984305
return TTestActorRuntime::EEventAction::DROP;
42994306
}
@@ -4303,6 +4310,12 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
43034310
if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor) {
43044311
auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
43054312
ui64 txId = msg->Record.GetTxId();
4313+
if ((msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET) &&
4314+
(msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA))
4315+
{
4316+
Cerr << "... passing expectation for txid# " << txId << Endl;
4317+
break;
4318+
}
43064319
++observedReadSets;
43074320
if (!passReadSetTxIds.contains(txId)) {
43084321
Cerr << "... readset for txid# " << txId << " was blocked" << Endl;
@@ -4353,20 +4366,11 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
43534366
}
43544367
};
43554368

4356-
waitFor([&]{ return capturedPlanSteps.size() > 0; }, "plan step");
4357-
UNIT_ASSERT_VALUES_EQUAL(capturedPlanSteps.size(), 1u);
4358-
ui64 realTxId1, realTxId2;
4359-
{
4360-
auto* msg = capturedPlanSteps[0]->Get<TEvTxProcessing::TEvPlanStep>();
4361-
TVector<ui64> realTxIds;
4362-
for (const auto& tx : msg->Record.GetTransactions()) {
4363-
realTxIds.emplace_back(tx.GetTxId());
4364-
}
4365-
UNIT_ASSERT_VALUES_EQUAL(realTxIds.size(), 2u);
4366-
std::sort(realTxIds.begin(), realTxIds.end());
4367-
realTxId1 = realTxIds.at(0);
4368-
realTxId2 = realTxIds.at(1);
4369-
}
4369+
waitFor([&]{ return capturedPlanTxIds.size() >= 2; }, "captured transactions");
4370+
UNIT_ASSERT_C(capturedPlanTxIds.size(), 2u);
4371+
std::sort(capturedPlanTxIds.begin(), capturedPlanTxIds.end());
4372+
ui64 realTxId1 = capturedPlanTxIds.at(0);
4373+
ui64 realTxId2 = capturedPlanTxIds.at(1);
43704374

43714375
// Unblock and resend the plan step message
43724376
capturePlanSteps = false;
@@ -4375,7 +4379,7 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
43754379
}
43764380
capturedPlanSteps.clear();
43774381

4378-
// Wait until there are 2 readset messages
4382+
// Wait until there are 2 readset messages (with data)
43794383
waitFor([&]{ return capturedReadSets.size() >= 2; }, "initial readsets");
43804384
SimulateSleep(runtime, TDuration::MilliSeconds(5));
43814385

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ struct TTestHelper {
348348
auto &runtime = *Server->GetRuntime();
349349
Sender = runtime.AllocateEdgeActor();
350350

351-
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
351+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
352352
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO);
353353

354354
InitRoot(Server, Sender);
@@ -818,7 +818,11 @@ struct TTestHelper {
818818
break;
819819
}
820820
case TEvTxProcessing::EvReadSet: {
821-
if (dropRS) {
821+
auto* msg = event->Get<TEvTxProcessing::TEvReadSet>();
822+
auto flags = msg->Record.GetFlags();
823+
auto isExpect = flags & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET;
824+
auto isNoData = flags & NKikimrTx::TEvReadSet::FLAG_NO_DATA;
825+
if (dropRS && !(isExpect && isNoData)) {
822826
result.ReadSets.push_back(std::move(event));
823827
return TTestActorRuntime::EEventAction::DROP;
824828
}
@@ -852,7 +856,10 @@ struct TTestHelper {
852856
)"));
853857
}
854858

855-
waitFor([&]{ return result.ReadSets.size() == 1; }, "intercepted RS");
859+
const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions();
860+
const size_t expectedReadSets = 1 + (finalUpserts && usesVolatileTxs ? 2 : 0);
861+
862+
waitFor([&]{ return result.ReadSets.size() == expectedReadSets; }, "intercepted RS");
856863

857864
// restore original observer (note we used lambda function and stack variables)
858865
Server->GetRuntime()->SetObserverFunc(prevObserverFunc);
@@ -2576,7 +2583,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
25762583
TPortManager pm;
25772584
TServerSettings serverSettings(pm.GetPort(2134));
25782585
serverSettings.SetDomainName("Root")
2579-
.SetUseRealThreads(false);
2586+
.SetUseRealThreads(false)
2587+
// Blocked volatile transactions block reads, disable
2588+
.SetEnableDataShardVolatileTransactions(false);
25802589

25812590
const ui64 shardCount = 1;
25822591
TTestHelper helper(serverSettings, shardCount);
@@ -3600,7 +3609,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
36003609
auto& runtime = *server->GetRuntime();
36013610
auto sender = runtime.AllocateEdgeActor();
36023611

3603-
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
3612+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
36043613
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO);
36053614
// runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
36063615

ydb/core/tx/datashard/datashard_ut_rs.cpp

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ struct IsReadSet {
4848
{
4949
if (ev.GetTypeRewrite() == TEvTxProcessing::EvReadSet) {
5050
auto &rec = ev.Get<TEvTxProcessing::TEvReadSet>()->Record;
51-
if (rec.GetTabletSource() == Source && rec.GetTabletDest() == Dest) {
51+
bool isExpectation = (
52+
(rec.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET) &&
53+
(rec.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA));
54+
if (rec.GetTabletSource() == Source && rec.GetTabletDest() == Dest && !isExpectation) {
5255
return true;
5356
}
5457
}
@@ -64,7 +67,9 @@ Y_UNIT_TEST_SUITE(TDataShardRSTest) {
6467
TPortManager pm;
6568
TServerSettings serverSettings(pm.GetPort(2134));
6669
serverSettings.SetDomainName("Root")
67-
.SetUseRealThreads(false);
70+
.SetUseRealThreads(false)
71+
// Volatile transactions avoid storing readsets in InReadSets table
72+
.SetEnableDataShardVolatileTransactions(false);
6873

6974
Tests::TServer::TPtr server = new TServer(serverSettings);
7075
auto &runtime = *server->GetRuntime();
@@ -250,7 +255,13 @@ Y_UNIT_TEST_SUITE(TDataShardRSTest) {
250255
TPortManager pm;
251256
TServerSettings serverSettings(pm.GetPort(2134));
252257
serverSettings.SetDomainName("Root")
253-
.SetUseRealThreads(false);
258+
.SetUseRealThreads(false)
259+
// This test expects rs acks to be delayed during one of restarts,
260+
// which doesn't happen with volatile transactions. With volatile
261+
// transactions both upserts have already executed, one of them is
262+
// just waiting for confirmation before making changes visible.
263+
// Since acks are not delayed they are just gone when dropped.
264+
.SetEnableDataShardVolatileTransactions(false);
254265

255266
Tests::TServer::TPtr server = new TServer(serverSettings);
256267
auto &runtime = *server->GetRuntime();
@@ -278,7 +289,10 @@ Y_UNIT_TEST_SUITE(TDataShardRSTest) {
278289
auto captureRS = [shard1,shard3](TAutoPtr<IEventHandle> &event) -> auto {
279290
if (event->GetTypeRewrite() == TEvTxProcessing::EvReadSet) {
280291
auto &rec = event->Get<TEvTxProcessing::TEvReadSet>()->Record;
281-
if (rec.GetTabletSource() == shard1) {
292+
bool isExpectation = (
293+
(rec.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET) &&
294+
(rec.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA));
295+
if (rec.GetTabletSource() == shard1 && !isExpectation) {
282296
return TTestActorRuntime::EEventAction::DROP;
283297
}
284298
} else if (event->GetTypeRewrite() == TEvTxProcessing::EvReadSetAck) {
@@ -359,6 +373,9 @@ Y_UNIT_TEST_SUITE(TDataShardRSTest) {
359373
switch (ev->GetTypeRewrite()) {
360374
case TEvTxProcessing::TEvReadSet::EventType: {
361375
auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
376+
if (msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) {
377+
break;
378+
}
362379
NKikimrTx::TReadSetData genericData;
363380
Y_ABORT_UNLESS(genericData.ParseFromString(msg->Record.GetReadSet()));
364381
Cerr << "... generic readset: " << genericData.DebugString() << Endl;
@@ -419,6 +436,13 @@ Y_UNIT_TEST_SUITE(TDataShardRSTest) {
419436
switch (ev->GetTypeRewrite()) {
420437
case TEvTxProcessing::TEvReadSet::EventType: {
421438
auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
439+
if (msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) {
440+
if (!(msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET)) {
441+
Cerr << "... nodata readset" << Endl;
442+
++readSets;
443+
}
444+
break;
445+
}
422446
NKikimrTx::TReadSetData genericData;
423447
Y_ABORT_UNLESS(genericData.ParseFromString(msg->Record.GetReadSet()));
424448
Cerr << "... generic readset: " << genericData.DebugString() << Endl;

0 commit comments

Comments
 (0)