Skip to content

Commit 14c6dfb

Browse files
authored
Fix partial distributed commit of uncommitted changes during shard restart race (#2169)
1 parent fd0b1ac commit 14c6dfb

8 files changed

+249
-29
lines changed

ydb/core/tx/datashard/datashard_dep_tracker.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,11 +673,13 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
673673

674674
if (lock) {
675675
lock->SetLastOpId(op->GetTxId());
676-
if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent()) {
676+
if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent() && !lock->IsFrozen()) {
677677
// This lock was cached before, and since we know
678678
// it's persistent, we know it was also frozen
679679
// during that lock caching. Restore the frozen
680680
// flag for this lock.
681+
// Note: this code path is only for older shards
682+
// which didn't persist the frozen flag.
681683
lock->SetFrozen();
682684
}
683685
}

ydb/core/tx/datashard/datashard_locks.cpp

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,17 @@ TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId)
4444
, CreationTime(TAppData::TimeProvider->Now())
4545
{}
4646

47-
TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs)
47+
TLockInfo::TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row)
4848
: Locker(locker)
49-
, LockId(lockId)
50-
, LockNodeId(lockNodeId)
51-
, Generation(generation)
52-
, Counter(counter)
53-
, CreationTime(createTs)
49+
, LockId(row.LockId)
50+
, LockNodeId(row.LockNodeId)
51+
, Generation(row.Generation)
52+
, Counter(row.Counter)
53+
, CreationTime(TInstant::MicroSeconds(row.CreateTs))
54+
, Flags(ELockFlags(row.Flags))
5455
, Persistent(true)
5556
{
56-
if (counter == Max<ui64>()) {
57+
if (Counter == Max<ui64>()) {
5758
BreakVersion.emplace(TRowVersion::Min());
5859
}
5960
}
@@ -145,7 +146,7 @@ void TLockInfo::OnRemoved() {
145146
void TLockInfo::PersistLock(ILocksDb* db) {
146147
Y_ABORT_UNLESS(!IsPersistent());
147148
Y_ABORT_UNLESS(db, "Cannot persist lock without a db");
148-
db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds());
149+
db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds(), ui64(Flags));
149150
Persistent = true;
150151

151152
PersistRanges(db);
@@ -298,11 +299,11 @@ void TLockInfo::CleanupConflicts() {
298299
}
299300
}
300301

301-
void TLockInfo::RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags) {
302+
void TLockInfo::RestorePersistentRange(const ILocksDb::TLockRange& rangeRow) {
302303
auto& range = PersistentRanges.emplace_back();
303-
range.Id = rangeId;
304-
range.TableId = tableId;
305-
range.Flags = flags;
304+
range.Id = rangeRow.RangeId;
305+
range.TableId = rangeRow.TableId;
306+
range.Flags = ELockRangeFlags(rangeRow.Flags);
306307

307308
if (!!(range.Flags & ELockRangeFlags::Read)) {
308309
if (ReadTables.insert(range.TableId).second) {
@@ -334,6 +335,14 @@ void TLockInfo::RestorePersistentVolatileDependency(ui64 txId) {
334335
VolatileDependencies.insert(txId);
335336
}
336337

338+
void TLockInfo::SetFrozen(ILocksDb* db) {
339+
Y_ABORT_UNLESS(IsPersistent());
340+
Flags |= ELockFlags::Frozen;
341+
if (db) {
342+
db->PersistLockFlags(LockId, ui64(Flags));
343+
}
344+
}
345+
337346
// TTableLocks
338347

339348
void TTableLocks::AddShardLock(TLockInfo* lock) {
@@ -550,14 +559,14 @@ TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
550559
return lock;
551560
}
552561

553-
TLockInfo::TPtr TLockLocker::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) {
554-
Y_ABORT_UNLESS(Locks.find(lockId) == Locks.end());
562+
TLockInfo::TPtr TLockLocker::AddLock(const ILocksDb::TLockRow& row) {
563+
Y_ABORT_UNLESS(Locks.find(row.LockId) == Locks.end());
555564

556-
TLockInfo::TPtr lock(new TLockInfo(this, lockId, lockNodeId, generation, counter, createTs));
565+
TLockInfo::TPtr lock(new TLockInfo(this, row));
557566
Y_ABORT_UNLESS(lock->IsPersistent());
558-
Locks[lockId] = lock;
559-
if (lockNodeId) {
560-
PendingSubscribeLocks.emplace_back(lockId, lockNodeId);
567+
Locks[row.LockId] = lock;
568+
if (row.LockNodeId) {
569+
PendingSubscribeLocks.emplace_back(row.LockId, row.LockNodeId);
561570
}
562571
return lock;
563572
}
@@ -1171,9 +1180,9 @@ bool TSysLocks::Load(ILocksDb& db) {
11711180
Locker.Clear();
11721181

11731182
for (auto& lockRow : rows) {
1174-
TLockInfo::TPtr lock = Locker.AddLock(lockRow.LockId, lockRow.LockNodeId, lockRow.Generation, lockRow.Counter, TInstant::MicroSeconds(lockRow.CreateTs));
1183+
TLockInfo::TPtr lock = Locker.AddLock(lockRow);
11751184
for (auto& rangeRow : lockRow.Ranges) {
1176-
lock->RestorePersistentRange(rangeRow.RangeId, rangeRow.TableId, ELockRangeFlags(rangeRow.Flags));
1185+
lock->RestorePersistentRange(rangeRow);
11771186
}
11781187
}
11791188

ydb/core/tx/datashard/datashard_locks.h

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class ILocksDb {
5757
// Persist adding/removing a lock info
5858
virtual void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) = 0;
5959
virtual void PersistLockCounter(ui64 lockId, ui64 counter) = 0;
60+
virtual void PersistLockFlags(ui64 lockId, ui64 flags) = 0;
6061
virtual void PersistRemoveLock(ui64 lockId) = 0;
6162

6263
// Persist adding/removing info on locked ranges
@@ -206,6 +207,23 @@ struct TPendingSubscribeLock {
206207
}
207208
};
208209

210+
// ELockFlags type safe enum
211+
212+
enum class ELockFlags : ui64 {
213+
None = 0,
214+
Frozen = 1,
215+
};
216+
217+
using ELockFlagsRaw = std::underlying_type<ELockFlags>::type;
218+
219+
inline ELockFlags operator|(ELockFlags a, ELockFlags b) { return ELockFlags(ELockFlagsRaw(a) | ELockFlagsRaw(b)); }
220+
inline ELockFlags operator&(ELockFlags a, ELockFlags b) { return ELockFlags(ELockFlagsRaw(a) & ELockFlagsRaw(b)); }
221+
inline ELockFlags& operator|=(ELockFlags& a, ELockFlags b) { return a = a | b; }
222+
inline ELockFlags& operator&=(ELockFlags& a, ELockFlags b) { return a = a & b; }
223+
inline bool operator!(ELockFlags c) { return ELockFlagsRaw(c) == 0; }
224+
225+
// ELockConflictFlags type safe enum
226+
209227
enum class ELockConflictFlags : ui8 {
210228
None = 0,
211229
BreakThemOnOurCommit = 1,
@@ -220,6 +238,8 @@ inline ELockConflictFlags& operator|=(ELockConflictFlags& a, ELockConflictFlags
220238
inline ELockConflictFlags& operator&=(ELockConflictFlags& a, ELockConflictFlags b) { return a = a & b; }
221239
inline bool operator!(ELockConflictFlags c) { return ELockConflictFlagsRaw(c) == 0; }
222240

241+
// ELockRangeFlags type safe enum
242+
223243
enum class ELockRangeFlags : ui8 {
224244
None = 0,
225245
Read = 1,
@@ -262,7 +282,7 @@ class TLockInfo
262282
using TPtr = TIntrusivePtr<TLockInfo>;
263283

264284
TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId);
265-
TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
285+
TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row);
266286
~TLockInfo();
267287

268288
bool Empty() const {
@@ -303,6 +323,9 @@ class TLockInfo
303323
ui32 GetLockNodeId() const { return LockNodeId; }
304324

305325
TInstant GetCreationTime() const { return CreationTime; }
326+
327+
ELockFlags GetFlags() const { return Flags; }
328+
306329
const THashSet<TPathId>& GetReadTables() const { return ReadTables; }
307330
const THashSet<TPathId>& GetWriteTables() const { return WriteTables; }
308331

@@ -320,7 +343,7 @@ class TLockInfo
320343
void PersistConflicts(ILocksDb* db);
321344
void CleanupConflicts();
322345

323-
void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags);
346+
void RestorePersistentRange(const ILocksDb::TLockRange& rangeRow);
324347
void RestorePersistentConflict(TLockInfo* otherLock);
325348
void RestorePersistentVolatileDependency(ui64 txId);
326349

@@ -341,8 +364,8 @@ class TLockInfo
341364
ui64 GetLastOpId() const { return LastOpId; }
342365
void SetLastOpId(ui64 opId) { LastOpId = opId; }
343366

344-
bool IsFrozen() const { return Frozen; }
345-
void SetFrozen() { Frozen = true; }
367+
bool IsFrozen() const { return !!(Flags & ELockFlags::Frozen); }
368+
void SetFrozen(ILocksDb* db = nullptr);
346369

347370
private:
348371
void MakeShardLock();
@@ -369,6 +392,7 @@ class TLockInfo
369392
ui32 Generation;
370393
ui64 Counter;
371394
TInstant CreationTime;
395+
ELockFlags Flags = ELockFlags::None;
372396
THashSet<TPathId> ReadTables;
373397
THashSet<TPathId> WriteTables;
374398
TVector<TPointKey> Points;
@@ -386,7 +410,6 @@ class TLockInfo
386410
TVector<TPersistentRange> PersistentRanges;
387411

388412
ui64 LastOpId = 0;
389-
bool Frozen = false;
390413
};
391414

392415
struct TTableLocksReadListTag {};
@@ -641,7 +664,7 @@ class TLockLocker {
641664
void RemoveBrokenRanges();
642665

643666
TLockInfo::TPtr GetOrAddLock(ui64 lockId, ui32 lockNodeId);
644-
TLockInfo::TPtr AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
667+
TLockInfo::TPtr AddLock(const ILocksDb::TLockRow& row);
645668
void RemoveOneLock(ui64 lockId, ILocksDb* db = nullptr);
646669

647670
void SaveBrokenPersistentLocks(ILocksDb* db);

ydb/core/tx/datashard/datashard_locks_db.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) {
129129
HasChanges_ = true;
130130
}
131131

132+
void TDataShardLocksDb::PersistLockFlags(ui64 lockId, ui64 flags) {
133+
using Schema = TDataShard::Schema;
134+
NIceDb::TNiceDb db(DB);
135+
db.Table<Schema::Locks>().Key(lockId).Update(
136+
NIceDb::TUpdate<Schema::Locks::Flags>(flags));
137+
HasChanges_ = true;
138+
}
139+
132140
void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
133141
// We remove lock changes unless it's managed by volatile tx manager
134142
bool isVolatile = Self.GetVolatileTxManager().FindByCommitTxId(lockId);

ydb/core/tx/datashard/datashard_locks_db.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class TDataShardLocksDb
2323
// Persist adding/removing a lock info
2424
void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) override;
2525
void PersistLockCounter(ui64 lockId, ui64 counter) override;
26+
void PersistLockFlags(ui64 lockId, ui64 flags) override;
2627
void PersistRemoveLock(ui64 lockId) override;
2728

2829
// Persist adding/removing info on locked ranges

ydb/core/tx/datashard/datashard_ut_common_kqp.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,17 @@ namespace NKqpHelpers {
146146
if (result.result_sets_size() == 0) {
147147
return "<empty>";
148148
}
149-
return FormatResult(result.result_sets(0));
149+
if (result.result_sets_size() == 1) {
150+
return FormatResult(result.result_sets(0));
151+
}
152+
TStringBuilder sb;
153+
for (int i = 0; i < result.result_sets_size(); ++i) {
154+
if (i != 0) {
155+
sb << "\n";
156+
}
157+
sb << FormatResult(result.result_sets(i));
158+
}
159+
return sb;
150160
}
151161

152162
inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) {

0 commit comments

Comments
 (0)