Skip to content

Commit 481ccad

Browse files
Leak bs normalizer (#11682)
1 parent c5d16f6 commit 481ccad

23 files changed

+608
-180
lines changed

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
258258
}
259259
}
260260
{
261-
NOlap::TNormalizationController::TInitContext initCtx(Self->Info());
261+
NOlap::TNormalizationController::TInitContext initCtx(Self->Info(), Self->TabletID(), Self->SelfId());
262262
Self->NormalizerController.InitNormalizers(initCtx);
263263
}
264264

ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
namespace NKikimr::NOlap {
88

99
TNormalizationController::INormalizerComponent::TPtr TNormalizationController::RegisterNormalizer(INormalizerComponent::TPtr normalizer) {
10-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString());
1110
AFL_VERIFY(normalizer);
11+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString());
1212
Counters.emplace_back(normalizer->GetClassName());
1313
Normalizers.emplace_back(normalizer);
1414
return normalizer;
@@ -74,7 +74,9 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
7474
if (FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) {
7575
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "repair already processed")("description", i.GetDescription());
7676
} else {
77-
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(INormalizerComponent::TFactory::Construct(i.GetClassName(), ctx)));
77+
auto component = INormalizerComponent::TFactory::MakeHolder(i.GetClassName(), ctx);
78+
AFL_VERIFY(component)("class_name", i.GetClassName());
79+
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(component.Release()));
7880
normalizer->SetIsRepair(true).SetUniqueDescription(i.GetDescription());
7981
}
8082
}

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
#pragma once
22

33
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
4-
#include <ydb/library/accessor/accessor.h>
5-
64
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
75
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
6+
7+
#include <ydb/library/accessor/accessor.h>
88
#include <ydb/library/conclusion/result.h>
9+
910
#include <library/cpp/object_factory/object_factory.h>
1011

1112
namespace NKikimr::NIceDb {
12-
class TNiceDb;
13+
class TNiceDb;
1314
}
1415

1516
namespace NKikimr::NOlap {
@@ -21,6 +22,7 @@ class TNormalizerCounters: public NColumnShard::TCommonCountersOwner {
2122
NMonitoring::TDynamicCounters::TCounterPtr StartedCount;
2223
NMonitoring::TDynamicCounters::TCounterPtr FinishedCount;
2324
NMonitoring::TDynamicCounters::TCounterPtr FailedCount;
25+
2426
public:
2527
TNormalizerCounters(const TString& normalizerName)
2628
: TBase("Normalizer") {
@@ -49,7 +51,7 @@ class TNormalizerCounters: public NColumnShard::TCommonCountersOwner {
4951
}
5052
};
5153

52-
enum class ENormalizerSequentialId: ui32 {
54+
enum class ENormalizerSequentialId : ui32 {
5355
Granules = 1,
5456
Chunks,
5557
DeprecatedPortionsCleaner,
@@ -74,27 +76,29 @@ class TNormalizationContext {
7476
YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
7577
YDB_ACCESSOR_DEF(TActorId, ShardActor);
7678
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
79+
7780
public:
7881
void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
7982
ResourcesGuard = rg;
8083
}
8184
};
8285

83-
8486
class TNormalizationController;
8587

8688
class INormalizerTask {
8789
public:
8890
using TPtr = std::shared_ptr<INormalizerTask>;
89-
virtual ~INormalizerTask() {}
91+
virtual ~INormalizerTask() {
92+
}
9093

9194
virtual void Start(const TNormalizationController& controller, const TNormalizationContext& nCtx) = 0;
9295
};
9396

9497
class INormalizerChanges {
9598
public:
9699
using TPtr = std::shared_ptr<INormalizerChanges>;
97-
virtual ~INormalizerChanges() {}
100+
virtual ~INormalizerChanges() {
101+
}
98102

99103
virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& normalizationContext) const = 0;
100104
virtual void ApplyOnComplete(const TNormalizationController& normalizationContext) const {
@@ -106,6 +110,7 @@ class INormalizerChanges {
106110

107111
class TTrivialNormalizerTask: public INormalizerTask {
108112
INormalizerChanges::TPtr Changes;
113+
109114
public:
110115
TTrivialNormalizerTask(const INormalizerChanges::TPtr& changes)
111116
: Changes(changes) {
@@ -118,10 +123,24 @@ class TTrivialNormalizerTask: public INormalizerTask {
118123
class TNormalizationController {
119124
public:
120125
class TInitContext {
126+
private:
121127
TIntrusiveConstPtr<TTabletStorageInfo> StorageInfo;
128+
const ui64 TabletId;
129+
const NActors::TActorId TabletActorId;
130+
122131
public:
123-
TInitContext(TTabletStorageInfo* info)
124-
: StorageInfo(info) {
132+
TInitContext(TTabletStorageInfo* info, const ui64 tabletId, const NActors::TActorId& actorId)
133+
: StorageInfo(info)
134+
, TabletId(tabletId)
135+
, TabletActorId(actorId) {
136+
}
137+
138+
ui64 GetTabletId() const {
139+
return TabletId;
140+
}
141+
142+
const NActors::TActorId& GetTabletActorId() const {
143+
return TabletActorId;
125144
}
126145

127146
TIntrusiveConstPtr<TTabletStorageInfo> GetStorageInfo() const {
@@ -133,6 +152,7 @@ class TNormalizationController {
133152
private:
134153
YDB_READONLY_DEF(TString, ClassName);
135154
YDB_READONLY_DEF(TString, Description);
155+
136156
public:
137157
bool operator<(const TNormalizerFullId& item) const {
138158
if (ClassName == item.ClassName) {
@@ -143,9 +163,7 @@ class TNormalizationController {
143163

144164
TNormalizerFullId(const TString& className, const TString& description)
145165
: ClassName(className)
146-
, Description(description)
147-
{
148-
166+
, Description(description) {
149167
}
150168
};
151169

@@ -154,18 +172,26 @@ class TNormalizationController {
154172
YDB_ACCESSOR(bool, IsRepair, false);
155173
YDB_ACCESSOR_DEF(TString, UniqueDescription);
156174
YDB_ACCESSOR(TString, UniqueId, TGUID::CreateTimebased().AsUuidString());
157-
175+
158176
virtual TString DoDebugString() const {
159177
return "";
160178
}
161179

162180
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const = 0;
163181

182+
protected:
183+
const ui64 TabletId;
184+
const NActors::TActorId TabletActorId;
185+
164186
public:
165187
using TPtr = std::shared_ptr<INormalizerComponent>;
166188
using TFactory = NObjectFactory::TParametrizedObjectFactory<INormalizerComponent, TString, TInitContext>;
167189

168-
virtual ~INormalizerComponent() {}
190+
virtual ~INormalizerComponent() = default;
191+
INormalizerComponent(const TInitContext& context)
192+
: TabletId(context.GetTabletId())
193+
, TabletActorId(context.GetTabletActorId()) {
194+
}
169195

170196
TNormalizerFullId GetNormalizerFullId() const {
171197
return TNormalizerFullId(GetClassName(), UniqueDescription);
@@ -222,10 +248,12 @@ class TNormalizationController {
222248
}
223249
}
224250

225-
TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc);
251+
TConclusion<std::vector<INormalizerTask::TPtr>> Init(
252+
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc);
226253

227254
private:
228-
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0;
255+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
256+
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0;
229257

230258
TAtomic ActiveTasksCount = 0;
231259
};
@@ -240,11 +268,13 @@ class TNormalizationController {
240268
std::set<TNormalizerFullId> FinishedNormalizers;
241269
std::map<TNormalizerFullId, TString> StartedNormalizers;
242270
YDB_READONLY_DEF(std::optional<ui32>, LastSavedNormalizerId);
271+
243272
private:
244273
INormalizerComponent::TPtr RegisterNormalizer(INormalizerComponent::TPtr normalizer);
245274

246275
public:
247-
TNormalizationController(std::shared_ptr<IStoragesManager> storagesManager, const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>& counters)
276+
TNormalizationController(std::shared_ptr<IStoragesManager> storagesManager,
277+
const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>& counters)
248278
: StoragesManager(storagesManager)
249279
, TaskSubscription("CS::NORMALIZER", counters) {
250280
}
@@ -265,12 +295,12 @@ class TNormalizationController {
265295

266296
TString DebugString() const {
267297
return TStringBuilder() << "normalizers_count=" << Normalizers.size()
268-
<< ";current_normalizer=" << (Normalizers.size() ? Normalizers.front()->DebugString() : "NO_DATA");
298+
<< ";current_normalizer=" << (Normalizers.size() ? Normalizers.front()->DebugString() : "NO_DATA");
269299
}
270300

271301
const INormalizerComponent::TPtr& GetNormalizer() const;
272302
bool IsNormalizationFinished() const;
273303
bool SwitchNormalizer();
274304
const TNormalizerCounters& GetCounters() const;
275305
};
276-
}
306+
} // namespace NKikimr::NOlap
Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
#pragma once
22

3-
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
43
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5-
4+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
65

76
namespace NKikimr::NOlap {
87

98
class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerComponent {
9+
private:
10+
using TBase = TNormalizationController::INormalizerComponent;
11+
1012
public:
1113
static TString GetClassNameStatic() {
1214
return ::ToString(ENormalizerSequentialId::CleanGranuleId);
1315
}
16+
1417
private:
1518
class TNormalizerResult;
1619

1720
static inline INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer> Registrator =
1821
INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer>(GetClassNameStatic());
22+
1923
public:
2024
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
2125
return ENormalizerSequentialId::CleanGranuleId;
@@ -25,10 +29,12 @@ class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerCom
2529
return GetClassNameStatic();
2630
}
2731

28-
TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext&) {
32+
TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext& context)
33+
: TBase(context) {
2934
}
3035

31-
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
36+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
37+
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
3238
};
3339

34-
}
40+
} // namespace NKikimr::NOlap
Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,28 @@
11
#pragma once
22

3-
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
43
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5-
4+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
65

76
namespace NKikimr::NOlap {
87

98
class TGranulesNormalizer: public TNormalizationController::INormalizerComponent {
9+
private:
10+
using TBase = TNormalizationController::INormalizerComponent;
11+
1012
public:
1113
static TString GetClassNameStatic() {
1214
return ::ToString(ENormalizerSequentialId::Granules);
1315
}
16+
1417
private:
1518
class TNormalizerResult;
1619

17-
static const inline INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer> Registrator = INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer>(
18-
GetClassNameStatic());
20+
static const inline INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer> Registrator =
21+
INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer>(GetClassNameStatic());
22+
1923
public:
20-
TGranulesNormalizer(const TNormalizationController::TInitContext&) {
24+
TGranulesNormalizer(const TNormalizationController::TInitContext& context)
25+
: TBase(context) {
2126
}
2227

2328
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
@@ -28,7 +33,8 @@ class TGranulesNormalizer: public TNormalizationController::INormalizerComponent
2833
return GetClassNameStatic();
2934
}
3035

31-
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
36+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
37+
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
3238
};
3339

34-
}
40+
} // namespace NKikimr::NOlap
Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,28 @@
11
#pragma once
22

3-
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
43
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5-
4+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
65

76
namespace NKikimr::NOlap::NInsertionDedup {
87

98
class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerComponent {
9+
private:
10+
using TBase = TNormalizationController::INormalizerComponent;
11+
1012
public:
1113
static TString GetClassNameStatic() {
1214
return "CleanInsertionDedup";
1315
}
16+
1417
private:
1518
class TNormalizerResult;
1619

1720
static const inline INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer> Registrator =
1821
INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer>(GetClassNameStatic());
1922

2023
public:
21-
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext&) {
24+
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext& context)
25+
: TBase(context) {
2226
}
2327

2428
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
@@ -29,7 +33,8 @@ class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerCo
2933
return GetClassNameStatic();
3034
}
3135

32-
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
36+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
37+
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
3338
};
3439

35-
}
40+
} // namespace NKikimr::NOlap::NInsertionDedup

0 commit comments

Comments
 (0)