Skip to content

Commit cc9c01c

Browse files
authored
Merge 6768433 into 8fefc80
2 parents 8fefc80 + 6768433 commit cc9c01c

18 files changed

+910
-34
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -134,8 +134,8 @@ using namespace NYql::NDqProto;
134134
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
135135
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
136136
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
137-
TIntrusivePtr<NActors::TProtoArenaHolder> arena) {
138-
return new NScanPrivate::TKqpScanComputeActor(executerId, txId, task, std::move(asyncIoFactory),
137+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
138+
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory),
139139
settings, memoryLimits, std::move(traceId), std::move(arena));
140140
}
141141

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
44
#include <ydb/core/kqp/counters/kqp_counters.h>
55
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
6+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
67
#include <ydb/core/scheme/scheme_tabledefs.h>
78
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
89
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
@@ -48,12 +49,12 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
4849
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
4950
NWilson::TTraceId traceId,
5051
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
51-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
52+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions);
5253

5354
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
5455
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
5556
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
56-
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
57+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions);
5758

5859
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
5960
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -102,15 +102,15 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
102102
ApplyConfig(config);
103103
}
104104

105-
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
105+
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) override
106106
{
107107
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
108108
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
109109
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
110110
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
111111
}
112112

113-
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) {
113+
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) override {
114114
NYql::NDq::TComputeMemoryLimits memoryLimits;
115115
memoryLimits.ChannelBufferSize = 0;
116116
memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
@@ -211,7 +211,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
211211
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
212212
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.Task,
213213
AsyncIoFactory, runtimeSettings, memoryLimits,
214-
std::move(args.TraceId), std::move(args.Arena));
214+
std::move(args.TraceId), std::move(args.Arena),
215+
std::move(args.SchedulingOptions));
215216
TActorId result = TlsActivationContext->Register(computeActor);
216217
info.MutableActorIds().emplace_back(result);
217218
return result;
@@ -221,7 +222,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
221222
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
222223
}
223224
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
224-
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings);
225+
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings,
226+
std::move(args.SchedulingOptions));
225227
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
226228
TlsActivationContext->AsActorContext().Register(computeActor);
227229
}
@@ -236,4 +238,4 @@ std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfi
236238
return std::make_shared<TKqpCaFactory>(config, resourceManager, asyncIoFactory, federatedQuerySetup);
237239
}
238240

239-
}
241+
}

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,8 @@
66
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
77
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
88

9+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
10+
911
#include <vector>
1012

1113
namespace NKikimr::NKqp {
@@ -122,6 +124,7 @@ struct IKqpNodeComputeActorFactory {
122124
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
123125
TComputeStagesWithScan* ComputesByStages = nullptr;
124126
std::shared_ptr<IKqpNodeState> State = nullptr;
127+
TComputeActorSchedulingOptions SchedulingOptions = {};
125128
};
126129

127130
typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult;
@@ -135,4 +138,4 @@ std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfi
135138
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
136139
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup);
137140

138-
} // namespace NKikimr::NKqp::NComputeActor
141+
} // namespace NKikimr::NKqp::NComputeActor

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
1414
IDqAsyncIoFactory::TPtr asyncIoFactory,
1515
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
1616
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
17-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
18-
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
17+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions schedulingOptions)
18+
: TBase(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
1919
, ComputeCtx(settings.StatsMode)
2020
, FederatedQuerySetup(federatedQuerySetup)
2121
{
@@ -121,9 +121,12 @@ void TKqpComputeActor::DoBootstrap() {
121121

122122
ContinueExecute();
123123
Become(&TKqpComputeActor::StateFunc);
124+
125+
TBase::DoBoostrap();
124126
}
125127

126128
STFUNC(TKqpComputeActor::StateFunc) {
129+
CA_LOG_D("CA StateFunc " << ev->GetTypeRewrite());
127130
try {
128131
switch (ev->GetTypeRewrite()) {
129132
hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
@@ -278,10 +281,10 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
278281
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
279282
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
280283
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
281-
const TGUCSettings::TPtr& GUCSettings)
284+
const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions)
282285
{
283286
return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
284-
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings);
287+
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions));
285288
}
286289

287290
} // namespace NKqp

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -8,17 +8,16 @@
88
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
99
#include <ydb/core/kqp/runtime/kqp_compute.h>
1010
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
11+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
1112
#include <ydb/core/sys_view/scan.h>
1213
#include <ydb/library/yverify_stream/yverify_stream.h>
1314

14-
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
15-
1615

1716
namespace NKikimr {
1817
namespace NKqp {
1918

20-
class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
21-
using TBase = TDqSyncComputeActorBase<TKqpComputeActor>;
19+
class TKqpComputeActor : public TSchedulableComputeActorBase<TKqpComputeActor> {
20+
using TBase = TSchedulableComputeActorBase<TKqpComputeActor>;
2221

2322
public:
2423
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -29,7 +28,8 @@ class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
2928
IDqAsyncIoFactory::TPtr asyncIoFactory,
3029
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
3130
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
32-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
31+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
32+
TComputeActorSchedulingOptions);
3333

3434
void DoBootstrap();
3535

@@ -68,7 +68,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
6868
IDqAsyncIoFactory::TPtr asyncIoFactory,
6969
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
7070
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
71-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
71+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
72+
TComputeActorSchedulingOptions);
7273

7374
} // namespace NKqp
7475
} // namespace NKikimr

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,11 +23,11 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);
2323

2424
} // anonymous namespace
2525

26-
TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
26+
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
2727
IDqAsyncIoFactory::TPtr asyncIoFactory,
2828
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
2929
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
30-
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
30+
: TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
3131
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
3232
, ComputeCtx(settings.StatsMode)
3333
{
@@ -228,6 +228,8 @@ void TKqpScanComputeActor::DoBootstrap() {
228228
ScanData->TaskId = GetTask().GetId();
229229
ScanData->TableReader = CreateKqpTableReader(*ScanData);
230230
Become(&TKqpScanComputeActor::StateFunc);
231+
232+
TBase::DoBoostrap();
231233
}
232234

233235
}

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -2,15 +2,15 @@
22
#include "kqp_scan_events.h"
33

44
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
5-
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
5+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
66
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
77
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
88

99
namespace NKikimr::NKqp::NScanPrivate {
1010

11-
class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor> {
11+
class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeActor> {
1212
private:
13-
using TBase = NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor>;
13+
using TBase = TSchedulableComputeActorBase<TKqpScanComputeActor>;
1414
NMiniKQL::TKqpScanComputeContext ComputeCtx;
1515
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
1616
using TBase::TaskRunner;
@@ -30,7 +30,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
3030
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
3131
}
3232

33-
TKqpScanComputeActor(const TActorId& executerId, ui64 txId,
33+
TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId,
3434
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
3535
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
3636
TIntrusivePtr<NActors::TProtoArenaHolder> arena);

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,6 @@
88
#include <ydb/core/sys_view/service/sysview_service.h>
99

1010
#include <ydb/library/actors/core/log.h>
11-
1211
#include <util/generic/size_literals.h>
1312

1413
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
@@ -829,6 +828,12 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
829828
"PhyTx/ScanTxTotalTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1));
830829

831830
FullScansExecuted = KqpGroup->GetCounter("FullScans", true);
831+
832+
SchedulerThrottled = KqpGroup->GetCounter("NodeScheduler/ThrottledUs", true);
833+
SchedulerCapacity = KqpGroup->GetCounter("NodeScheduler/Capacity");
834+
ComputeActorExecutions = KqpGroup->GetHistogram("NodeScheduler/BatchUs", NMonitoring::ExponentialHistogram(20, 2, 1));
835+
ThrottledActorsSpuriousActivations = KqpGroup->GetCounter("NodeScheduler/SpuriousActivations", true);
836+
SchedulerDelays = KqpGroup->GetHistogram("NodeScheduler/Delay", NMonitoring::ExponentialHistogram(20, 2, 1));
832837
}
833838

834839
::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -409,6 +409,13 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
409409
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
410410
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;
411411

412+
// Scheduler signals
413+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled;
414+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity;
415+
NMonitoring::THistogramPtr ComputeActorExecutions;
416+
::NMonitoring::TDynamicCounters::TCounterPtr ThrottledActorsSpuriousActivations;
417+
NMonitoring::THistogramPtr SchedulerDelays;
418+
412419
// Sequences counters
413420
::NMonitoring::TDynamicCounters::TCounterPtr SequencerActorsCount;
414421
::NMonitoring::TDynamicCounters::TCounterPtr SequencerErrors;

0 commit comments

Comments
 (0)