Skip to content

Commit ee8cc58

Browse files
committed
Allow forgetting consumption peaks (ydb-platform#7763)
1 parent 8ef5cfa commit ee8cc58

File tree

4 files changed

+32
-7
lines changed

4 files changed

+32
-7
lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
8585
SetPriorities(config.GetPoolsConfiguration());
8686
}
8787
Scheduler.ReportCounters(counters);
88+
AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec());
89+
Scheduler.SetForgetInterval(TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()));
8890
}
8991

9092
void Bootstrap() {
@@ -104,7 +106,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
104106
}
105107

106108
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(WakeCleaunupTag));
107-
Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag));
109+
Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag));
108110
Become(&TKqpNodeService::WorkState);
109111
}
110112

@@ -341,7 +343,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
341343
void HandleWork(TEvents::TEvWakeup::TPtr& ev) {
342344
if (ev->Get()->Tag == WakeAdvanceTimeTag) {
343345
Scheduler.AdvanceTime(TMonotonic::Now());
344-
Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag));
346+
Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag));
345347
}
346348
if (ev->Get()->Tag == WakeCleaunupTag) {
347349
Schedule(TDuration::Seconds(1), ev->Release().Release());
@@ -396,9 +398,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
396398
SetPriorities(event.GetConfig().GetTableServiceConfig().GetPoolsConfiguration());
397399
}
398400

401+
AdvanceTimeInterval = TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec());
402+
Scheduler.SetForgetInterval(TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()));
403+
399404
auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
400405
Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
401-
402406
}
403407

404408
void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) {
@@ -521,6 +525,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
521525
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
522526

523527
TComputeScheduler Scheduler;
528+
TDuration AdvanceTimeInterval;
524529

525530
//state sharded by TxId
526531
std::shared_ptr<TNodeServiceState> State_;

ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ class TSchedulerEntity {
134134
static constexpr double BatchCalcDecay = 0;
135135
TDuration BatchTime = AvgBatch;
136136

137+
TDuration OverflowToleranceTimeout = TDuration::Seconds(1);
138+
137139
static constexpr TDuration ActivationPenalty = TDuration::MicroSeconds(10);
138140

139141
size_t Wakeups = 0;
@@ -212,6 +214,7 @@ struct TComputeScheduler::TImpl {
212214

213215
TIntrusivePtr<TKqpCounters> Counters;
214216
TDuration SmoothPeriod = TDuration::MilliSeconds(100);
217+
TDuration ForgetInteval = TDuration::Seconds(2);
215218

216219
TDuration MaxDelay = TDuration::Seconds(10);
217220

@@ -361,10 +364,13 @@ void TComputeScheduler::AdvanceTime(TMonotonic now) {
361364
}
362365
double delta = 0;
363366

364-
v.Next()->TrackedBefore = Impl->Records[i]->TrackedMicroSeconds.load();
367+
auto tracked = Impl->Records[i]->TrackedMicroSeconds.load();
365368
v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight;
366369
v.Next()->LastNowRecalc = now;
367-
v.Next()->TrackedBefore = Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, v.Next()->TrackedBefore);
370+
v.Next()->TrackedBefore =
371+
Max<ssize_t>(
372+
tracked - FromDuration(Impl->ForgetInteval) * group.get()->Weight,
373+
Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));
368374

369375
if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) {
370376
delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight;
@@ -425,6 +431,10 @@ void TComputeScheduler::SetMaxDeviation(TDuration period) {
425431
Impl->SmoothPeriod = period;
426432
}
427433

434+
void TComputeScheduler::SetForgetInterval(TDuration period) {
435+
Impl->ForgetInteval = period;
436+
}
437+
428438
bool TComputeScheduler::Disabled(TString group) {
429439
auto ptr = Impl->PoolId.FindPtr(group);
430440
return !ptr || Impl->Records[*ptr]->MutableStats.Current().get()->Disabled;

ydb/core/kqp/runtime/kqp_compute_scheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class TComputeScheduler {
7474

7575
void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now);
7676
void SetMaxDeviation(TDuration);
77+
void SetForgetInterval(TDuration);
7778
::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const;
7879

7980
TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now);

ydb/core/protos/table_service_config.proto

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,14 +313,23 @@ message TTableServiceConfig {
313313

314314
message TSubPoolsConfiguration {
315315
repeated TComputePoolConfiguration SubPools = 1;
316-
};
316+
}
317317

318318
message TComputePoolConfiguration {
319319
optional double MaxCpuShare = 1;
320320
oneof ResourceConfiguration {
321321
string Name = 2;
322322
TSubPoolsConfiguration SubPoolsConfiguration = 3;
323323
}
324-
};
324+
}
325+
326+
message TComputeSchedulerSettings {
327+
optional uint64 AdvanceTimeIntervalUsec = 1 [default = 50000];
328+
optional uint64 ForgetOverflowTimeoutUsec = 2 [default = 2000000];
329+
}
330+
325331
optional TComputePoolConfiguration PoolsConfiguration = 68;
332+
optional TComputeSchedulerSettings ComputeSchedulerSettings = 70;
333+
334+
optional bool EnableRowsDuplicationCheck = 69 [ default = false ];
326335
};

0 commit comments

Comments
 (0)