Skip to content

Commit 0fa63f2

Browse files
nsofyansofya
authored and committed
Fix memory usage for heavy normalizers (ydb-platform#2204)
Co-authored-by: nsofya <nsofya@yandex.ru>
1 parent f9dd1ed commit 0fa63f2

File tree

4 files changed

+23
-18
lines changed

4 files changed

+23
-18
lines changed

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

+6 -1
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,11 @@ namespace NKikimr::NOlap {
4949
class TNormalizationContext {
5050
YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
5151
YDB_ACCESSOR_DEF(TActorId, ColumnshardActor);
52+
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
53+
public:
54+
void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
55+
ResourcesGuard = rg;
56+
}
5257
};
5358

5459
class TNormalizationController;
@@ -117,7 +122,7 @@ namespace NKikimr::NOlap {
117122
TString DebugString() const {
118123
return TStringBuilder() << "normalizers_count=" << Normalizers.size()
119124
<< ";current_normalizer_idx=" << CurrentNormalizerIndex
120-
<< ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size()) ? Normalizers[CurrentNormalizerIndex]->GetName() : "";
125+
<< ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size() ? Normalizers[CurrentNormalizerIndex]->GetName() : "");
121126
}
122127

123128
const INormalizerComponent::TPtr& GetNormalizer() const;

ydb/core/tx/columnshard/normalizer/portion/chunks.cpp

+1 -1
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@ class TRowsAndBytesChangesTask: public NConveyor::ITask {
6868
}
6969

7070
public:
71-
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, THashMap<ui64, ISnapshotSchema::TPtr>&&)
71+
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>>)
7272
: Blobs(std::move(blobs))
7373
, Chunks(std::move(chunks))
7474
, NormContext(nCtx)

ydb/core/tx/columnshard/normalizer/portion/min_max.cpp

+9 -9
Original file line number | Diff line number | Diff line change
@@ -16,16 +16,16 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
1616
private:
1717
THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
1818
TDataContainer Portions;
19-
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
19+
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
2020
TNormalizationContext NormContext;
2121
protected:
2222
virtual bool DoExecute() override {
23-
Y_ABORT_UNLESS(!Schemas.empty());
24-
auto pkColumnIds = Schemas.begin()->second->GetPkColumnsIds();
23+
Y_ABORT_UNLESS(!Schemas->empty());
24+
auto pkColumnIds = Schemas->begin()->second->GetPkColumnsIds();
2525
pkColumnIds.insert(TIndexInfo::GetSpecialColumnIds().begin(), TIndexInfo::GetSpecialColumnIds().end());
2626

2727
for (auto&& portionInfo : Portions) {
28-
auto blobSchema = Schemas.FindPtr(portionInfo->GetPortionId());
28+
auto blobSchema = Schemas->FindPtr(portionInfo->GetPortionId());
2929
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
3030
for (auto&& i : portionInfo->Records) {
3131
auto blobIt = Blobs.find(i.BlobRange);
@@ -47,10 +47,10 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
4747
}
4848

4949
public:
50-
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
50+
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
5151
: Blobs(std::move(blobs))
5252
, Portions(std::move(portions))
53-
, Schemas(std::move(schemas))
53+
, Schemas(schemas)
5454
, NormContext(nCtx)
5555
{}
5656

@@ -135,7 +135,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
135135
}
136136

137137
THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
138-
THashMap<ui64, ISnapshotSchema::TPtr> schemas;
138+
auto schemas = std::make_shared<THashMap<ui64, ISnapshotSchema::TPtr>>();
139139
auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());
140140

141141
{
@@ -161,7 +161,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
161161
auto portionMeta = loadContext.GetPortionMeta();
162162
if (it == portions.end()) {
163163
Y_ABORT_UNLESS(portion.Records.empty());
164-
schemas[portion.GetPortionId()] = currentSchema;
164+
(*schemas)[portion.GetPortionId()] = currentSchema;
165165
auto portionNew = std::make_shared<TPortionInfo>(portion);
166166
portionNew->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
167167
it = portions.emplace(portion.GetPortion(), portionNew).first;
@@ -202,7 +202,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
202202
}
203203
++brokenPortioncCount;
204204
package.emplace_back(portion.second);
205-
if (package.size() == 100) {
205+
if (package.size() == 1000) {
206206
std::vector<std::shared_ptr<TPortionInfo>> local;
207207
local.swap(package);
208208
tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(local), schemas));

ydb/core/tx/columnshard/normalizer/portion/normalizer.h

+7 -7
Original file line number | Diff line number | Diff line change
@@ -18,11 +18,11 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
1818
private:
1919
using TBase = NOlap::NBlobOperations::NRead::ITask;
2020
typename TConveyorTask::TDataContainer Data;
21-
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
21+
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
2222
TNormalizationContext NormContext;
2323

2424
public:
25-
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
25+
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
2626
: TBase(actions, "CS::NORMALIZER")
2727
, Data(std::move(data))
2828
, Schemas(std::move(schemas))
@@ -32,8 +32,8 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
3232

3333
protected:
3434
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
35-
Y_UNUSED(resourcesGuard);
36-
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), std::move(Schemas));
35+
NormContext.SetResourcesGuard(resourcesGuard);
36+
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), Schemas);
3737
NConveyor::TCompServiceOperator::SendTaskToExecute(task);
3838
}
3939

@@ -49,13 +49,13 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
4949
template <class TConveyorTask>
5050
class TPortionsNormalizerTask : public INormalizerTask {
5151
typename TConveyorTask::TDataContainer Package;
52-
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
52+
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
5353
public:
5454
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package)
5555
: Package(std::move(package))
5656
{}
5757

58-
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const THashMap<ui64, ISnapshotSchema::TPtr>& schemas)
58+
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
5959
: Package(std::move(package))
6060
, Schemas(schemas)
6161
{}
@@ -71,7 +71,7 @@ class TPortionsNormalizerTask : public INormalizerTask {
7171
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction};
7272
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
7373
nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
74-
std::make_shared<TReadPortionsTask<TConveyorTask>>( nCtx, actions, std::move(Package), std::move(Schemas) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
74+
std::make_shared<TReadPortionsTask<TConveyorTask>>(nCtx, actions, std::move(Package), Schemas), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
7575
}
7676
};
7777
}

0 commit comments

Comments
 (0)