Skip to content

Commit 3f85407

Browse files
Merge 4a6f715 into 438ee8d
2 parents 438ee8d + 4a6f715 commit 3f85407

File tree

4 files changed

+181
-1
lines changed

4 files changed

+181
-1
lines changed

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ namespace NKikimr::NOlap {
5252

5353
// Identifiers of the normalizers, stored as ui32.
// Only Granules has an explicit value (1); every following enumerator is
// implicitly "previous + 1", so the position of an entry in this list fixes
// its numeric value.
// NOTE(review): CleanGranuleId is inserted between Granules and Chunks, which
// shifts the numeric value of Chunks and of every later entry by one. Confirm
// that nothing persists or compares these raw values across versions.
enum class ENormalizerSequentialId : ui32 {
5454
Granules = 1,
55+
CleanGranuleId,
5556
Chunks,
5657
PortionsCleaner,
5758
TablesCleaner,
58-
PortionsMetadata,
59+
PortionsMetadata
5960
};
6061

6162
class TNormalizationContext {
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
#include "clean_granule.h"

#include <ydb/core/tx/columnshard/columnshard_private_events.h>

#include <util/generic/hash.h>
4+
5+
namespace NKikimr::NOlap {
6+
7+
namespace {
8+
9+
struct TChunkData {
10+
ui64 Index = 0;
11+
ui64 GranuleId = 0;
12+
ui64 PlanStep = 0;
13+
ui64 TxId = 0;
14+
ui64 PortionId = 0;
15+
ui32 Chunk = 0;
16+
ui64 ColumnIdx = 0;
17+
18+
ui64 XPlanStep = 0;
19+
ui64 XTxId = 0;
20+
TString Blob;
21+
TString Metadata;
22+
ui64 Offset;
23+
ui32 Size;
24+
ui64 PathId;
25+
26+
template <class TRowSet>
27+
TChunkData(const TRowSet& rowset) {
28+
using Schema = NColumnShard::Schema;
29+
PlanStep = rowset.template GetValue<Schema::IndexColumns::PlanStep>();
30+
TxId = rowset.template GetValue<Schema::IndexColumns::TxId>();
31+
PortionId = rowset.template GetValue<Schema::IndexColumns::Portion>();
32+
GranuleId = rowset.template GetValue<Schema::IndexColumns::Granule>();
33+
Chunk = rowset.template GetValue<Schema::IndexColumns::Chunk>();
34+
Index = rowset.template GetValue<Schema::IndexColumns::Index>();
35+
ColumnIdx = rowset.template GetValue<Schema::IndexColumns::ColumnIdx>();
36+
37+
XPlanStep = rowset.template GetValue<Schema::IndexColumns::XPlanStep>();
38+
XTxId = rowset.template GetValue<Schema::IndexColumns::XTxId>();
39+
Blob = rowset.template GetValue<Schema::IndexColumns::Blob>();
40+
Metadata = rowset.template GetValue<Schema::IndexColumns::Metadata>();
41+
Offset = rowset.template GetValue<Schema::IndexColumns::Offset>();
42+
Size = rowset.template GetValue<Schema::IndexColumns::Size>();
43+
PathId = rowset.template GetValue<Schema::IndexColumns::PathId>();
44+
}
45+
};
46+
}
47+
48+
/// One batch of IndexColumns rows whose Index and Granule key components are
/// to be reset to zero.
///
/// Batches are produced by Init() and applied by ApplyOnExecute(): every stored
/// row is deleted under its current key and written again under the key
/// (0, 0, ColumnIdx, PlanStep, TxId, PortionId, Chunk) with the same payload.
class TCleanGranuleIdNormalizer::TNormalizerResult : public INormalizerChanges {
private:
    // Maximum number of rows collected into a single batch.
    static constexpr ui64 ChunksPerTask = 10000;

    std::vector<TChunkData> Chunks;

    void AddChunk(TChunkData&& chunk) {
        Chunks.push_back(std::move(chunk));
    }

    // Private: instances are created only from Init().
    TNormalizerResult() = default;

public:
    /// Re-keys every row of this batch inside the given transaction.
    /// Always returns true.
    bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
        using Schema = NColumnShard::Schema;
        NIceDb::TNiceDb db(txc.DB);
        ACFL_INFO("normalizer", "TCleanGranuleIdNormalizer")("message", TStringBuilder() << "apply " << Chunks.size() << " chunks");

        for (const auto& chunk : Chunks) {
            // Drop the row stored under the old key ...
            db.Table<Schema::IndexColumns>().Key(chunk.Index, chunk.GranuleId, chunk.ColumnIdx,
                chunk.PlanStep, chunk.TxId, chunk.PortionId, chunk.Chunk).Delete();

            // ... and store the same payload with Index and Granule zeroed.
            db.Table<Schema::IndexColumns>().Key(0, 0, chunk.ColumnIdx,
                chunk.PlanStep, chunk.TxId, chunk.PortionId, chunk.Chunk).Update(
                NIceDb::TUpdate<Schema::IndexColumns::PathId>(chunk.PathId),
                NIceDb::TUpdate<Schema::IndexColumns::Blob>(chunk.Blob),
                NIceDb::TUpdate<Schema::IndexColumns::Metadata>(chunk.Metadata),
                NIceDb::TUpdate<Schema::IndexColumns::Offset>(chunk.Offset),
                NIceDb::TUpdate<Schema::IndexColumns::Size>(chunk.Size),
                NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(chunk.XPlanStep),
                NIceDb::TUpdate<Schema::IndexColumns::XTxId>(chunk.XTxId)
            );
        }
        return true;
    }

    /// Number of rows in this batch.
    ui64 GetSize() const override {
        return Chunks.size();
    }

    /// Scans IndexColumns and collects every row that has a non-zero Granule or
    /// Index into batches of at most ChunksPerTask rows.
    ///
    /// Returns std::nullopt when the table data is not ready yet (precharge,
    /// select or iteration reported "not ready"); otherwise the list of batches,
    /// which is empty when no row needs to be changed.
    static std::optional<std::vector<INormalizerChanges::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
        using namespace NColumnShard;
        NIceDb::TNiceDb db(txc.DB);

        if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
            return std::nullopt;
        }

        std::vector<INormalizerChanges::TPtr> tasks;
        ui64 fullChunksCount = 0;
        // Portion id -> the (Index, Granule) pair it was first seen under.
        THashMap<ui64, std::pair<ui64, ui64>> portionOwners;
        {
            auto rowset = db.Table<Schema::IndexColumns>().Select();
            if (!rowset.IsReady()) {
                return std::nullopt;
            }
            // The constructor is private, so std::make_shared cannot be used here.
            std::shared_ptr<TNormalizerResult> changes(new TNormalizerResult());
            ui64 chunksCount = 0;

            while (!rowset.EndOfSet()) {
                if (rowset.GetValue<Schema::IndexColumns::Granule>() || rowset.GetValue<Schema::IndexColumns::Index>()) {
                    TChunkData chunk(rowset);

                    // The table holds one row per (column, chunk) of a portion, so the
                    // same portion id legitimately shows up on many rows. What must not
                    // happen is the same portion id under two different (Index, Granule)
                    // pairs: after both are zeroed the rows could end up with equal keys.
                    const std::pair<ui64, ui64> owner(chunk.Index, chunk.GranuleId);
                    const auto insertion = portionOwners.emplace(chunk.PortionId, owner);
                    AFL_VERIFY(insertion.second || insertion.first->second == owner);

                    changes->AddChunk(std::move(chunk));
                    ++chunksCount;
                    ++fullChunksCount;

                    if (chunksCount == ChunksPerTask) {
                        // Batch is full: publish it and start a new one.
                        tasks.emplace_back(changes);
                        changes.reset(new TNormalizerResult());
                        controller.GetCounters().CountObjects(chunksCount);
                        chunksCount = 0;
                    }
                }

                if (!rowset.Next()) {
                    return std::nullopt;
                }
            }

            // Publish the last, partially filled batch.
            if (chunksCount > 0) {
                tasks.emplace_back(changes);
                controller.GetCounters().CountObjects(chunksCount);
            }
        }
        ACFL_INFO("normalizer", "TCleanGranuleIdNormalizer")("message", TStringBuilder() << fullChunksCount << " chunks found");
        return tasks;
    }

};
140+
141+
/// Builds the normalizer tasks: one TTrivialNormalizerTask per batch returned
/// by TNormalizerResult::Init().
///
/// Returns a failed conclusion with the message "Not ready" when Init() could
/// not read the table yet.
TConclusion<std::vector<INormalizerTask::TPtr>> TCleanGranuleIdNormalizer::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
    auto changes = TNormalizerResult::Init(controller, txc);
    if (!changes) {
        return TConclusionStatus::Fail("Not ready");
    }

    std::vector<INormalizerTask::TPtr> tasks;
    // The final size is known up front: exactly one task per batch.
    tasks.reserve(changes->size());
    for (auto&& c : *changes) {
        tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(c));
    }
    return tasks;
}
152+
153+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
7+
namespace NKikimr::NOlap {
8+
9+
class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerComponent {
10+
class TNormalizerResult;
11+
12+
static inline INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer> Registrator =
13+
INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer>(ENormalizerSequentialId::CleanGranuleId);
14+
public:
15+
TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext&) {
16+
}
17+
18+
virtual ENormalizerSequentialId GetType() const override {
19+
return ENormalizerSequentialId::CleanGranuleId;
20+
}
21+
22+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
23+
};
24+
25+
}

ydb/core/tx/columnshard/normalizer/granule/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
GLOBAL normalizer.cpp
5+
GLOBAL clean_granule.cpp
56
)
67

78
PEERDIR(

0 commit comments

Comments
 (0)