Skip to content

Commit 6d42f85

Browse files
authored
Merge 653c2cb into b153534
2 parents b153534 + 653c2cb commit 6d42f85

File tree

5 files changed

+194
-0
lines changed

5 files changed

+194
-0
lines changed

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ enum class ENormalizerSequentialId: ui32 {
5959
EmptyPortionsCleaner,
6060
CleanInsertionDedup,
6161
GCCountersNormalizer,
62+
SchemaVersionCleaner,
6263

6364
MAX
6465
};
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
#include "version.h"
2+
3+
namespace NKikimr::NOlap {
4+
5+
class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
6+
private:
7+
class TKey {
8+
public:
9+
ui64 Step;
10+
ui64 TxId;
11+
ui64 Version;
12+
ui32 Id;
13+
14+
public:
15+
TKey() = default;
16+
17+
TKey(ui32 id, ui64 step, ui64 txId, ui64 version)
18+
: Step(step)
19+
, TxId(txId)
20+
, Version(version)
21+
, Id(id)
22+
{
23+
}
24+
};
25+
26+
std::vector<TKey> VersionsToRemove;
27+
28+
public:
29+
TNormalizerResult(std::vector<TKey>&& versions)
30+
: VersionsToRemove(versions)
31+
{
32+
}
33+
34+
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
35+
using namespace NColumnShard;
36+
NIceDb::TNiceDb db(txc.DB);
37+
for (auto& key: VersionsToRemove) {
38+
db.Table<Schema::SchemaPresetVersionInfo>().Key(key.Id, key.Step, key.TxId).Delete();
39+
}
40+
return true;
41+
}
42+
43+
ui64 GetSize() const override {
44+
return VersionsToRemove.size();
45+
}
46+
47+
static std::optional<std::vector<INormalizerChanges::TPtr>> Init(NTabletFlatExecutor::TTransactionContext& txc) {
48+
using namespace NColumnShard;
49+
THashSet<ui64> usedSchemaVersions;
50+
NIceDb::TNiceDb db(txc.DB);
51+
{
52+
auto rowset = db.Table<Schema::IndexPortions>().Select();
53+
if (rowset.IsReady()) {
54+
while (!rowset.EndOfSet()) {
55+
usedSchemaVersions.insert(rowset.GetValue<Schema::IndexPortions::SchemaVersion>());
56+
if (!rowset.Next()) {
57+
return std::nullopt;
58+
}
59+
}
60+
} else {
61+
return std::nullopt;
62+
}
63+
}
64+
{
65+
auto rowset = db.Table<Schema::InsertTable>().Select();
66+
if (rowset.IsReady()) {
67+
while (!rowset.EndOfSet()) {
68+
if (rowset.HaveValue<Schema::InsertTable::SchemaVersion>()) {
69+
usedSchemaVersions.insert(rowset.GetValue<Schema::InsertTable::SchemaVersion>());
70+
if (!rowset.Next()) {
71+
return std::nullopt;
72+
}
73+
}
74+
}
75+
} else {
76+
return std::nullopt;
77+
}
78+
}
79+
80+
std::vector<TKey> unusedSchemaIds;
81+
std::optional<ui64> maxVersion;
82+
std::vector<INormalizerChanges::TPtr> changes;
83+
84+
{
85+
auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
86+
if (rowset.IsReady()) {
87+
while (!rowset.EndOfSet()) {
88+
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
89+
NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
90+
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
91+
if (info.HasSchema()) {
92+
ui64 version = info.GetSchema().GetVersion();
93+
if (!maxVersion.has_value() || (version > *maxVersion)) {
94+
maxVersion = version;
95+
}
96+
if (!usedSchemaVersions.contains(version)) {
97+
unusedSchemaIds.emplace_back(id, rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
98+
}
99+
}
100+
101+
if (!rowset.Next()) {
102+
return std::nullopt;
103+
}
104+
}
105+
} else {
106+
return std::nullopt;
107+
}
108+
}
109+
110+
std::vector<TKey> portion;
111+
portion.reserve(10000);
112+
for (auto iter = unusedSchemaIds.begin(); iter != unusedSchemaIds.end(); iter++) {
113+
if (!maxVersion.has_value() || (iter->Version != *maxVersion)) {
114+
portion.push_back(*iter);
115+
if (portion.size() >= 10000) {
116+
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
117+
}
118+
}
119+
}
120+
if (portion.size() > 0) {
121+
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
122+
}
123+
return changes;
124+
}
125+
};
126+
127+
TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
128+
auto changes = TNormalizerResult::Init(txc);
129+
if (!changes) {
130+
return TConclusionStatus::Fail("Not ready");;
131+
}
132+
std::vector<INormalizerTask::TPtr> tasks;
133+
for (auto&& c : *changes) {
134+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(c));
135+
}
136+
return tasks;
137+
}
138+
139+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
#include <ydb/core/tx/columnshard/defs.h>
6+
7+
8+
namespace NKikimr::NColumnShard {
9+
class TTablesManager;
10+
}
11+
12+
namespace NKikimr::NOlap {
13+
14+
class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent {
15+
public:
16+
static TString GetClassNameStatic() {
17+
return ::ToString(ENormalizerSequentialId::SchemaVersionCleaner);
18+
}
19+
20+
private:
21+
static inline TFactory::TRegistrator<TSchemaVersionNormalizer> Registrator = TFactory::TRegistrator<TSchemaVersionNormalizer>(
22+
GetClassNameStatic());
23+
public:
24+
class TNormalizerResult;
25+
class TTask;
26+
27+
public:
28+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
29+
return ENormalizerSequentialId::SchemaVersionCleaner;
30+
}
31+
32+
virtual TString GetClassName() const override {
33+
return GetClassNameStatic();
34+
}
35+
36+
TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) {
37+
}
38+
39+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
40+
};
41+
42+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
GLOBAL version.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/tx/columnshard/normalizer/abstract
9+
)
10+
11+
END()

ydb/core/tx/columnshard/normalizer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ PEERDIR(
66
ydb/core/tx/columnshard/normalizer/tablet
77
ydb/core/tx/columnshard/normalizer/tables
88
ydb/core/tx/columnshard/normalizer/portion
9+
ydb/core/tx/columnshard/normalizer/schema_version
910
ydb/core/tx/columnshard/normalizer/insert_table
1011
)
1112

0 commit comments

Comments
 (0)