Skip to content

Commit 2e9f972

Browse files
committed
Remove schema snapshots, add change records to queue at init
1 parent 2e5f6ef commit 2e9f972

File tree

8 files changed

+161
-28
lines changed

8 files changed

+161
-28
lines changed

ydb/core/protos/counters_datashard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,4 +490,5 @@ enum ETxTypes {
490490
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
491491
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
492492
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
493+
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
493494
}

ydb/core/tx/datashard/datashard.cpp

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -869,8 +869,10 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
869869
Y_VERIFY_S(it != ChangesQueue.end(), "Cannot find change record: " << order);
870870

871871
if (it->second.SchemaSnapshotAcquired) {
872-
SchemaSnapshotManager.ReleaseReference(
873-
TSchemaSnapshotKey(it->second.TableId, it->second.SchemaVersion));
872+
const auto snapshotKey = TSchemaSnapshotKey(it->second.TableId, it->second.SchemaVersion);
873+
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
874+
ScheduleRemoveSchemaSnapshot(snapshotKey);
875+
}
874876
}
875877

876878
ChangesQueue.erase(it);
@@ -998,8 +1000,10 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
9981000
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);
9991001

10001002
if (cIt->second.SchemaSnapshotAcquired) {
1001-
SchemaSnapshotManager.ReleaseReference(
1002-
TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion));
1003+
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
1004+
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
1005+
ScheduleRemoveSchemaSnapshot(snapshotKey);
1006+
}
10031007
}
10041008

10051009
ChangesQueue.erase(cIt);
@@ -1067,23 +1071,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10671071
ChangesQueueBytes -= record.BodySize;
10681072

10691073
if (record.SchemaSnapshotAcquired) {
1070-
Y_ABORT_UNLESS(record.TableId);
1071-
auto tableIt = TableInfos.find(record.TableId.LocalPathId);
1072-
1073-
if (tableIt != TableInfos.end()) {
1074-
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
1075-
const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);
1076-
1077-
if (last) {
1078-
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
1079-
Y_ABORT_UNLESS(snapshot);
1080-
1081-
if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
1082-
SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
1083-
}
1084-
}
1085-
} else {
1086-
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
1074+
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
1075+
if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
1076+
ScheduleRemoveSchemaSnapshot(snapshotKey);
10871077
}
10881078
}
10891079

@@ -1303,6 +1293,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
13031293
.SchemaVersion = schemaVersion,
13041294
});
13051295

1296+
auto res = ChangesQueue.emplace(records.back().Order, records.back());
1297+
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);
1298+
1299+
if (res.first->second.SchemaVersion) {
1300+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
1301+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
1302+
}
1303+
13061304
if (!rowset.Next()) {
13071305
return false;
13081306
}
@@ -1401,6 +1399,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
14011399
});
14021400
entry.Count++;
14031401
needSort = true;
1402+
1403+
auto res = ChangesQueue.emplace(records.back().Order, records.back());
1404+
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);
1405+
1406+
if (res.first->second.SchemaVersion) {
1407+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
1408+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
1409+
}
14041410
}
14051411

14061412
LockChangeRecords.erase(lockId);
@@ -1459,6 +1465,46 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
14591465
}
14601466
}
14611467

1468+
void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
1469+
Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));
1470+
1471+
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
1472+
Y_ABORT_UNLESS(snapshot);
1473+
1474+
auto it = TableInfos.find(key.PathId);
1475+
if (it == TableInfos.end()) {
1476+
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
1477+
return;
1478+
}
1479+
1480+
if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
1481+
bool wasEmpty = PendingSchemaSnapshotsToRemove.empty();
1482+
PendingSchemaSnapshotsToRemove.push_back(key);
1483+
if (wasEmpty) {
1484+
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
1485+
}
1486+
}
1487+
}
1488+
1489+
void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
1490+
bool wasEmpty = PendingSchemaSnapshotsToRemove.empty();
1491+
1492+
for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
1493+
auto it = TableInfos.find(key.PathId);
1494+
if (it == TableInfos.end()) {
1495+
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
1496+
break;
1497+
}
1498+
if (snapshot.Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
1499+
PendingSchemaSnapshotsToRemove.push_back(key);
1500+
}
1501+
}
1502+
1503+
if (wasEmpty && !PendingSchemaSnapshotsToRemove.empty()) {
1504+
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
1505+
}
1506+
}
1507+
14621508
void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
14631509
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
14641510
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),

ydb/core/tx/datashard/datashard__init.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
425425
return false;
426426
}
427427

428+
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
429+
if (!Self->SchemaSnapshotManager.Load(db)) {
430+
return false;
431+
}
432+
}
433+
428434
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) {
429435
if (!Self->LoadChangeRecords(db, ChangeRecords)) {
430436
return false;
@@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
512518
}
513519
}
514520

515-
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
516-
if (!Self->SchemaSnapshotManager.Load(db)) {
517-
return false;
518-
}
519-
}
520-
521521
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
522522
TDataShardLocksDb locksDb(*Self, txc);
523523
if (!Self->SysLocks.Load(locksDb)) {
@@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
547547
Self->SubscribeNewLocks();
548548

549549
Self->ScheduleRemoveAbandonedLockChanges();
550+
Self->ScheduleRemoveAbandonedSchemaSnapshots();
550551

551552
return true;
552553
}

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ class TDataShard
241241
class TTxCdcStreamScanProgress;
242242
class TTxCdcStreamEmitHeartbeats;
243243
class TTxUpdateFollowerReadEdge;
244+
class TTxRemoveSchemaSnapshots;
244245

245246
template <typename T> friend class TTxDirectBase;
246247
class TTxUploadRows;
@@ -374,6 +375,7 @@ class TDataShard
374375
EvPlanPredictedTxs,
375376
EvStatisticsScanFinished,
376377
EvTableStatsError,
378+
EvRemoveSchemaSnapshots,
377379
EvEnd
378380
};
379381

@@ -595,6 +597,8 @@ class TDataShard
595597
struct TEvPlanPredictedTxs : public TEventLocal<TEvPlanPredictedTxs, EvPlanPredictedTxs> {};
596598

597599
struct TEvStatisticsScanFinished : public TEventLocal<TEvStatisticsScanFinished, EvStatisticsScanFinished> {};
600+
601+
struct TEvRemoveSchemaSnapshots : public TEventLocal<TEvRemoveSchemaSnapshots, EvRemoveSchemaSnapshots> {};
598602
};
599603

600604
struct Schema : NIceDb::Schema {
@@ -1383,6 +1387,8 @@ class TDataShard
13831387

13841388
void Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr& ev, const TActorContext& ctx);
13851389

1390+
void Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& ev, const TActorContext& ctx);
1391+
13861392
void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);
13871393

13881394
void DoPeriodicTasks(const TActorContext &ctx);
@@ -1920,6 +1926,8 @@ class TDataShard
19201926
bool LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShardChangeCollector::TChange>& records);
19211927
void ScheduleRemoveLockChanges(ui64 lockId);
19221928
void ScheduleRemoveAbandonedLockChanges();
1929+
void ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key);
1930+
void ScheduleRemoveAbandonedSchemaSnapshots();
19231931

19241932
static void PersistCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value,
19251933
const TPathId& tablePathId, const TPathId& streamPathId);
@@ -2871,6 +2879,7 @@ class TDataShard
28712879
THashMap<ui64, TUncommittedLockChangeRecords> LockChangeRecords; // ui64 is lock id
28722880
THashMap<ui64, TCommittedLockChangeRecords> CommittedLockChangeRecords; // ui64 is lock id
28732881
TVector<ui64> PendingLockChangeRecordsToRemove;
2882+
TVector<TSchemaSnapshotKey> PendingSchemaSnapshotsToRemove;
28742883

28752884
// in
28762885
THashMap<ui64, TInChangeSender> InChangeSenders; // ui64 is shard id
@@ -2991,6 +3000,7 @@ class TDataShard
29913000
HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
29923001
HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
29933002
HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle);
3003+
HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle);
29943004
default:
29953005
if (!HandleDefaultEvents(ev, SelfId())) {
29963006
ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateInactive unhandled event type: " << ev->GetTypeRewrite()
@@ -3119,6 +3129,7 @@ class TDataShard
31193129
HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle);
31203130
HFunc(NStat::TEvStatistics::TEvStatisticsRequest, Handle);
31213131
HFunc(TEvPrivate::TEvStatisticsScanFinished, Handle);
3132+
HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle);
31223133
default:
31233134
if (!HandleDefaultEvents(ev, SelfId())) {
31243135
ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString());

ydb/core/tx/datashard/datashard_schema_snapshots.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ void TSchemaSnapshotManager::RenameSnapshots(NTable::TDatabase& db,
119119
}
120120
}
121121

122+
// Read-only view of the full snapshot map (key -> snapshot).
const TSchemaSnapshotManager::TSnapshots& TSchemaSnapshotManager::GetSnapshots() const {
    return this->Snapshots;
}
125+
122126
bool TSchemaSnapshotManager::AcquireReference(const TSchemaSnapshotKey& key) {
123127
auto it = Snapshots.find(key);
124128
if (it == Snapshots.end()) {
@@ -152,6 +156,15 @@ bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key) {
152156
return true;
153157
}
154158

159+
bool TSchemaSnapshotManager::HasReference(const TSchemaSnapshotKey& key) const {
160+
auto refIt = References.find(key);
161+
if (refIt != References.end()) {
162+
return refIt->second;
163+
} else {
164+
return false;
165+
}
166+
}
167+
155168
void TSchemaSnapshotManager::PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot) {
156169
using Schema = TDataShard::Schema;
157170
db.Table<Schema::SchemaSnapshots>()

ydb/core/tx/datashard/datashard_schema_snapshots.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ struct TSchemaSnapshot {
2323
};
2424

2525
class TSchemaSnapshotManager {
26+
using TSnapshots = TMap<TSchemaSnapshotKey, TSchemaSnapshot, TLess<void>>;
27+
2628
public:
2729
explicit TSchemaSnapshotManager(const TDataShard* self);
2830

@@ -33,17 +35,19 @@ class TSchemaSnapshotManager {
3335
const TSchemaSnapshot* FindSnapshot(const TSchemaSnapshotKey& key) const;
3436
void RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key);
3537
void RenameSnapshots(NTable::TDatabase& db, const TPathId& prevTableId, const TPathId& newTableId);
38+
const TSnapshots& GetSnapshots() const;
3639

3740
bool AcquireReference(const TSchemaSnapshotKey& key);
3841
bool ReleaseReference(const TSchemaSnapshotKey& key);
42+
bool HasReference(const TSchemaSnapshotKey& key) const;
3943

4044
private:
4145
void PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot);
4246
void PersistRemoveSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key);
4347

4448
private:
4549
const TDataShard* Self;
46-
TMap<TSchemaSnapshotKey, TSchemaSnapshot, TLess<void>> Snapshots;
50+
TSnapshots Snapshots;
4751
THashMap<TSchemaSnapshotKey, size_t> References;
4852

4953
}; // TSchemaSnapshotManager
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#include "datashard_impl.h"
2+
3+
namespace NKikimr::NDataShard {
4+
5+
// Local transaction that drains TDataShard::PendingSchemaSnapshotsToRemove.
//
// Keys are taken from the back of the pending list. A key is dropped without
// any database change when its snapshot no longer exists, is referenced again,
// belongs to a table that cannot be found, or is not older than the table's
// current schema version. Every other key is removed via RemoveShapshot().
class TDataShard::TTxRemoveSchemaSnapshots: public NTabletFlatExecutor::TTransactionBase<TDataShard> {
public:
    TTxRemoveSchemaSnapshots(TDataShard* self)
        : TBase(self)
    { }

    TTxType GetTxType() const override { return TXTYPE_REMOVE_SCHEMA_SNAPSHOTS; }

    bool Execute(TTransactionContext& txc, const TActorContext&) override {
        NIceDb::TNiceDb db(txc.DB);
        auto& pending = Self->PendingSchemaSnapshotsToRemove;

        // The step expression pops the processed key on every path,
        // including the `continue` branches below.
        for (; !pending.empty(); pending.pop_back()) {
            // Copy the key: it must stay valid independently of the vector.
            const auto key = pending.back();

            const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(key);
            if (!snapshot || Self->GetSchemaSnapshotManager().HasReference(key)) {
                continue;
            }

            auto table = Self->FindUserTable(TPathId(key.OwnerId, key.PathId));
            if (!table || snapshot->Schema->GetTableSchemaVersion() >= table->GetTableSchemaVersion()) {
                continue;
            }

            Self->GetSchemaSnapshotManager().RemoveShapshot(db, key);
        }

        return true;
    }

    void Complete(const TActorContext&) override {
        // Nothing to do after commit.
    }
};
51+
52+
// Reacts to the TEvRemoveSchemaSnapshots self-event by starting the
// transaction that drains the pending snapshot removal list.
void TDataShard::Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& /*ev*/, const TActorContext& ctx) {
    Execute(new TTxRemoveSchemaSnapshots(this), ctx);
}
55+
56+
} // namespace NKikimr::NDataShard

ydb/core/tx/datashard/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ SRCS(
189189
receive_snapshot_unit.cpp
190190
remove_lock_change_records.cpp
191191
remove_locks.cpp
192+
remove_schema_snapshots.cpp
192193
range_ops.cpp
193194
read_iterator.h
194195
restore_unit.cpp

0 commit comments

Comments
 (0)