Skip to content

Commit 00441c6

Browse files
authored
Merge 852f18a into ea324a3
2 parents ea324a3 + 852f18a commit 00441c6

File tree

5 files changed

+191
-0
lines changed

5 files changed

+191
-0
lines changed

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ enum class ENormalizerSequentialId: ui32 {
5959
EmptyPortionsCleaner,
6060
CleanInsertionDedup,
6161
GCCountersNormalizer,
62+
SchemaVersionCleaner,
6263

6364
MAX
6465
};
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
#include "version.h"
2+
3+
namespace NKikimr::NOlap {
4+
5+
class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
6+
private:
7+
class TKey {
8+
public:
9+
ui64 Step;
10+
ui64 TxId;
11+
ui64 Version;
12+
ui32 Id;
13+
14+
public:
15+
TKey() = default;
16+
17+
TKey(ui32 id, ui64 step, ui64 txId, ui64 version)
18+
: Step(step)
19+
, TxId(txId)
20+
, Version(version)
21+
, Id(id)
22+
{
23+
}
24+
};
25+
26+
std::vector<TKey> VersionsToRemove;
27+
std::optional<ui64> LastVersion;
28+
29+
public:
30+
TNormalizerResult(std::vector<TKey>&& versions, std::optional<ui64>& lastVersion)
31+
: VersionsToRemove(versions)
32+
, LastVersion(lastVersion)
33+
{
34+
}
35+
36+
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
37+
using namespace NColumnShard;
38+
NIceDb::TNiceDb db(txc.DB);
39+
for (auto& key: VersionsToRemove) {
40+
if ((!LastVersion.has_value()) || (key.Version != *LastVersion)) {
41+
db.Table<Schema::SchemaPresetVersionInfo>().Key(key.Id, key.Step, key.TxId).Delete();
42+
}
43+
}
44+
return true;
45+
}
46+
47+
ui64 GetSize() const override {
48+
return VersionsToRemove.size();
49+
}
50+
51+
static std::optional<std::vector<INormalizerChanges::TPtr>> Init(NTabletFlatExecutor::TTransactionContext& txc) {
52+
using namespace NColumnShard;
53+
THashSet<ui64> usedSchemaVersions;
54+
NIceDb::TNiceDb db(txc.DB);
55+
{
56+
auto rowset = db.Table<Schema::IndexPortions>().Select();
57+
if (rowset.IsReady()) {
58+
while (!rowset.EndOfSet()) {
59+
usedSchemaVersions.insert(rowset.GetValue<Schema::IndexPortions::SchemaVersion>());
60+
if (!rowset.Next()) {
61+
return std::nullopt;
62+
}
63+
}
64+
} else {
65+
return std::nullopt;
66+
}
67+
}
68+
{
69+
auto rowset = db.Table<Schema::InsertTable>().Select();
70+
if (rowset.IsReady()) {
71+
while (!rowset.EndOfSet()) {
72+
if (rowset.HaveValue<Schema::InsertTable::SchemaVersion>()) {
73+
usedSchemaVersions.insert(rowset.GetValue<Schema::InsertTable::SchemaVersion>());
74+
if (!rowset.Next()) {
75+
return std::nullopt;
76+
}
77+
}
78+
}
79+
} else {
80+
return std::nullopt;
81+
}
82+
}
83+
84+
std::vector<TKey> unusedSchemaIds;
85+
std::optional<ui64> maxVersion;
86+
std::vector<INormalizerChanges::TPtr> changes;
87+
88+
{
89+
auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
90+
if (rowset.IsReady()) {
91+
while (!rowset.EndOfSet()) {
92+
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
93+
NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
94+
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
95+
if (info.HasSchema()) {
96+
ui64 version = info.GetSchema().GetVersion();
97+
if (!maxVersion.has_value() || (version > *maxVersion)) {
98+
maxVersion = version;
99+
}
100+
if (!usedSchemaVersions.contains(version)) {
101+
unusedSchemaIds.emplace_back(id, rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
102+
}
103+
}
104+
105+
if (!rowset.Next()) {
106+
return std::nullopt;
107+
}
108+
}
109+
} else {
110+
return std::nullopt;
111+
}
112+
}
113+
114+
for (size_t start = 0; start < unusedSchemaIds.size(); start += 10000) {
115+
std::vector<TKey> portion;
116+
size_t end = std::min(unusedSchemaIds.size(), start + 10000);
117+
portion.insert(portion.begin(), &unusedSchemaIds[start], &unusedSchemaIds[end]);
118+
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion), maxVersion));
119+
}
120+
return changes;
121+
}
122+
};
123+
124+
TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
125+
auto changes = TNormalizerResult::Init(txc);
126+
if (!changes) {
127+
return TConclusionStatus::Fail("Not ready");;
128+
}
129+
std::vector<INormalizerTask::TPtr> tasks;
130+
for (auto&& c : *changes) {
131+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(c));
132+
}
133+
return tasks;
134+
}
135+
136+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
#include <ydb/core/tx/columnshard/defs.h>
6+
7+
8+
namespace NKikimr::NColumnShard {
9+
class TTablesManager;
10+
}
11+
12+
namespace NKikimr::NOlap {
13+
14+
class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent {
15+
public:
16+
static TString GetClassNameStatic() {
17+
return ::ToString(ENormalizerSequentialId::SchemaVersionCleaner);
18+
}
19+
20+
private:
21+
static inline TFactory::TRegistrator<TSchemaVersionNormalizer> Registrator = TFactory::TRegistrator<TSchemaVersionNormalizer>(
22+
GetClassNameStatic());
23+
public:
24+
class TNormalizerResult;
25+
class TTask;
26+
27+
public:
28+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
29+
return ENormalizerSequentialId::SchemaVersionCleaner;
30+
}
31+
32+
virtual TString GetClassName() const override {
33+
return GetClassNameStatic();
34+
}
35+
36+
TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) {
37+
}
38+
39+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
40+
};
41+
42+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
GLOBAL version.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/tx/columnshard/normalizer/abstract
9+
)
10+
11+
END()

ydb/core/tx/columnshard/normalizer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ PEERDIR(
66
ydb/core/tx/columnshard/normalizer/tablet
77
ydb/core/tx/columnshard/normalizer/tables
88
ydb/core/tx/columnshard/normalizer/portion
9+
ydb/core/tx/columnshard/normalizer/schema_version
910
ydb/core/tx/columnshard/normalizer/insert_table
1011
)
1112

0 commit comments

Comments
 (0)