Skip to content

Commit 6177751

Browse files
authored
Merge 17dea20 into 45d8eb2
2 parents 45d8eb2 + 17dea20 commit 6177751

File tree

3 files changed

+197
-0
lines changed

3 files changed

+197
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#include "special_cleaner.h"
2+
3+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
4+
5+
namespace NKikimr::NOlap::NNormalizer::NSpecialColumns {
6+
7+
namespace {
8+
9+
class TChanges: public INormalizerChanges {
10+
public:
11+
TChanges(TDeleteTrashImpl::TKeyBatch&& keys)
12+
: Keys(keys) {
13+
}
14+
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normController*/) const override {
15+
using namespace NColumnShard;
16+
NIceDb::TNiceDb db(txc.DB);
17+
for (const auto& k : Keys) {
18+
db.Table<Schema::IndexColumns>().Key(k.Index, k.Granule, k.ColumnIdx, k.PlanStep, k.TxId, k.Portion, k.Chunk).Delete();
19+
}
20+
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")("message", TStringBuilder() << GetSize() << " rows deleted");
21+
return true;
22+
}
23+
24+
ui64 GetSize() const override {
25+
return Keys.size();
26+
}
27+
28+
private:
29+
const TDeleteTrashImpl::TKeyBatch Keys;
30+
};
31+
32+
} //namespace
33+
34+
TConclusion<std::vector<INormalizerTask::TPtr>> TDeleteTrashImpl::DoInit(
35+
const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
36+
using namespace NColumnShard;
37+
NIceDb::TNiceDb db(txc.DB);
38+
const size_t MaxBatchSize = 10000;
39+
auto keysToDelete = KeysToDelete(txc, MaxBatchSize);
40+
if (!keysToDelete) {
41+
return TConclusionStatus::Fail("Not ready");
42+
}
43+
ui32 removeCount = 0;
44+
for (auto&& i : *keysToDelete) {
45+
removeCount += i.size();
46+
}
47+
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")(
48+
"message", TStringBuilder() << "found " << removeCount << " rows to delete grouped in " << keysToDelete->size() << " batches");
49+
50+
std::vector<INormalizerTask::TPtr> result;
51+
for (auto&& batch : *keysToDelete) {
52+
AFL_VERIFY(!batch.empty());
53+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(batch))));
54+
}
55+
return result;
56+
}
57+
58+
std::optional<std::vector<TDeleteTrashImpl::TKeyBatch>> TDeleteTrashImpl::KeysToDelete(
59+
NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize) {
60+
NIceDb::TNiceDb db(txc.DB);
61+
using namespace NColumnShard;
62+
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
63+
return std::nullopt;
64+
}
65+
const std::set<ui64> columnIdsToDelete = GetColumnIdsToDelete();
66+
std::vector<TKeyBatch> result;
67+
TKeyBatch currentBatch;
68+
auto rowset =
69+
db.Table<Schema::IndexColumns>()
70+
.Select<Schema::IndexColumns::Index, Schema::IndexColumns::Granule, Schema::IndexColumns::ColumnIdx, Schema::IndexColumns::PlanStep,
71+
Schema::IndexColumns::TxId, Schema::IndexColumns::Portion, Schema::IndexColumns::Chunk>();
72+
if (!rowset.IsReady()) {
73+
return std::nullopt;
74+
}
75+
while (!rowset.EndOfSet()) {
76+
if (columnIdsToDelete.contains(rowset.GetValue<Schema::IndexColumns::ColumnIdx>())) {
77+
auto key = TKey{
78+
.Index = rowset.GetValue<Schema::IndexColumns::Index>(),
79+
.Granule = rowset.GetValue<Schema::IndexColumns::Granule>(),
80+
.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(),
81+
.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(),
82+
.TxId = rowset.GetValue<Schema::IndexColumns::TxId>(),
83+
.Portion = rowset.GetValue<Schema::IndexColumns::Portion>(),
84+
.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>() };
85+
currentBatch.emplace_back(std::move(key));
86+
if (currentBatch.size() == maxBatchSize) {
87+
result.emplace_back(std::move(currentBatch));
88+
currentBatch = TKeyBatch{};
89+
}
90+
}
91+
if (!rowset.Next()) {
92+
return std::nullopt;
93+
}
94+
}
95+
if (!currentBatch.empty()) {
96+
result.emplace_back(std::move(currentBatch));
97+
}
98+
99+
return result;
100+
}
101+
102+
} // namespace NKikimr::NOlap::NNormalizer::NSpecialColumns
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
4+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
5+
6+
namespace NKikimr::NOlap::NNormalizer::NSpecialColumns {
7+
8+
class TDeleteTrashImpl: public TNormalizationController::INormalizerComponent {
9+
public:
10+
struct TKey {
11+
ui32 Index;
12+
ui64 Granule;
13+
ui32 ColumnIdx;
14+
ui64 PlanStep;
15+
ui64 TxId;
16+
ui64 Portion;
17+
ui32 Chunk;
18+
};
19+
20+
using TKeyBatch = std::vector<TKey>;
21+
22+
private:
23+
24+
std::optional<std::vector<TKeyBatch>> KeysToDelete(NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize);
25+
26+
virtual std::set<ui64> GetColumnIdsToDelete() const = 0;
27+
28+
public:
29+
TDeleteTrashImpl(const TNormalizationController::TInitContext&) {
30+
}
31+
32+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
33+
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
34+
};
35+
36+
class TRemoveDeleteFlag: public TDeleteTrashImpl {
37+
private:
38+
using TBase = TDeleteTrashImpl;
39+
public:
40+
static TString GetClassNameStatic() {
41+
return "RemoveDeleteFlag";
42+
}
43+
44+
private:
45+
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveDeleteFlag>(GetClassNameStatic());
46+
47+
virtual std::set<ui64> GetColumnIdsToDelete() const override {
48+
return { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX };
49+
}
50+
51+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
52+
return {};
53+
}
54+
virtual TString GetClassName() const override {
55+
return GetClassNameStatic();
56+
}
57+
58+
public:
59+
TRemoveDeleteFlag(const TNormalizationController::TInitContext& context)
60+
: TBase(context) {
61+
}
62+
};
63+
64+
class TRemoveWriteId: public TDeleteTrashImpl {
65+
private:
66+
using TBase = TDeleteTrashImpl;
67+
68+
public:
69+
static TString GetClassNameStatic() {
70+
return "RemoveWriteId";
71+
}
72+
73+
private:
74+
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveWriteId>(GetClassNameStatic());
75+
76+
virtual std::set<ui64> GetColumnIdsToDelete() const override {
77+
return { NPortion::TSpecialColumns::SPEC_COL_WRITE_ID_INDEX };
78+
}
79+
80+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
81+
return {};
82+
}
83+
virtual TString GetClassName() const override {
84+
return GetClassNameStatic();
85+
}
86+
87+
public:
88+
TRemoveWriteId(const TNormalizationController::TInitContext& context)
89+
: TBase(context)
90+
{
91+
}
92+
};
93+
94+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/normalizer/portion/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SRCS(
77
GLOBAL clean.cpp
88
GLOBAL clean_empty.cpp
99
GLOBAL broken_blobs.cpp
10+
GLOBAL special_cleaner.cpp
1011
)
1112

1213
PEERDIR(

0 commit comments

Comments
 (0)