Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ using namespace NYql::NDqProto;
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena) {
return new NScanPrivate::TKqpScanComputeActor(executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
settings, memoryLimits, std::move(traceId), std::move(arena));
}

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
Expand Down Expand Up @@ -48,12 +49,12 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions);

IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions);

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
ApplyConfig(config);
}

void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) override
{
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
Expand All @@ -114,7 +114,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
MinMemFreeSize.store(config.GetMinMemFreeSize());
}

TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) {
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) override {
NYql::NDq::TComputeMemoryLimits memoryLimits;
memoryLimits.ChannelBufferSize = 0;
memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
Expand Down Expand Up @@ -213,7 +213,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId, args.Task,
AsyncIoFactory, runtimeSettings, memoryLimits,
std::move(args.TraceId), std::move(args.Arena));
std::move(args.TraceId), std::move(args.Arena),
std::move(args.SchedulingOptions));
TActorId result = TlsActivationContext->Register(computeActor);
info.MutableActorIds().emplace_back(result);
return result;
Expand All @@ -223,7 +224,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
}
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings);
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings,
std::move(args.SchedulingOptions));
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
TlsActivationContext->AsActorContext().Register(computeActor);
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>

#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>

#include <vector>

namespace NKikimr::NKqp {
Expand Down Expand Up @@ -124,6 +126,7 @@ struct IKqpNodeComputeActorFactory {
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
TComputeStagesWithScan* ComputesByStages = nullptr;
std::shared_ptr<IKqpNodeState> State = nullptr;
TComputeActorSchedulingOptions SchedulingOptions = {};
};

typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult;
Expand All @@ -137,4 +140,4 @@ std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfi
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup);

} // namespace NKikimr::NKqp::NComputeActor
} // namespace NKikimr::NKqp::NComputeActor
11 changes: 7 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions schedulingOptions)
: TBase(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
, ComputeCtx(settings.StatsMode)
, FederatedQuerySetup(federatedQuerySetup)
{
Expand Down Expand Up @@ -121,9 +121,12 @@ void TKqpComputeActor::DoBootstrap() {

ContinueExecute();
Become(&TKqpComputeActor::StateFunc);

TBase::DoBoostrap();
}

STFUNC(TKqpComputeActor::StateFunc) {
CA_LOG_D("CA StateFunc " << ev->GetTypeRewrite());
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
Expand Down Expand Up @@ -278,10 +281,10 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings)
const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions)
{
return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings);
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions));
}

} // namespace NKqp
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/runtime/kqp_compute.h>
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
#include <ydb/core/sys_view/scan.h>
#include <ydb/library/yverify_stream/yverify_stream.h>

#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>


namespace NKikimr {
namespace NKqp {

class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
using TBase = TDqSyncComputeActorBase<TKqpComputeActor>;
class TKqpComputeActor : public TSchedulableComputeActorBase<TKqpComputeActor> {
using TBase = TSchedulableComputeActorBase<TKqpComputeActor>;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand All @@ -29,7 +28,8 @@ class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
TComputeActorSchedulingOptions);

void DoBootstrap();

Expand Down Expand Up @@ -68,7 +68,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
TComputeActorSchedulingOptions);

} // namespace NKqp
} // namespace NKikimr
6 changes: 4 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);

} // anonymous namespace

TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
: TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
, ComputeCtx(settings.StatsMode)
, LockTxId(lockTxId)
Expand Down Expand Up @@ -251,6 +251,8 @@ void TKqpScanComputeActor::DoBootstrap() {
ScanData->TaskId = GetTask().GetId();
ScanData->TableReader = CreateKqpTableReader(*ScanData);
Become(&TKqpScanComputeActor::StateFunc);

TBase::DoBoostrap();
}

}
8 changes: 4 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
#include "kqp_scan_events.h"

#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>

namespace NKikimr::NKqp::NScanPrivate {

class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor> {
class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeActor> {
private:
using TBase = NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor>;
using TBase = TSchedulableComputeActorBase<TKqpScanComputeActor>;

NMiniKQL::TKqpScanComputeContext ComputeCtx;
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
Expand Down Expand Up @@ -65,7 +65,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
}

TKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <ydb/core/sys_view/service/sysview_service.h>

#include <ydb/library/actors/core/log.h>

#include <util/generic/size_literals.h>

#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
Expand Down Expand Up @@ -829,6 +828,13 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
"PhyTx/ScanTxTotalTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1));

FullScansExecuted = KqpGroup->GetCounter("FullScans", true);

SchedulerThrottled = KqpGroup->GetCounter("NodeScheduler/ThrottledUs", true);
SchedulerCapacity = KqpGroup->GetCounter("NodeScheduler/Capacity");
ComputeActorExecutions = KqpGroup->GetHistogram("NodeScheduler/BatchUs", NMonitoring::ExponentialHistogram(20, 2, 1));
ComputeActorDelays = KqpGroup->GetHistogram("NodeScheduler/Delays", NMonitoring::ExponentialHistogram(20, 2, 1));
ThrottledActorsSpuriousActivations = KqpGroup->GetCounter("NodeScheduler/SpuriousActivations", true);
SchedulerDelays = KqpGroup->GetHistogram("NodeScheduler/Delay", NMonitoring::ExponentialHistogram(20, 2, 1));
}

::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,14 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;

// Scheduler signals
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled;
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity;
NMonitoring::THistogramPtr ComputeActorExecutions;
NMonitoring::THistogramPtr ComputeActorDelays;
::NMonitoring::TDynamicCounters::TCounterPtr ThrottledActorsSpuriousActivations;
NMonitoring::THistogramPtr SchedulerDelays;

// Sequences counters
::NMonitoring::TDynamicCounters::TCounterPtr SequencerActorsCount;
::NMonitoring::TDynamicCounters::TCounterPtr SequencerErrors;
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskRe
ret.HeavyProgram = opts.GetHasMapJoin();
}

bool LimitCPU(TIntrusivePtr<TUserRequestContext> ctx) {
return ctx->PoolId && ctx->PoolConfig.has_value() && ctx->PoolConfig->TotalCpuLimitPercentPerNode > 0;
}

}

bool TKqpPlanner::UseMockEmptyPlanner = false;
Expand Down Expand Up @@ -101,6 +105,10 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
LOG_E("Database not set, use " << Database);
}
}

if (LimitCPU(UserRequestContext)) {
AllowSinglePartitionOpt = false;
}
}

// ResourcesSnapshot, ResourceEstimations
Expand Down Expand Up @@ -223,6 +231,13 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.SetSerializedGUCSettings(SerializedGUCSettings);
}

request.SetSchedulerGroup(UserRequestContext->PoolId);
request.SetDatabase(Database);
if (UserRequestContext->PoolConfig.has_value()) {
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
request.SetMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0);
}

return result;
}

Expand Down
Loading