Skip to content

Commit 3087911

Browse files
fixes
1 parent 4e93a8c commit 3087911

File tree

4 files changed

+81
-12
lines changed

4 files changed

+81
-12
lines changed

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,10 @@ class TColumnChunkLoadContext {
957957
YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());
958958

959959
public:
960+
// Returns the address of the portion this chunk belongs to, built from the
// PathId and PortionId members of this load context.
TPortionAddress GetPortionAddress() const {
961+
return TPortionAddress(PathId, PortionId);
962+
}
963+
960964
const TChunkAddress& GetAddress() const {
961965
return Address;
962966
}

ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#include "clean_empty.h"
22

3-
#include <ydb/core/tx/columnshard/columnshard_schema.h>
43
#include <ydb/core/protos/config.pb.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
55

66
namespace NKikimr::NOlap::NSyncChunksWithPortions {
77

@@ -11,13 +11,36 @@ class IDBModifier {
1111
virtual ~IDBModifier() = default;
1212
};
1313

14+
// DB modifier that deletes the Schema::IndexColumns rows for the chunks of a
// single portion. It mirrors TRemoveV1 but works on the IndexColumns table and
// needs the full chunk load context, since the row key includes the chunk's
// MinSnapshotDeprecated plan step and tx id.
class TRemoveV0: public IDBModifier {
15+
private:
16+
// Portion whose chunk rows are removed; used for the key and for logging.
const TPortionAddress PortionAddress;
17+
// Loaded chunk records of that portion; one row is deleted per entry.
std::vector<TColumnChunkLoadContext> Chunks;
18+
// For every stored chunk: emits a CRIT-level "remove_portion_v0" log record
// and deletes the matching row from IndexColumns.
virtual void Apply(NIceDb::TNiceDb& db) override {
19+
for (auto&& i : Chunks) {
20+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion_v0")("path_id", PortionAddress.GetPathId())(
21+
"portion_id", PortionAddress.GetPortionId())("chunk", i.GetAddress().DebugString());
22+
db.Table<NColumnShard::Schema::IndexColumns>()
23+
// NOTE(review): the first two key components are fixed at 0 - presumably
// legacy index/granule ids; confirm against the IndexColumns schema.
.Key(0, 0, i.GetAddress().GetColumnId(), i.GetMinSnapshotDeprecated().GetPlanStep(),
24+
i.GetMinSnapshotDeprecated().GetTxId(), PortionAddress.GetPortionId(), i.GetAddress().GetChunkIdx())
25+
.Delete();
26+
}
27+
}
28+
29+
public:
30+
// Stores a copy of the portion address and of the chunk list to delete.
TRemoveV0(const TPortionAddress& portionAddress, const std::vector<TColumnChunkLoadContext>& chunks)
31+
: PortionAddress(portionAddress)
32+
, Chunks(chunks) {
33+
}
34+
};
35+
1436
class TRemoveV1: public IDBModifier {
1537
private:
1638
const TPortionAddress PortionAddress;
1739
std::vector<TChunkAddress> Chunks;
1840
virtual void Apply(NIceDb::TNiceDb& db) override {
1941
for (auto&& i : Chunks) {
20-
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion_v1")("path_id", PortionAddress.GetPathId())("portion_id", PortionAddress.GetPortionId());
42+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion_v1")("path_id", PortionAddress.GetPathId())(
43+
"portion_id", PortionAddress.GetPortionId())("chunk", i.DebugString());
2144
db.Table<NColumnShard::Schema::IndexColumnsV1>()
2245
.Key(PortionAddress.GetPathId(), PortionAddress.GetPortionId(), i.GetColumnId(), i.GetChunkIdx())
2346
.Delete();
@@ -64,8 +87,9 @@ class TRemovePortion: public IDBModifier {
6487
};
6588

6689
namespace {
67-
bool GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc, std::map<TPortionAddress, std::shared_ptr<IDBModifier>>& resultV1,
68-
std::map<TPortionAddress, std::shared_ptr<IDBModifier>>& resultV2, std::map<TPortionAddress, std::shared_ptr<IDBModifier>>& portions) {
90+
bool GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc, std::map<TPortionAddress, std::shared_ptr<IDBModifier>>& resultV0,
91+
std::map<TPortionAddress, std::shared_ptr<IDBModifier>>& resultV1, std::map<TPortionAddress, std::shared_ptr<IDBModifier>>& resultV2,
92+
std::map<TPortionAddress, std::shared_ptr<IDBModifier>>& portions, NColumnShard::TBlobGroupSelector& dsGroupSelector) {
6993
using namespace NColumnShard;
7094
NIceDb::TNiceDb db(txc.DB);
7195
if (!Schema::Precharge<Schema::IndexColumnsV1>(db, txc.DB.GetScheme())) {
@@ -77,7 +101,29 @@ bool GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc, st
77101
if (!Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme())) {
78102
return false;
79103
}
104+
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
105+
return false;
106+
}
80107

108+
{
109+
std::map<TPortionAddress, std::vector<TColumnChunkLoadContext>> usedPortions;
110+
auto rowset = db.Table<Schema::IndexColumns>().Select();
111+
if (!rowset.IsReady()) {
112+
return false;
113+
}
114+
while (!rowset.EndOfSet()) {
115+
TColumnChunkLoadContext chunk(rowset, &dsGroupSelector);
116+
usedPortions[chunk.GetPortionAddress()].emplace_back(chunk);
117+
if (!rowset.Next()) {
118+
return false;
119+
}
120+
}
121+
std::map<TPortionAddress, std::shared_ptr<IDBModifier>> tasks;
122+
for (auto&& i : usedPortions) {
123+
tasks.emplace(i.first, std::make_shared<TRemoveV0>(i.first, i.second));
124+
}
125+
std::swap(resultV0, tasks);
126+
}
81127
{
82128
std::map<TPortionAddress, std::vector<TChunkAddress>> usedPortions;
83129
auto rowset = db.Table<Schema::IndexColumnsV1>()
@@ -175,17 +221,25 @@ class TIterator {
175221
}
176222
};
177223

178-
std::optional<std::vector<std::vector<std::shared_ptr<IDBModifier>>>> GetPortionsToDelete(NTabletFlatExecutor::TTransactionContext& txc) {
224+
std::optional<std::vector<std::vector<std::shared_ptr<IDBModifier>>>> GetPortionsToDelete(
225+
NTabletFlatExecutor::TTransactionContext& txc, NColumnShard::TBlobGroupSelector& dsGroupSelector) {
179226
using namespace NColumnShard;
227+
std::map<TPortionAddress, std::shared_ptr<IDBModifier>> v0Portions;
180228
std::map<TPortionAddress, std::shared_ptr<IDBModifier>> v1Portions;
181229
std::map<TPortionAddress, std::shared_ptr<IDBModifier>> v2Portions;
182230
std::map<TPortionAddress, std::shared_ptr<IDBModifier>> portions;
183-
if (!GetColumnPortionAddresses(txc, v1Portions, v2Portions, portions)) {
231+
if (!GetColumnPortionAddresses(txc, v0Portions, v1Portions, v2Portions, portions, dsGroupSelector)) {
184232
return std::nullopt;
185233
}
186234
std::vector<TPortionAddress> pack;
187235
std::map<TPortionAddress, std::vector<TIterator>> iteration;
188-
const ui32 SourcesCount = 3;
236+
const bool v0Usage = AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage();
237+
const ui32 SourcesCount = v0Usage ? 4 : 3;
238+
if (v0Usage) {
239+
if (v0Portions.size()) {
240+
iteration[v0Portions.begin()->first].emplace_back(v0Portions);
241+
}
242+
}
189243
{
190244
if (v1Portions.size()) {
191245
iteration[v1Portions.begin()->first].emplace_back(v1Portions);
@@ -259,8 +313,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TCleanEmptyPortionsNormalizer::D
259313
const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
260314
using namespace NColumnShard;
261315
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());
262-
AFL_VERIFY(!AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage())("config", AppDataVerified().ColumnShardConfig.DebugString());
263-
auto batchesToDelete = GetPortionsToDelete(txc);
316+
auto batchesToDelete = GetPortionsToDelete(txc, DsGroupSelector);
264317
if (!batchesToDelete) {
265318
return TConclusionStatus::Fail("Not ready");
266319
}

ydb/core/tx/columnshard/normalizer/portion/clean_empty.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,13 @@ class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormaliz
1010
return "EmptyPortionsCleaner";
1111
}
1212
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TCleanEmptyPortionsNormalizer>(ClassName());
13+
14+
NColumnShard::TBlobGroupSelector DsGroupSelector;
15+
1316
public:
14-
TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext&)
15-
{}
17+
// Initializes DsGroupSelector from the storage info exposed by the init context.
TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext& info)
18+
: DsGroupSelector(info.GetStorageInfo()) {
19+
}
1620

1721
std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
1822
return std::nullopt;

ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,15 @@ Y_UNIT_TEST_SUITE(Normalizers) {
308308
}
309309

310310
Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
311-
TestNormalizerImpl<TEmptyPortionsCleaner>();
311+
// Test-local checker that adjusts the column shard config on start by
// appending a repair entry for the "EmptyPortionsCleaner" normalizer class.
class TLocalNormalizerChecker: public TNormalizerChecker {
312+
public:
313+
// Adds one entry to the config's Repairs list and fills in its class name
// and description.
virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override {
314+
auto* repair = columnShardConfig.MutableRepairs()->Add();
315+
repair->SetClassName("EmptyPortionsCleaner");
316+
repair->SetDescription("Removing unsync portions");
317+
}
318+
};
319+
TestNormalizerImpl<TEmptyPortionsCleaner>(TLocalNormalizerChecker());
312320
}
313321

314322

0 commit comments

Comments
 (0)