Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
if (result.IsSuccess()) {
NormalizerTasks = result.DetachResult();
if (!NormalizerTasks.empty()) {
ACFL_WARN("normalizer_controller", Self->NormalizerController.DebugString())("tasks_count", NormalizerTasks.size());
break;
}
NIceDb::TNiceDb db(txc.DB);
Expand Down Expand Up @@ -337,6 +338,7 @@ bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&)
void TTxApplyNormalizer::Complete(const TActorContext& ctx) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Complete")("tablet_id", Self->TabletID())("event", "initialize_shard");
AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "apply_normalizer_changes")("details", Self->NormalizerController.DebugString())("size", Changes->GetSize());
Changes->ApplyOnComplete(Self->NormalizerController);
Self->NormalizerController.GetNormalizer()->OnResultReady();
if (Self->NormalizerController.GetNormalizer()->HasActiveTasks()) {
Expand Down
15 changes: 9 additions & 6 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ namespace NKikimr::NOlap {
enum class ENormalizerSequentialId : ui32 {
Granules = 1,
Chunks,
PortionsMetadata,
PortionsCleaner,
TablesCleaner
TablesCleaner,
PortionsMetadata,
};

class TNormalizationContext {
Expand Down Expand Up @@ -88,14 +88,18 @@ namespace NKikimr::NOlap {
virtual void ApplyOnComplete(const TNormalizationController& normalizationContext) const {
Y_UNUSED(normalizationContext);
}

virtual ui64 GetSize() const = 0;
};

class TTrivialNormalizerTask : public INormalizerTask {
INormalizerChanges::TPtr Changes;
public:
TTrivialNormalizerTask(const INormalizerChanges::TPtr& changes)
: Changes(changes)
{}
{
AFL_VERIFY(Changes);
}

void Start(const TNormalizationController& /* controller */, const TNormalizationContext& /*nCtx*/) override;
};
Expand Down Expand Up @@ -136,9 +140,8 @@ namespace NKikimr::NOlap {

virtual ENormalizerSequentialId GetType() const = 0;

const TString& GetName() const {
static TString name = ToString(GetType());
return name;
TString GetName() const {
return ToString(GetType());
}

ui32 GetSequentialId() const {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class TGranulesNormalizer::TNormalizerResult : public INormalizerChanges {
return true;
}

ui64 GetSize() const override {
return Chunks.size();
}

static std::optional<std::vector<INormalizerChanges::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class TChunksNormalizer::TNormalizerResult : public INormalizerChanges {
}
return true;
}

ui64 GetSize() const override {
return Chunks.size();
}
};

class TRowsAndBytesChangesTask: public NConveyor::ITask {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/clean.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class TBlobsRemovingResult : public INormalizerChanges {
void ApplyOnComplete(const TNormalizationController& /* normController */) const override {
RemovingAction->OnCompleteTxAfterRemoving(true);
}

ui64 GetSize() const override {
return Portions.size();
}
};

class TBlobsRemovingTask : public INormalizerTask {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/portion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class TPortionsNormalizer::TNormalizerResult : public INormalizerChanges {
}
return true;
}

ui64 GetSize() const override {
return Portions.size();
}
};

bool TPortionsNormalizer::CheckPortion(const NColumnShard::TTablesManager&, const TPortionInfo& portionInfo) const {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/normalizer/tables/normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class TRemovedTablesNormalizer::TNormalizerResult : public INormalizerChanges {
return true;
}

ui64 GetSize() const override {
return PathIds.size();
}

static std::optional<std::vector<INormalizerChanges::TPtr>> Init(NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_olap/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ PEERDIR(
ydb/core/tx
ydb/core/tx/columnshard
ydb/core/tx/columnshard/test_helper
ydb/core/tx/columnshard/hooks/testing
ydb/core/tx/schemeshard/ut_helpers
ydb/library/yql/public/udf/service/exception_policy
)
Expand Down