Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class ENormalizerSequentialId: ui32 {
TablesCleaner,
PortionsMetadata,
CleanGranuleId,
EmptyPortionsCleaner,

MAX
};
Expand Down
120 changes: 120 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "clean_empty.h"
#include <ydb/core/tx/columnshard/columnshard_schema.h>


namespace NKikimr::NOlap {

namespace {
// Scans the whole IndexColumns table and gathers the distinct
// (PathId, Portion) addresses that have at least one column record.
//
// Returns std::nullopt if precharging the table, opening the rowset, or
// advancing the rowset reports failure; otherwise returns the collected set
// (possibly empty).
std::optional<THashSet<TPortionAddress>> GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc) {
    using namespace NColumnShard;
    using TColumns = Schema::IndexColumns;

    NIceDb::TNiceDb db(txc.DB);
    const bool precharged = Schema::Precharge<TColumns>(db, txc.DB.GetScheme());
    if (!precharged) {
        return std::nullopt;
    }

    THashSet<TPortionAddress> addresses;
    auto columnRows = db.Table<TColumns>().Select<TColumns::PathId, TColumns::Portion>();
    if (!columnRows.IsReady()) {
        return std::nullopt;
    }

    for (; !columnRows.EndOfSet();) {
        // Many column rows share one portion; the set keeps a single entry per address.
        addresses.emplace(
            columnRows.GetValue<TColumns::PathId>(),
            columnRows.GetValue<TColumns::Portion>());

        const bool advanced = columnRows.Next();
        if (!advanced) {
            return std::nullopt;
        }
    }
    return addresses;
}

// One unit of deletion work: the portion addresses handled together.
using TBatch = std::vector<TPortionAddress>;

// Finds every IndexPortions row whose (PathId, PortionId) address has no
// record in IndexColumns and groups those addresses into batches of at most
// MaxBatchSize entries.
//
// Returns std::nullopt if either table scan reports failure (precharge,
// rowset readiness, or advancing the rowset); otherwise returns the batches,
// which is an empty vector when nothing has to be deleted.
std::optional<std::vector<TBatch>> GetPortionsToDelete(NTabletFlatExecutor::TTransactionContext& txc) {
    using namespace NColumnShard;
    using TPortions = Schema::IndexPortions;

    // Addresses that are still referenced by column records.
    const auto referenced = GetColumnPortionAddresses(txc);
    if (!referenced.has_value()) {
        return std::nullopt;
    }

    constexpr size_t MaxBatchSize = 10000;

    NIceDb::TNiceDb db(txc.DB);
    if (!Schema::Precharge<TPortions>(db, txc.DB.GetScheme())) {
        return std::nullopt;
    }

    auto portionRows = db.Table<TPortions>().Select<TPortions::PathId, TPortions::PortionId>();
    if (!portionRows.IsReady()) {
        return std::nullopt;
    }

    std::vector<TBatch> batches;
    TBatch current;

    // Hands the accumulated batch over to the result and starts a fresh one.
    const auto flush = [&batches, &current]() {
        batches.emplace_back(std::move(current));
        current = TBatch{};
    };

    for (; !portionRows.EndOfSet();) {
        TPortionAddress address(
            portionRows.GetValue<TPortions::PathId>(),
            portionRows.GetValue<TPortions::PortionId>());

        const bool hasColumns = referenced->contains(address);
        if (!hasColumns) {
            ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << address.DebugString() << " marked for deletion");
            current.emplace_back(std::move(address));
            if (current.size() == MaxBatchSize) {
                flush();
            }
        }

        if (!portionRows.Next()) {
            return std::nullopt;
        }
    }

    // Keep the trailing, partially filled batch as well.
    if (!current.empty()) {
        flush();
    }
    return batches;
}

class TChanges : public INormalizerChanges {
public:
TChanges(TBatch&& addresses)
: Addresses(addresses)
{}
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for(const auto& a: Addresses) {
db.Table<Schema::IndexPortions>().Key(
a.GetPathId(),
a.GetPortionId()
).Delete();
}
ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << GetSize() << " portions deleted");
return true;
}

ui64 GetSize() const override {
return Addresses.size();
}
private:
const TBatch Addresses;
};

} //namespace

// Builds the normalizer task list: one TTrivialNormalizerTask per batch of
// portion addresses returned by GetPortionsToDelete().
//
// Returns a failed conclusion with the message "Not ready" when
// GetPortionsToDelete() yields std::nullopt; otherwise returns the tasks
// (an empty vector when there is nothing to delete).
TConclusion<std::vector<INormalizerTask::TPtr>> TCleanEmptyPortionsNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
    using namespace NColumnShard;

    auto batches = GetPortionsToDelete(txc);
    if (!batches.has_value()) {
        return TConclusionStatus::Fail("Not ready");
    }

    std::vector<INormalizerTask::TPtr> tasks;
    for (auto& batch : *batches) {
        // Each batch is moved into its own change set; `batches` is not used afterwards.
        auto changes = std::make_shared<TChanges>(std::move(batch));
        tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::move(changes)));
    }
    return tasks;
}

} //namespace NKikimr::NOlap
28 changes: 28 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/clean_empty.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap {

// Normalizer component identified by ENormalizerSequentialId::EmptyPortionsCleaner.
// Its DoInit() (defined in clean_empty.cpp) produces tasks that delete
// IndexPortions rows which have no matching records in IndexColumns.
class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormalizerComponent {

// Name under which this component is registered: the string form of its
// sequential id.
static TString ClassName() {
return ToString(ENormalizerSequentialId::EmptyPortionsCleaner);
}
// Registers this class in INormalizerComponent::TFactory under ClassName().
// NOTE: must stay after ClassName(), which its initializer calls.
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TCleanEmptyPortionsNormalizer>(ClassName());
public:
// The init context is accepted but not used by this normalizer.
TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext&)
{}

// Sequential id of this normalizer (EmptyPortionsCleaner).
std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return ENormalizerSequentialId::EmptyPortionsCleaner;
}

// Same string that is used as the factory registration key.
TString GetClassName() const override {
return ClassName();
}

// Builds the list of normalization tasks; see clean_empty.cpp for the implementation.
TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

} //namespace NKikimr::NOlap
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace NKikimr::NOlap {

TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
auto initRes = DoInitImpl(controller,txc);
auto initRes = DoInitImpl(controller, txc);

if (initRes.IsFail()) {
return initRes;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/portion/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SRCS(
GLOBAL portion.cpp
GLOBAL chunks.cpp
GLOBAL clean.cpp
GLOBAL clean_empty.cpp
GLOBAL broken_blobs.cpp
)

Expand Down
24 changes: 22 additions & 2 deletions ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/engines/portions/constructor.h>

#include <ydb/core/tx/columnshard/operations/write_data.h>

Expand Down Expand Up @@ -161,7 +162,7 @@ class TColumnChunksCleaner : public NYDBTest::ILocalDBModifier {
}
};

class TPortinosCleaner : public NYDBTest::ILocalDBModifier {
class TPortionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
Expand All @@ -185,6 +186,21 @@ class TPortinosCleaner : public NYDBTest::ILocalDBModifier {
}
};


// Local-DB modifier for the CleanEmptyPortionsNormalizer test.
// Issues an Update() with no column values on IndexPortions for every key in
// path ids [100, 299) x portion ids [1000, 1199), i.e. 199 * 199 = 39601 keys.
// It does not write anything to IndexColumns.
class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier {
public:
    virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
        using namespace NColumnShard;

        // Half-open key ranges: the *End values themselves are not written.
        constexpr size_t PathIdBegin = 100;
        constexpr size_t PathIdEnd = 299;
        constexpr size_t PortionIdBegin = 1000;
        constexpr size_t PortionIdEnd = 1199;

        NIceDb::TNiceDb db(txc.DB);
        for (size_t pathId = PathIdBegin; pathId < PathIdEnd; ++pathId) {
            for (size_t portionId = PortionIdBegin; portionId < PortionIdEnd; ++portionId) {
                db.Table<Schema::IndexPortions>().Key(pathId, portionId).Update();
            }
        }
    }
};


class TTablesCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
Expand Down Expand Up @@ -317,7 +333,11 @@ Y_UNIT_TEST_SUITE(Normalizers) {
}

Y_UNIT_TEST(PortionsNormalizer) {
TestNormalizerImpl<TPortinosCleaner>();
TestNormalizerImpl<TPortionsCleaner>();
}

// Runs TestNormalizerImpl with TEmptyPortionsCleaner as the local-DB modifier,
// so the database contains IndexPortions rows written by that modifier.
// NOTE(review): TestNormalizerImpl is defined outside this view; presumably it
// applies the modifier and then checks the data after normalization — confirm.
Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
TestNormalizerImpl<TEmptyPortionsCleaner>();
}

Y_UNIT_TEST(EmptyTablesNormalizer) {
Expand Down