Skip to content

Commit d8189dc

Browse files
committed
Allow to forget consumption peaks (ydb-platform#7763)
1 parent 154c365 commit d8189dc

File tree

4 files changed

+32
-7
lines changed

4 files changed

+32
-7
lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
8585
SetPriorities(config.GetPoolsConfiguration());
8686
}
8787
Scheduler.ReportCounters(counters);
88+
AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec());
89+
Scheduler.SetForgetInterval(TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()));
8890
}
8991

9092
void Bootstrap() {
@@ -104,7 +106,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
104106
}
105107

106108
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(WakeCleaunupTag));
107-
Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag));
109+
Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag));
108110
Become(&TKqpNodeService::WorkState);
109111
}
110112

@@ -345,7 +347,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
345347
void HandleWork(TEvents::TEvWakeup::TPtr& ev) {
346348
if (ev->Get()->Tag == WakeAdvanceTimeTag) {
347349
Scheduler.AdvanceTime(TMonotonic::Now());
348-
Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag));
350+
Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag));
349351
}
350352
if (ev->Get()->Tag == WakeCleaunupTag) {
351353
Schedule(TDuration::Seconds(1), ev->Release().Release());
@@ -400,9 +402,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
400402
SetPriorities(event.GetConfig().GetTableServiceConfig().GetPoolsConfiguration());
401403
}
402404

405+
AdvanceTimeInterval = TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec());
406+
Scheduler.SetForgetInterval(TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()));
407+
403408
auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
404409
Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
405-
406410
}
407411

408412
void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) {
@@ -525,6 +529,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
525529
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
526530

527531
TComputeScheduler Scheduler;
532+
TDuration AdvanceTimeInterval;
528533

529534
//state sharded by TxId
530535
std::shared_ptr<TNodeServiceState> State_;

ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ class TSchedulerEntity {
134134
static constexpr double BatchCalcDecay = 0;
135135
TDuration BatchTime = AvgBatch;
136136

137+
TDuration OverflowToleranceTimeout = TDuration::Seconds(1);
138+
137139
static constexpr TDuration ActivationPenalty = TDuration::MicroSeconds(10);
138140

139141
size_t Wakeups = 0;
@@ -212,6 +214,7 @@ struct TComputeScheduler::TImpl {
212214

213215
TIntrusivePtr<TKqpCounters> Counters;
214216
TDuration SmoothPeriod = TDuration::MilliSeconds(100);
217+
TDuration ForgetInteval = TDuration::Seconds(2);
215218

216219
TDuration MaxDelay = TDuration::Seconds(10);
217220

@@ -361,10 +364,13 @@ void TComputeScheduler::AdvanceTime(TMonotonic now) {
361364
}
362365
double delta = 0;
363366

364-
v.Next()->TrackedBefore = Impl->Records[i]->TrackedMicroSeconds.load();
367+
auto tracked = Impl->Records[i]->TrackedMicroSeconds.load();
365368
v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight;
366369
v.Next()->LastNowRecalc = now;
367-
v.Next()->TrackedBefore = Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, v.Next()->TrackedBefore);
370+
v.Next()->TrackedBefore =
371+
Max<ssize_t>(
372+
tracked - FromDuration(Impl->ForgetInteval) * group.get()->Weight,
373+
Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));
368374

369375
if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) {
370376
delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight;
@@ -425,6 +431,10 @@ void TComputeScheduler::SetMaxDeviation(TDuration period) {
425431
Impl->SmoothPeriod = period;
426432
}
427433

434+
void TComputeScheduler::SetForgetInterval(TDuration period) {
435+
Impl->ForgetInteval = period;
436+
}
437+
428438
bool TComputeScheduler::Disabled(TString group) {
429439
auto ptr = Impl->PoolId.FindPtr(group);
430440
return !ptr || Impl->Records[*ptr]->MutableStats.Current().get()->Disabled;

ydb/core/kqp/runtime/kqp_compute_scheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class TComputeScheduler {
7474

7575
void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now);
7676
void SetMaxDeviation(TDuration);
77+
void SetForgetInterval(TDuration);
7778
::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const;
7879

7980
TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now);

ydb/core/protos/table_service_config.proto

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,14 +313,23 @@ message TTableServiceConfig {
313313

314314
message TSubPoolsConfiguration {
315315
repeated TComputePoolConfiguration SubPools = 1;
316-
};
316+
}
317317

318318
message TComputePoolConfiguration {
319319
optional double MaxCpuShare = 1;
320320
oneof ResourceConfiguration {
321321
string Name = 2;
322322
TSubPoolsConfiguration SubPoolsConfiguration = 3;
323323
}
324-
};
324+
}
325+
326+
message TComputeSchedulerSettings {
327+
optional uint64 AdvanceTimeIntervalUsec = 1 [default = 50000];
328+
optional uint64 ForgetOverflowTimeoutUsec = 2 [default = 2000000];
329+
}
330+
325331
optional TComputePoolConfiguration PoolsConfiguration = 68;
332+
optional TComputeSchedulerSettings ComputeSchedulerSettings = 70;
333+
334+
optional bool EnableRowsDuplicationCheck = 69 [ default = false ];
326335
};

0 commit comments

Comments
 (0)