Skip to content

Commit b32acb4

Browse files
authored
Merge d118c98 into 09a6870
2 parents 09a6870 + d118c98 commit b32acb4

22 files changed

+1216
-36
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ using namespace NYql::NDqProto;
134134
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
135135
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
136136
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
137-
TIntrusivePtr<NActors::TProtoArenaHolder> arena) {
138-
return new NScanPrivate::TKqpScanComputeActor(executerId, txId, task, std::move(asyncIoFactory),
137+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
138+
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory),
139139
settings, memoryLimits, std::move(traceId), std::move(arena));
140140
}
141141

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
44
#include <ydb/core/kqp/counters/kqp_counters.h>
55
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
6+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
67
#include <ydb/core/scheme/scheme_tabledefs.h>
78
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
89
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
@@ -48,12 +49,12 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
4849
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
4950
NWilson::TTraceId traceId,
5051
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
51-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
52+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions);
5253

5354
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
5455
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
5556
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
56-
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
57+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions);
5758

5859
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
5960
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
9797
const NYql::NDq::TComputeRuntimeSettings& settings,
9898
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
9999
TComputeStagesWithScan& computesByStage, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
100-
NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks)
100+
NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks, TComputeActorSchedulingOptions schedulingOptions)
101101
{
102102
NYql::NDq::TComputeMemoryLimits memoryLimits;
103103
memoryLimits.ChannelBufferSize = 0;
@@ -168,7 +168,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
168168
auto& info = computesByStage.UpsertTaskWithScan(*dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
169169
IActor* computeActor = CreateKqpScanComputeActor(executerId, txId, dqTask,
170170
AsyncIoFactory, runtimeSettings, memoryLimits,
171-
std::move(traceId), std::move(arena));
171+
std::move(traceId), std::move(arena), std::move(schedulingOptions));
172172
TActorId result = TlsActivationContext->Register(computeActor);
173173
info.MutableActorIds().emplace_back(result);
174174
return result;
@@ -178,7 +178,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
178178
GUCSettings = std::make_shared<TGUCSettings>(serializedGUCSettings);
179179
}
180180
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(executerId, txId, dqTask, AsyncIoFactory,
181-
runtimeSettings, memoryLimits, std::move(traceId), std::move(arena), FederatedQuerySetup, GUCSettings);
181+
runtimeSettings, memoryLimits, std::move(traceId), std::move(arena), FederatedQuerySetup, GUCSettings, std::move(schedulingOptions));
182182
return TlsActivationContext->Register(computeActor);
183183
}
184184
}
@@ -192,4 +192,4 @@ std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfi
192192
return std::make_shared<TKqpCaFactory>(config, resourceManager, asyncIoFactory, federatedQuerySetup);
193193
}
194194

195-
}
195+
}

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
77
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
88

9+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
10+
911
#include <vector>
1012

1113
namespace NKikimr::NKqp {
@@ -107,12 +109,12 @@ struct IKqpNodeComputeActorFactory {
107109
const NYql::NDq::TComputeRuntimeSettings& settings,
108110
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
109111
TComputeStagesWithScan& computeStages, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
110-
NKikimr::NKqp::NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks) = 0;
112+
NKikimr::NKqp::NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks, TComputeActorSchedulingOptions schedulingOptions) = 0;
111113
};
112114

113115
std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
114116
std::shared_ptr<NRm::IKqpResourceManager> resourceManager,
115117
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
116118
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup);
117119

118-
} // namespace NKikimr::NKqp::NComputeActor
120+
} // namespace NKikimr::NKqp::NComputeActor

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
1414
IDqAsyncIoFactory::TPtr asyncIoFactory,
1515
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
1616
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
17-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
18-
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
17+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions schedulingOptions)
18+
: TBase(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
1919
, ComputeCtx(settings.StatsMode)
2020
, FederatedQuerySetup(federatedQuerySetup)
2121
{
@@ -121,9 +121,12 @@ void TKqpComputeActor::DoBootstrap() {
121121

122122
ContinueExecute();
123123
Become(&TKqpComputeActor::StateFunc);
124+
125+
TBase::DoBootstrap();
124126
}
125127

126128
STFUNC(TKqpComputeActor::StateFunc) {
129+
CA_LOG_D("CA StateFunc " << ev->GetTypeRewrite());
127130
try {
128131
switch (ev->GetTypeRewrite()) {
129132
hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
@@ -281,10 +284,10 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
281284
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
282285
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
283286
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
284-
const TGUCSettings::TPtr& GUCSettings)
287+
const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions)
285288
{
286289
return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
287-
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings);
290+
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions));
288291
}
289292

290293
} // namespace NKqp

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,16 @@
88
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
99
#include <ydb/core/kqp/runtime/kqp_compute.h>
1010
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
11+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
1112
#include <ydb/core/sys_view/scan.h>
1213
#include <ydb/library/yverify_stream/yverify_stream.h>
1314

14-
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
15-
1615

1716
namespace NKikimr {
1817
namespace NKqp {
1918

20-
class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
21-
using TBase = TDqSyncComputeActorBase<TKqpComputeActor>;
19+
class TKqpComputeActor : public TSchedulableComputeActorBase<TKqpComputeActor> {
20+
using TBase = TSchedulableComputeActorBase<TKqpComputeActor>;
2221

2322
public:
2423
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -29,7 +28,8 @@ class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
2928
IDqAsyncIoFactory::TPtr asyncIoFactory,
3029
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
3130
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
32-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
31+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
32+
TComputeActorSchedulingOptions);
3333

3434
void DoBootstrap();
3535

@@ -68,7 +68,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
6868
IDqAsyncIoFactory::TPtr asyncIoFactory,
6969
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
7070
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
71-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
71+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
72+
TComputeActorSchedulingOptions);
7273

7374
} // namespace NKqp
7475
} // namespace NKikimr

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);
2323

2424
} // anonymous namespace
2525

26-
TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
26+
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
2727
IDqAsyncIoFactory::TPtr asyncIoFactory,
2828
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
2929
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
30-
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
30+
: TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
3131
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
3232
, ComputeCtx(settings.StatsMode)
3333
{
@@ -228,6 +228,8 @@ void TKqpScanComputeActor::DoBootstrap() {
228228
ScanData->TaskId = GetTask().GetId();
229229
ScanData->TableReader = CreateKqpTableReader(*ScanData);
230230
Become(&TKqpScanComputeActor::StateFunc);
231+
232+
TBase::DoBootstrap();
231233
}
232234

233235
}

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
#include "kqp_scan_events.h"
33

44
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
5-
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
5+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
66
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
77
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
88

99
namespace NKikimr::NKqp::NScanPrivate {
1010

11-
class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor> {
11+
class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeActor> {
1212
private:
13-
using TBase = NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor>;
13+
using TBase = TSchedulableComputeActorBase<TKqpScanComputeActor>;
1414
NMiniKQL::TKqpScanComputeContext ComputeCtx;
1515
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
1616
using TBase::TaskRunner;
@@ -30,7 +30,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
3030
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
3131
}
3232

33-
TKqpScanComputeActor(const TActorId& executerId, ui64 txId,
33+
TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId,
3434
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
3535
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
3636
TIntrusivePtr<NActors::TProtoArenaHolder> arena);

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
#include <ydb/core/sys_view/service/sysview_service.h>
99

1010
#include <ydb/library/actors/core/log.h>
11-
1211
#include <util/generic/size_literals.h>
1312

1413
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
@@ -826,6 +825,11 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
826825
"PhyTx/ScanTxTotalTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1));
827826

828827
FullScansExecuted = KqpGroup->GetCounter("FullScans", true);
828+
829+
SchedulerThrottled = KqpGroup->GetCounter("NodeScheduler/ThrottledUs", true);
830+
SchedulerCapacity = KqpGroup->GetCounter("NodeScheduler/Capacity");
831+
SchedulerRenices = KqpGroup->GetHistogram("NodeScheduler/ReniceValue", NMonitoring::ExponentialHistogram(20, 2, 1));
832+
ScheduledActorsRuns = KqpGroup->GetHistogram("NodeScheduler/ActorRunsUs", NMonitoring::ExponentialHistogram(20, 2, 1));
829833
}
830834

831835
::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,12 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
406406
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
407407
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;
408408

409+
// Scheduler signals
410+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled;
411+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity;
412+
NMonitoring::THistogramPtr SchedulerRenices;
413+
NMonitoring::THistogramPtr ScheduledActorsRuns;
414+
409415
// Sequences counters
410416
::NMonitoring::TDynamicCounters::TCounterPtr SequencerActorsCount;
411417
::NMonitoring::TDynamicCounters::TCounterPtr SequencerErrors;

0 commit comments

Comments
 (0)