22
33namespace NKikimr ::NOlap {
44
5- class TSchemaVersionNormalizer ::TNormalizerResult : public INormalizerChanges {
5+ class TSchemaVersionNormalizer ::TNormalizerResult: public INormalizerChanges {
66private:
77 class TKey {
8+ private:
9+ std::optional<ui64> Version;
10+
811 public:
912 ui64 Step;
1013 ui64 TxId;
11- ui64 Version;
1214 ui32 Id;
1315
1416 public:
1517 TKey () = default ;
1618
17- TKey (ui32 id, ui64 step, ui64 txId, ui64 version)
18- : Step(step)
19+ ui64 GetVersion () const {
20+ AFL_VERIFY (Version);
21+ return *Version;
22+ }
23+
24+ TKey (ui32 id, ui64 step, ui64 txId, const std::optional<ui64> version)
25+ : Version(version)
26+ , Step(step)
1927 , TxId(txId)
20- , Version(version)
21- , Id(id)
22- {
28+ , Id(id) {
29+ }
30+
31+ bool operator <(const TKey& item) const {
32+ if (Id == item.Id ) {
33+ const bool result = std::tie (Step, TxId) < std::tie (item.Step , item.TxId );
34+ if (Version && item.Version ) {
35+ const bool resultVersions = Version < item.Version ;
36+ AFL_VERIFY (result == resultVersions);
37+ }
38+ return result;
39+ } else {
40+ return Id < item.Id ;
41+ }
42+ }
43+
44+ bool operator ==(const TKey& item) const {
45+ return std::tie (Id, Step, TxId, Version) == std::tie (item.Id , item.Step , item.TxId , item.Version );
2346 }
2447 };
2548
@@ -28,15 +51,12 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
2851 ui64 PathId;
2952 ui64 Step;
3053 ui64 TxId;
31- ui64 Version;
3254
3355 public:
34- TTableKey (ui64 pathId, ui64 step, ui64 txId, ui64 version )
56+ TTableKey (ui64 pathId, ui64 step, ui64 txId)
3557 : PathId(pathId)
3658 , Step(step)
37- , TxId(txId)
38- , Version(version)
39- {
59+ , TxId(txId) {
4060 }
4161 };
4262
@@ -46,19 +66,19 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
4666public:
4767 TNormalizerResult (std::vector<TKey>&& versions, std::vector<TTableKey>&& tableVersions)
4868 : VersionsToRemove(versions)
49- , TableVersionsToRemove(tableVersions)
50- {
69+ , TableVersionsToRemove(tableVersions) {
5170 }
5271
5372 bool ApplyOnExecute (NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */ ) const override {
5473 using namespace NColumnShard ;
5574 NIceDb::TNiceDb db (txc.DB );
56- for (auto & key: VersionsToRemove) {
57- LOG_S_DEBUG ( " Removing schema version in TSchemaVersionNormalizer " << key.Version );
75+ for (auto & key : VersionsToRemove) {
76+ AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)( " event " , " Removing schema version in TSchemaVersionNormalizer" )( " version " , key.GetVersion () );
5877 db.Table <Schema::SchemaPresetVersionInfo>().Key (key.Id , key.Step , key.TxId ).Delete ();
5978 }
60- for (auto & key: TableVersionsToRemove) {
61- LOG_S_DEBUG (" Removing table version in TSchemaVersionNormalizer " << key.Version << " pathId " << key.PathId );
79+ for (auto & key : TableVersionsToRemove) {
80+ AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " Removing table version in TSchemaVersionNormalizer" )(" pathId" , key.PathId )(
81+ " plan_step" , key.Step )(" tx_id" , key.TxId );
6282 db.Table <Schema::TableVersionInfo>().Key (key.PathId , key.Step , key.TxId ).Delete ();
6383 }
6484 return true ;
@@ -101,97 +121,100 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
101121 }
102122 }
103123
104- std::vector <TKey> unusedSchemaIds ;
124+ std::map <TKey, bool > schemaIdUsability ;
105125 std::vector<TTableKey> unusedTableSchemaIds;
106- std::optional<ui64> maxVersion;
107126 std::vector<INormalizerChanges::TPtr> changes;
108127
109128 {
129+ THashMap<ui32, TKey> maxByPresetId;
110130 auto rowset = db.Table <Schema::SchemaPresetVersionInfo>().Select ();
111- if (rowset.IsReady ()) {
112- while (!rowset.EndOfSet ()) {
113- const ui32 id = rowset.GetValue <Schema::SchemaPresetVersionInfo::Id>();
114- NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
115- Y_ABORT_UNLESS (info.ParseFromString (rowset.GetValue <Schema::SchemaPresetVersionInfo::InfoProto>()));
116- if (info.HasSchema ()) {
117- ui64 version = info.GetSchema ().GetVersion ();
118- if (!maxVersion.has_value () || (version > *maxVersion)) {
119- maxVersion = version;
120- }
121- if (!usedSchemaVersions.contains (version)) {
122- unusedSchemaIds.emplace_back (id, rowset.GetValue <Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue <Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
123- }
124- }
125-
126- if (!rowset.Next ()) {
127- return std::nullopt ;
128- }
129- }
130- } else {
131- return std::nullopt ;
132- }
133- }
134-
135- {
136- auto rowset = db.Table <Schema::TableVersionInfo>().Select ();
137131 if (!rowset.IsReady ()) {
138132 return std::nullopt ;
139133 }
140-
141134 while (!rowset.EndOfSet ()) {
142- const ui64 pathId = rowset.GetValue <Schema::TableVersionInfo::PathId>();
143-
144- NKikimrTxColumnShard::TTableVersionInfo versionInfo;
145- Y_ABORT_UNLESS (versionInfo.ParseFromString (rowset.GetValue <Schema::TableVersionInfo::InfoProto>()));
146- if (versionInfo.HasSchema ()) {
147- ui64 version = versionInfo.GetSchema ().GetVersion ();
148- if (!usedSchemaVersions.contains (version)) {
149- unusedTableSchemaIds.emplace_back (pathId, rowset.GetValue <Schema::TableVersionInfo::SinceStep>(), rowset.GetValue <Schema::TableVersionInfo::SinceTxId>(), version);
150- }
135+ const ui32 id = rowset.GetValue <Schema::SchemaPresetVersionInfo::Id>();
136+ NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
137+ Y_ABORT_UNLESS (info.ParseFromString (rowset.GetValue <Schema::SchemaPresetVersionInfo::InfoProto>()));
138+ AFL_VERIFY (info.HasSchema ());
139+ ui64 version = info.GetSchema ().GetVersion ();
140+ TKey presetVersionKey (id, rowset.GetValue <Schema::SchemaPresetVersionInfo::SinceStep>(),
141+ rowset.GetValue <Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
142+ auto it = maxByPresetId.find (id);
143+ if (it == maxByPresetId.end ()) {
144+ it = maxByPresetId.emplace (id, presetVersionKey).first ;
145+ } else if (it->second < presetVersionKey) {
146+ it->second = presetVersionKey;
151147 }
148+ AFL_VERIFY (schemaIdUsability.emplace (presetVersionKey, usedSchemaVersions.contains (version)).second );
152149
153150 if (!rowset.Next ()) {
154151 return std::nullopt ;
155152 }
156153 }
157- }
154+ for (auto && i : maxByPresetId) {
155+ auto it = schemaIdUsability.find (i.second );
156+ AFL_VERIFY (it != schemaIdUsability.end ());
157+ AFL_VERIFY (it->first == i.second );
158+ it->second = true ;
159+ }
160+ {
161+ auto rowset = db.Table <Schema::TableVersionInfo>().Select ();
162+ if (!rowset.IsReady ()) {
163+ return std::nullopt ;
164+ }
165+
166+ while (!rowset.EndOfSet ()) {
167+ const ui64 pathId = rowset.GetValue <Schema::TableVersionInfo::PathId>();
168+
169+ NKikimrTxColumnShard::TTableVersionInfo versionInfo;
170+ Y_ABORT_UNLESS (versionInfo.ParseFromString (rowset.GetValue <Schema::TableVersionInfo::InfoProto>()));
171+ auto it = schemaIdUsability.find (TKey (versionInfo.GetSchemaPresetId (),
172+ rowset.GetValue <Schema::TableVersionInfo::SinceStep>(), rowset.GetValue <Schema::TableVersionInfo::SinceTxId>(), {}));
173+ AFL_VERIFY (it != schemaIdUsability.end ());
174+ if (!it->second ) {
175+ unusedTableSchemaIds.emplace_back (pathId, rowset.GetValue <Schema::TableVersionInfo::SinceStep>(),
176+ rowset.GetValue <Schema::TableVersionInfo::SinceTxId>());
177+ }
158178
159- std::vector<TTableKey> tablePortion;
160- std::vector<TKey> portion;
161- tablePortion.reserve (10000 );
162- portion.reserve (10000 );
163- auto addPortion = [&]() {
164- if (portion.size () + tablePortion.size () >= 10000 ) {
165- changes.emplace_back (std::make_shared<TNormalizerResult>(std::move (portion), std::move (tablePortion)));
166- portion = std::vector<TKey>();
167- tablePortion = std::vector<TTableKey>();
179+ if (!rowset.Next ()) {
180+ return std::nullopt ;
181+ }
182+ }
168183 }
169- };
170- for (const auto & id: unusedSchemaIds) {
171- if (!maxVersion.has_value () || (id.Version != *maxVersion)) {
172- portion.push_back (id);
173- addPortion ();
184+
185+ std::vector<TTableKey> tableVersionToRemove;
186+ std::vector<TKey> presetVersionsToRemove;
187+ auto addNormalizationTask = [&](const ui32 limit) {
188+ if (presetVersionsToRemove.size () + tableVersionToRemove.size () > limit) {
189+ changes.emplace_back (
190+ std::make_shared<TNormalizerResult>(std::move (presetVersionsToRemove), std::move (tableVersionToRemove)));
191+ presetVersionsToRemove = std::vector<TKey>();
192+ tableVersionToRemove = std::vector<TTableKey>();
193+ }
194+ };
195+ for (const auto & id : schemaIdUsability) {
196+ if (!id.second ) {
197+ presetVersionsToRemove.push_back (id.first );
198+ addNormalizationTask (10000 );
199+ }
174200 }
175- }
176201
177- for (const auto & id: unusedTableSchemaIds) {
178- if (!maxVersion.has_value () || (id.Version != *maxVersion)) {
179- tablePortion.push_back (id);
180- addPortion ();
202+ for (const auto & id : unusedTableSchemaIds) {
203+ tableVersionToRemove.push_back (id);
204+ addNormalizationTask (10000 );
181205 }
182- }
183206
184- if (portion. size () + tablePortion. size () > 0 ) {
185- changes. emplace_back (std::make_shared<TNormalizerResult>( std::move (portion), std::move (tablePortion))) ;
207+ addNormalizationTask ( 0 );
208+ return changes ;
186209 }
187- return changes;
188210 }
189211};
190212
191- TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit (const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
213+ TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit (
214+ const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
192215 auto changes = TNormalizerResult::Init (txc);
193216 if (!changes) {
194- return TConclusionStatus::Fail (" Not ready" );;
217+ return TConclusionStatus::Fail (" Not ready" );
195218 }
196219 std::vector<INormalizerTask::TPtr> tasks;
197220 for (auto && c : *changes) {
@@ -200,4 +223,4 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit
200223 return tasks;
201224}
202225
203- }
226+ } // namespace NKikimr::NOlap
0 commit comments