Skip to content

Commit 6d10a7a

Browse files
authored
24-3-13-hotfix: Fix uncommitted changes leak and clean them up on startup (#13487)
1 parent 7ef8e47 commit 6d10a7a

File tree

7 files changed

+180
-18
lines changed

7 files changed

+180
-18
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ void TDataShard::SwitchToWork(const TActorContext &ctx) {
409409
NotifySchemeshard(ctx);
410410
CheckInitiateBorrowedPartsReturn(ctx);
411411
CheckStateChange(ctx);
412+
CleanupUncommitted(ctx);
412413
}
413414

414415
void TDataShard::SyncConfig() {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#include "datashard_impl.h"
2+
3+
namespace NKikimr::NDataShard {
4+
5+
using namespace NTabletFlatExecutor;
6+
7+
/**
 * Local transaction that removes uncommitted changes which are not tracked
 * by any known lock or volatile transaction.
 *
 * Work is bounded per transaction: once MaxRemovedPerTx changes have been
 * removed and another untracked change is found, the transaction stops and
 * Complete() asks the shard to schedule one more cleanup pass.
 */
class TDataShard::TTxCleanupUncommitted : public TTransactionBase<TDataShard> {
public:
    // explicit: a raw TDataShard* must never silently convert into a transaction
    explicit TTxCleanupUncommitted(TDataShard* self)
        : TTransactionBase(self)
    {}

    /**
     * Walks all non-replicated user tables and removes open transactions
     * that are unknown to both the lock table and the volatile tx manager.
     *
     * Always returns true; when the per-transaction limit is hit the
     * remaining work is deferred through the Reschedule flag.
     */
    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
        if (Self->State != TShardState::Ready) {
            // We need to be very careful about cleaning up uncommitted changes
            // Avoid mistakes by waiting until shard restarts in a Ready state
            return true;
        }

        size_t removed = 0;
        for (const auto& pr : Self->TableInfos) {
            if (pr.second->IsReplicated()) {
                // Replicated tables use uncommitted changes for replication
                // Since we don't track them we cannot know whether they leaked or not
                continue;
            }

            auto localTid = pr.second->LocalTid;
            if (!txc.DB.GetScheme().GetTableInfo(localTid)) {
                // Note: this check is likely not needed, since all user tables
                // must be present in the Ready state, but make sure we don't
                // trip since this code always runs at startup.
                continue;
            }

            auto openTxs = txc.DB.GetOpenTxs(localTid);
            for (ui64 txId : openTxs) {
                if (Self->SysLocksTable().GetLocks().contains(txId)) {
                    // Changes are associated with a known lock
                    continue;
                }
                if (Self->GetVolatileTxManager().FindByCommitTxId(txId)) {
                    // Changes are associated with a known volatile tx
                    continue;
                }

                // Changes are neither committed nor removed and are not tracked
                if (removed >= MaxRemovedPerTx) {
                    // Limit reached while untracked changes still remain:
                    // leave the rest for the next pass
                    Reschedule = true;
                    break;
                }

                // Remove otherwise untracked changes
                txc.DB.RemoveTx(localTid, txId);
                ++removed;
            }

            if (Reschedule) {
                // The inner loop stopped on the limit, don't visit more tables
                break;
            }
        }

        if (removed > 0) {
            LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
                "DataShard " << Self->TabletID() << " removed " << removed << " untracked uncommitted changes");
        }

        return true;
    }

    /**
     * Schedules another cleanup pass when Execute() stopped on the limit.
     */
    void Complete(const TActorContext& ctx) override {
        if (Reschedule) {
            Self->CleanupUncommitted(ctx);
        }
    }

private:
    // Avoid removing more than 1k transactions per transaction
    static constexpr size_t MaxRemovedPerTx = 1000;

    // Set by Execute() when untracked changes remain after the limit was hit
    bool Reschedule = false;
};
81+
82+
void TDataShard::CleanupUncommitted(const TActorContext& ctx) {
83+
if (State == TShardState::Ready) {
84+
Execute(new TTxCleanupUncommitted(this), ctx);
85+
}
86+
}
87+
88+
} // namespace NKikimr::NDataShard

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ class TDataShard
242242
class TTxCdcStreamEmitHeartbeats;
243243
class TTxUpdateFollowerReadEdge;
244244
class TTxRemoveSchemaSnapshots;
245+
class TTxCleanupUncommitted;
245246

246247
template <typename T> friend class TTxDirectBase;
247248
class TTxUploadRows;
@@ -1422,6 +1423,9 @@ class TDataShard
14221423
void SwitchToWork(const TActorContext &ctx);
14231424
void SyncConfig();
14241425

1426+
// Cleanup for bug https://github.com/ydb-platform/ydb/issues/13387
1427+
void CleanupUncommitted(const TActorContext &ctx);
1428+
14251429
TMaybe<TInstant> GetTxPlanStartTimeAndCleanup(ui64 step);
14261430

14271431
struct TPersistentTablet;

ydb/core/tx/datashard/datashard_ut_snapshot.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5149,6 +5149,60 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
51495149
}
51505150
}
51515151

5152+
// Checks that an aborted write in an interactive transaction leaves no open
// (uncommitted) transactions behind at the shard.
Y_UNIT_TEST(BrokenLockChangesDontLeak) {
    TPortManager portManager;
    TServerSettings settings(portManager.GetPort(2134));
    settings.SetDomainName("Root")
        .SetUseRealThreads(false)
        .SetDomainPlanResolution(100);

    Tests::TServer::TPtr server = new TServer(settings);
    auto& runtime = *server->GetRuntime();
    auto edge = runtime.AllocateEdgeActor();

    runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

    InitRoot(server, edge);

    TDisableDataShardLogBatching disableDataShardLogBatching;

    // Create the table and seed it with a single row
    UNIT_ASSERT_VALUES_EQUAL(
        KqpSchemeExec(runtime, R"(
            CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key));
        )"),
        "SUCCESS");

    ExecSQL(server, edge, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 11);");

    // Open an interactive transaction with a read of the whole table
    TString sessionId, txId;
    UNIT_ASSERT_VALUES_EQUAL(
        KqpSimpleBegin(runtime, sessionId, txId, R"(
            SELECT key, value FROM `/Root/table`
            ORDER BY key;
        )"),
        "{ items { uint32_value: 1 } items { uint32_value: 11 } }");

    // Write outside of the open transaction
    // (presumably this is what breaks its lock, per the test name)
    ExecSQL(server, edge, "UPSERT INTO `/Root/table` (key, value) VALUES (2, 22);");

    // A write followed by a read in the open transaction is expected to abort
    UNIT_ASSERT_VALUES_EQUAL(
        KqpSimpleContinue(runtime, sessionId, txId, R"(
            UPSERT INTO `/Root/table` (key, value) VALUES (3, 33);
            SELECT key, value FROM `/Root/table` ORDER BY key;
        )"),
        "ERROR: ABORTED");

    const auto shardIds = GetTableShards(server, edge, "/Root/table");
    const auto tableId = ResolveTableId(server, edge, "/Root/table");
    UNIT_ASSERT_VALUES_EQUAL(shardIds.size(), 1u);

    // Check shard doesn't have open transactions
    {
        const auto shardId = shardIds.at(0);
        runtime.SendToPipe(shardId, edge, new TEvDataShard::TEvGetOpenTxs(tableId.PathId));
        auto reply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(edge);
        UNIT_ASSERT_C(reply->Get()->OpenTxs.empty(), "at shard " << shardId);
    }
}
5205+
51525206
}
51535207

51545208
} // namespace NKikimr

ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,34 +146,42 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
146146
}
147147

148148
if (guardLocks.LockTxId) {
149+
auto abortLock = [&]() {
150+
LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
151+
<< " aborting because it cannot acquire locks");
152+
153+
op->SetAbortedFlag();
154+
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
155+
return EExecutionStatus::Executed;
156+
};
157+
149158
switch (DataShard.SysLocksTable().EnsureCurrentLock()) {
150159
case EEnsureCurrentLock::Success:
151160
// Lock is valid, we may continue with reads and side-effects
152161
break;
153162

154163
case EEnsureCurrentLock::Broken:
155164
// Lock is valid, but broken, we could abort early in some
156-
// cases, but it doesn't affect correctness.
165+
// cases, but it doesn't affect correctness. For write
166+
// transactions we need to abort, since we may otherwise
167+
// perform writes that are not attached to any lock.
168+
if (!op->IsReadOnly()) {
169+
return abortLock();
170+
}
157171
break;
158172

159173
case EEnsureCurrentLock::TooMany:
160174
// Lock cannot be created, it's not necessarily a problem
161175
// for read-only transactions, for non-readonly we need to
162176
// abort;
163-
if (op->IsReadOnly()) {
164-
break;
177+
if (!op->IsReadOnly()) {
178+
return abortLock();
165179
}
166-
167-
[[fallthrough]];
180+
break;
168181

169182
case EEnsureCurrentLock::Abort:
170183
// Lock cannot be created and we must abort
171-
LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
172-
<< " aborting because it cannot acquire locks");
173-
174-
op->SetAbortedFlag();
175-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
176-
return EExecutionStatus::Executed;
184+
return abortLock();
177185
}
178186
}
179187

ydb/core/tx/datashard/execute_write_unit.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,12 @@ class TExecuteWriteUnit : public TExecutionUnit {
286286
}
287287

288288
if (guardLocks.LockTxId) {
289+
auto abortLock = [&]() {
290+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << tabletId << " aborting because it cannot acquire locks");
291+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, "Operation is aborting because it cannot acquire locks");
292+
return EExecutionStatus::Executed;
293+
};
294+
289295
switch (DataShard.SysLocksTable().EnsureCurrentLock()) {
290296
case EEnsureCurrentLock::Success:
291297
// Lock is valid, we may continue with reads and side-effects
@@ -294,23 +300,23 @@ class TExecuteWriteUnit : public TExecutionUnit {
294300
case EEnsureCurrentLock::Broken:
295301
// Lock is valid, but broken, we could abort early in some
296302
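// cases, but it doesn't affect correctness. Non-read-only operations must abort, since they may otherwise perform writes that are not attached to any lock.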
303+
if (!op->IsReadOnly()) {
304+
return abortLock();
305+
}
297306
break;
298307

299308
case EEnsureCurrentLock::TooMany:
300309
// Lock cannot be created, it's not necessarily a problem
301310
// for read-only transactions, for non-readonly we need to
302311
// abort;
303-
if (op->IsReadOnly()) {
304-
break;
312+
if (!op->IsReadOnly()) {
313+
return abortLock();
305314
}
306-
307-
[[fallthrough]];
315+
break;
308316

309317
case EEnsureCurrentLock::Abort:
310318
// Lock cannot be created and we must abort
311-
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << tabletId << " aborting because it cannot acquire locks");
312-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, "Operation is aborting because it cannot acquire locks");
313-
return EExecutionStatus::Executed;
319+
return abortLock();
314320
}
315321
}
316322

ydb/core/tx/datashard/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ SRCS(
4949
datashard__cleanup_borrowed.cpp
5050
datashard__cleanup_in_rs.cpp
5151
datashard__cleanup_tx.cpp
52+
datashard__cleanup_uncommitted.cpp
5253
datashard__conditional_erase_rows.cpp
5354
datashard__engine_host.cpp
5455
datashard__engine_host.h

0 commit comments

Comments
 (0)