Skip to content

Commit 8d0f151

Browse files
disk usage limit for CS compaction (in general slider limit) (#4864)
1 parent f2808e8 commit 8d0f151

File tree

24 files changed

+449
-5
lines changed

24 files changed

+449
-5
lines changed

ydb/core/base/events.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ struct TKikimrEvents : TEvents {
178178
ES_REPLICATION_SERVICE,
179179
ES_BACKUP_SERVICE,
180180
ES_TX_BACKGROUND,
181-
ES_SS_BG_TASKS
181+
ES_SS_BG_TASKS,
182+
ES_LIMITER
182183
};
183184
};
184185

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,12 @@
176176
#include <ydb/services/metadata/ds_table/service.h>
177177
#include <ydb/services/metadata/service.h>
178178

179-
#include <ydb/core/tx/conveyor/usage/config.h>
180179
#include <ydb/core/tx/conveyor/service/service.h>
180+
#include <ydb/core/tx/conveyor/usage/config.h>
181181
#include <ydb/core/tx/conveyor/usage/service.h>
182+
#include <ydb/core/tx/limiter/service/service.h>
183+
#include <ydb/core/tx/limiter/usage/config.h>
184+
#include <ydb/core/tx/limiter/usage/service.h>
182185

183186
#include <ydb/core/backup/controller/tablet.h>
184187

@@ -2169,6 +2172,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
21692172
}
21702173
}
21712174

2175+
TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
2176+
: IKikimrServicesInitializer(runConfig) {
2177+
}
2178+
2179+
void TCompDiskLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
2180+
NLimiter::TConfig serviceConfig;
2181+
Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto<NLimiter::TCompDiskLimiterPolicy>(Config.GetCompDiskLimiterConfig()));
2182+
2183+
if (serviceConfig.IsEnabled()) {
2184+
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
2185+
TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup("type", "TX_COMP_DISK_LIMITER");
2186+
2187+
auto service = NLimiter::TCompDiskOperator::CreateService(serviceConfig, countersGroup);
2188+
2189+
setup->LocalServices.push_back(std::make_pair(
2190+
NLimiter::TCompDiskOperator::MakeServiceId(NodeId),
2191+
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
2192+
}
2193+
}
2194+
21722195
TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig)
21732196
: IKikimrServicesInitializer(runConfig) {
21742197
}

ydb/core/driver_lib/run/kikimr_services_initializers.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,12 @@ class TKqpServiceInitializer : public IKikimrServicesInitializer {
391391
IGlobalObjectStorage& GlobalObjects;
392392
};
393393

394+
class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
395+
public:
396+
TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig);
397+
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
398+
};
399+
394400
class TCompConveyorInitializer: public IKikimrServicesInitializer {
395401
public:
396402
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);

ydb/core/driver_lib/run/run.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,6 +1542,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
15421542
sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig));
15431543
}
15441544

1545+
if (serviceMask.EnableCompDiskLimiter) {
1546+
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
1547+
}
1548+
15451549
if (serviceMask.EnableScanConveyor) {
15461550
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
15471551
}

ydb/core/driver_lib/run/service_mask.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ union TBasicKikimrServicesMask {
7777

7878
bool EnableDatabaseMetadataCache:1;
7979
bool EnableGraphService:1;
80+
bool EnableCompDiskLimiter:1;
8081
};
8182

8283
struct {

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ PEERDIR(
112112
ydb/core/tx/columnshard
113113
ydb/core/tx/coordinator
114114
ydb/core/tx/conveyor/service
115+
ydb/core/tx/limiter/service
115116
ydb/core/tx/datashard
116117
ydb/core/tx/long_tx_service
117118
ydb/core/tx/long_tx_service/public

ydb/core/protos/config.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,12 @@ message TConveyorConfig {
595595
optional double WorkersCountDouble = 5;
596596
}
597597

598+
message TLimiterConfig {
599+
optional bool Enabled = 1 [default = true];
600+
optional uint64 Limit = 2;
601+
optional uint64 PeriodMilliSeconds = 3 [default = 1000];
602+
}
603+
598604
message TExternalIndexConfig {
599605
optional bool Enabled = 1 [default = true];
600606
optional TInternalRequestConfig RequestConfig = 2;
@@ -1841,6 +1847,7 @@ message TAppConfig {
18411847
optional TS3ProxyResolverConfig S3ProxyResolverConfig = 76;
18421848
optional TBackgroundCleaningConfig BackgroundCleaningConfig = 77;
18431849
optional TBlobCacheConfig BlobCacheConfig = 78;
1850+
optional TLimiterConfig CompDiskLimiterConfig = 79;
18441851

18451852
repeated TNamedConfig NamedConfigs = 100;
18461853
optional string ClusterYamlConfig = 101;

ydb/core/tx/columnshard/columnshard__write_index.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,34 @@
66
#include "engines/changes/abstract/abstract.h"
77
#include "engines/writer/compacted_blob_constructor.h"
88

9+
#include <ydb/core/tx/limiter/usage/abstract.h>
10+
#include <ydb/core/tx/limiter/usage/service.h>
11+
912
#include <ydb/library/actors/core/log.h>
1013

1114
namespace NKikimr::NColumnShard {
1215

16+
class TDiskResourcesRequest: public NLimiter::IResourceRequest {
17+
private:
18+
using TBase = NLimiter::IResourceRequest;
19+
std::shared_ptr<NOlap::TCompactedWriteController> WriteController;
20+
const ui64 TabletId;
21+
22+
private:
23+
virtual void DoOnResourceAllocated() override {
24+
NActors::TActivationContext::AsActorContext().Register(CreateWriteActor(TabletId, WriteController, TInstant::Max()));
25+
}
26+
27+
public:
28+
TDiskResourcesRequest(const std::shared_ptr<NOlap::TCompactedWriteController>& writeController, const ui64 tabletId)
29+
: TBase(writeController->GetWriteVolume())
30+
, WriteController(writeController)
31+
, TabletId(tabletId)
32+
{
33+
34+
}
35+
};
36+
1337
void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) {
1438
auto putStatus = ev->Get()->GetPutStatus();
1539

@@ -32,7 +56,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
3256
if (*needDraftTransaction) {
3357
Execute(new TTxWriteDraft(this, writeController));
3458
} else {
35-
ctx.Register(CreateWriteActor(TabletID(), writeController, TInstant::Max()));
59+
NLimiter::TCompDiskOperator::AskResource(std::make_shared<TDiskResourcesRequest>(writeController, TabletID()));
3660
}
3761
}
3862
} else {

ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, T
2323
for (auto&& b : portionWithBlobs.GetBlobs()) {
2424
auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId())));
2525
b.RegisterBlobId(portionWithBlobs, task.GetBlobId());
26+
WriteVolume += b.GetSize();
2627
}
2728
}
2829
}

ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,17 @@ class TCompactedWriteController : public NColumnShard::IWriteController {
1313
private:
1414
TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> WriteIndexEv;
1515
TActorId DstActor;
16+
ui64 WriteVolume = 0;
17+
1618
protected:
1719
void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
1820
virtual void DoAbort(const TString& reason) override;
21+
1922
public:
2023
const TBlobsAction& GetBlobsAction();
24+
ui64 GetWriteVolume() const {
25+
return WriteVolume;
26+
}
2127

2228
TCompactedWriteController(const TActorId& dstActor, TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeEv);
2329
~TCompactedWriteController();

0 commit comments

Comments
 (0)