Skip to content

Commit f77fad4

Browse files
authored
Merge 00d5b8b into 8d768df
2 parents 8d768df + 00d5b8b commit f77fad4

13 files changed

+410
-39
lines changed

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ struct TKqpEvents {
4444
EvListSessionsRequest,
4545
EvListSessionsResponse,
4646
EvListProxyNodesRequest,
47-
EvListProxyNodesResponse
47+
EvListProxyNodesResponse,
48+
EvFinishKqpTasks
4849
};
4950

5051
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ using namespace NYql::NDqProto;
134134
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
135135
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
136136
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
137-
TIntrusivePtr<NActors::TProtoArenaHolder> arena) {
138-
return new NScanPrivate::TKqpScanComputeActor(executerId, txId, task, std::move(asyncIoFactory),
137+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
138+
return new NScanPrivate::TKqpScanComputeActor(schedulingOptions, executerId, txId, task, std::move(asyncIoFactory),
139139
settings, memoryLimits, std::move(traceId), std::move(arena));
140140
}
141141

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
44
#include <ydb/core/kqp/counters/kqp_counters.h>
55
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
6+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
67
#include <ydb/core/scheme/scheme_tabledefs.h>
78
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
89
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
@@ -47,12 +48,12 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
4748
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
4849
NWilson::TTraceId traceId,
4950
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
50-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
51+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions);
5152

5253
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
5354
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
5455
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
55-
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
56+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions);
5657

5758
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
5859
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
1414
IDqAsyncIoFactory::TPtr asyncIoFactory,
1515
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
1616
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
17-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
18-
: TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
17+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions schedulingOptions)
18+
: TBase(schedulingOptions, executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
1919
, ComputeCtx(settings.StatsMode)
2020
, FederatedQuerySetup(federatedQuerySetup)
2121
{
@@ -281,10 +281,10 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
281281
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
282282
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
283283
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
284-
const TGUCSettings::TPtr& GUCSettings)
284+
const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions)
285285
{
286286
return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
287-
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings);
287+
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, cpuOptions);
288288
}
289289

290290
} // namespace NKqp

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,16 @@
88
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
99
#include <ydb/core/kqp/runtime/kqp_compute.h>
1010
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
11+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
1112
#include <ydb/core/sys_view/scan.h>
1213
#include <ydb/library/yverify_stream/yverify_stream.h>
1314

14-
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
15-
1615

1716
namespace NKikimr {
1817
namespace NKqp {
1918

20-
class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
21-
using TBase = TDqSyncComputeActorBase<TKqpComputeActor>;
19+
class TKqpComputeActor : public TSchedulableComputeActorBase<TKqpComputeActor> {
20+
using TBase = TSchedulableComputeActorBase<TKqpComputeActor>;
2221

2322
public:
2423
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -29,7 +28,8 @@ class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
2928
IDqAsyncIoFactory::TPtr asyncIoFactory,
3029
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
3130
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
32-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
31+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
32+
TComputeActorSchedulingOptions);
3333

3434
void DoBootstrap();
3535

@@ -68,7 +68,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
6868
IDqAsyncIoFactory::TPtr asyncIoFactory,
6969
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
7070
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
71-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
71+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
72+
TComputeActorSchedulingOptions);
7273

7374
} // namespace NKqp
7475
} // namespace NKikimr

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);
2323

2424
} // anonymous namespace
2525

26-
TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
26+
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
2727
IDqAsyncIoFactory::TPtr asyncIoFactory,
2828
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
2929
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
30-
: TBase(executerId, txId, task, std::move(asyncIoFactory), settings,
30+
: TBase(cpuOptions, executerId, txId, task, std::move(asyncIoFactory), settings,
3131
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
3232
, ComputeCtx(settings.StatsMode)
3333
{

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
#include "kqp_scan_events.h"
33

44
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
5-
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
5+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
66
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
77
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
88

99
namespace NKikimr::NKqp::NScanPrivate {
1010

11-
class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor> {
11+
class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeActor> {
1212
private:
13-
using TBase = NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor>;
13+
using TBase = TSchedulableComputeActorBase<TKqpScanComputeActor>;
1414
NMiniKQL::TKqpScanComputeContext ComputeCtx;
1515
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
1616
using TBase::TaskRunner;
@@ -30,7 +30,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
3030
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
3131
}
3232

33-
TKqpScanComputeActor(const TActorId& executerId, ui64 txId,
33+
TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId,
3434
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
3535
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
3636
TIntrusivePtr<NActors::TProtoArenaHolder> arena);

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
1515
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
1616
#include <ydb/core/kqp/runtime/kqp_read_iterator_common.h>
17+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
1718
#include <ydb/core/kqp/common/kqp_resolve.h>
1819

1920
#include <ydb/library/wilson_ids/wilson.h>
@@ -174,9 +175,10 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
174175

175176
private:
176177
STATEFN(WorkState) {
178+
Scheduler.AdvanceTime(TlsActivationContext->Monotonic());
177179
switch (ev->GetTypeRewrite()) {
178180
hFunc(TEvKqpNode::TEvStartKqpTasksRequest, HandleWork);
179-
hFunc(TEvKqpNode::TEvFinishKqpTask, HandleWork); // used only for unit tests
181+
hFunc(TEvFinishKqpTask, HandleWork); // used only for unit tests
180182
hFunc(TEvKqpNode::TEvCancelKqpTasksRequest, HandleWork);
181183
hFunc(TEvents::TEvWakeup, HandleWork);
182184
// misc
@@ -444,12 +446,21 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
444446
tableKind = tableKindExtract(meta);
445447
}
446448

449+
TComputeActorSchedulingOptions schedulingOptions {
450+
.NodeService = SelfId(),
451+
.Scheduler = &Scheduler,
452+
.Group = msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::SCAN ? "olap" : "",
453+
.Weight = 1,
454+
.NoThrottle = msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::DATA,
455+
};
456+
447457
IActor* computeActor;
448458
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
449459
auto& info = computesByStage.UpsertTaskWithScan(dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
450460
computeActor = CreateKqpScanComputeActor(request.Executer, txId, &dqTask,
451461
AsyncIoFactory, runtimeSettings, memoryLimits,
452-
NWilson::TTraceId(ev->TraceId), ev->Get()->Arena);
462+
NWilson::TTraceId(ev->TraceId), ev->Get()->Arena,
463+
schedulingOptions);
453464
taskCtx.ComputeActorId = Register(computeActor);
454465
info.MutableActorIds().emplace_back(taskCtx.ComputeActorId);
455466
} else {
@@ -459,7 +470,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
459470
}
460471
if (Y_LIKELY(!CaFactory)) {
461472
computeActor = CreateKqpComputeActor(request.Executer, txId, &dqTask, AsyncIoFactory,
462-
runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId), ev->Get()->Arena, FederatedQuerySetup, GUCSettings);
473+
runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId), ev->Get()->Arena, FederatedQuerySetup, GUCSettings,
474+
schedulingOptions);
463475
taskCtx.ComputeActorId = Register(computeActor);
464476
} else {
465477
computeActor = CaFactory->CreateKqpComputeActor(request.Executer, txId, &dqTask,
@@ -490,8 +502,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
490502
}
491503

492504
// used only for unit tests
493-
void HandleWork(TEvKqpNode::TEvFinishKqpTask::TPtr& ev) {
505+
void HandleWork(TEvFinishKqpTask::TPtr& ev) {
494506
auto& msg = *ev->Get();
507+
if (msg.SchedulerEntity) {
508+
Scheduler.Deregister(*msg.SchedulerEntity);
509+
}
495510
FinishKqpTask(msg.TxId, msg.TaskId, msg.Success, GetStateBucketByTx(Buckets, msg.TxId), GetKqpResourceManager());
496511
}
497512

@@ -670,6 +685,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
670685
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
671686
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
672687

688+
TComputeScheduler Scheduler;
689+
673690
//state sharded by TxId
674691
std::shared_ptr<TBucketArray> Buckets;
675692
};

ydb/core/kqp/node_service/kqp_node_service.h

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ struct TKqpNodeEvents {
2626
enum EKqpNodeEvents {
2727
EvStartKqpTasksRequest = EventSpaceBegin(TKikimrEvents::ES_KQP) + 320,
2828
EvStartKqpTasksResponse,
29-
EvFinishKqpTasks,
3029
EvCancelKqpTasksRequest,
3130
EvCancelKqpTasksResponse,
3231
};
@@ -46,19 +45,6 @@ struct TEvKqpNode {
4645
struct TEvStartKqpTasksResponse : public TEventPB<TEvStartKqpTasksResponse,
4746
NKikimrKqp::TEvStartKqpTasksResponse, TKqpNodeEvents::EvStartKqpTasksResponse> {};
4847

49-
struct TEvFinishKqpTask : public TEventLocal<TEvFinishKqpTask, TKqpNodeEvents::EvFinishKqpTasks> {
50-
const ui64 TxId;
51-
const ui64 TaskId;
52-
const bool Success;
53-
const NYql::TIssues Issues;
54-
55-
TEvFinishKqpTask(ui64 txId, ui64 taskId, bool success, const NYql::TIssues& issues = {})
56-
: TxId(txId)
57-
, TaskId(taskId)
58-
, Success(success)
59-
, Issues(issues) {}
60-
};
61-
6248
struct TEvCancelKqpTasksRequest : public TEventPB<TEvCancelKqpTasksRequest,
6349
NKikimrKqp::TEvCancelKqpTasksRequest, TKqpNodeEvents::EvCancelKqpTasksRequest> {};
6450

0 commit comments

Comments
 (0)