Skip to content

YQ kqprun pass actor system config #7693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions ydb/core/driver_lib/run/config_helpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include "config_helpers.h"

#include <ydb/library/actors/util/affinity.h>


namespace NKikimr {

namespace NActorSystemConfigHelpers {

namespace {

template <class TConfig>
static TCpuMask ParseAffinity(const TConfig& cfg) {
TCpuMask result;
if (cfg.GetCpuList()) {
result = TCpuMask(cfg.GetCpuList());
} else if (cfg.GetX().size() > 0) {
result = TCpuMask(cfg.GetX().data(), cfg.GetX().size());
} else { // use all processors
TAffinity available;
available.Current();
result = available;
}
if (cfg.GetExcludeCpuList()) {
result = result - TCpuMask(cfg.GetExcludeCpuList());
}
return result;
}

TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) {
return systemConfig.HasSelfPingInterval()
? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval())
: TDuration::MilliSeconds(10);
}

NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) {
switch (profile) {
case NKikimrConfig::TActorSystemConfig::DEFAULT:
return NActors::EASProfile::Default;
case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION:
return NActors::EASProfile::LowCpuConsumption;
case NKikimrConfig::TActorSystemConfig::LOW_LATENCY:
return NActors::EASProfile::LowLatency;
}
}

} // anonymous namespace

void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, NMonitoring::TDynamicCounterPtr counters) {
switch (poolConfig.GetType()) {
case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: {
NActors::TBasicExecutorPoolConfig basic;
basic.PoolId = poolId;
basic.PoolName = poolConfig.GetName();
if (poolConfig.HasMaxAvgPingDeviation() && counters) {
auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName);
auto &poolInfo = cpuManager.PingInfoByPool[poolId];
poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false);
poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false);
TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation());
poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds();
}
basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads());
basic.SpinThreshold = poolConfig.GetSpinThreshold();
basic.Affinity = ParseAffinity(poolConfig.GetAffinity());
basic.RealtimePriority = poolConfig.GetRealtimePriority();
basic.HasSharedThread = poolConfig.GetHasSharedThread();
if (poolConfig.HasTimePerMailboxMicroSecs()) {
basic.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs());
} else if (systemConfig.HasTimePerMailboxMicroSecs()) {
basic.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs());
}
if (poolConfig.HasEventsPerMailbox()) {
basic.EventsPerMailbox = poolConfig.GetEventsPerMailbox();
} else if (systemConfig.HasEventsPerMailbox()) {
basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox();
}
basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile());
Y_ABORT_UNLESS(basic.EventsPerMailbox != 0);
basic.MinThreadCount = poolConfig.GetMinThreads();
basic.MaxThreadCount = poolConfig.GetMaxThreads();
basic.DefaultThreadCount = poolConfig.GetThreads();
basic.Priority = poolConfig.GetPriority();
cpuManager.Basic.emplace_back(std::move(basic));
break;
}

case NKikimrConfig::TActorSystemConfig::TExecutor::IO: {
NActors::TIOExecutorPoolConfig io;
io.PoolId = poolId;
io.PoolName = poolConfig.GetName();
io.Threads = poolConfig.GetThreads();
io.Affinity = ParseAffinity(poolConfig.GetAffinity());
cpuManager.IO.emplace_back(std::move(io));
break;
}

default:
Y_ABORT();
}
}

NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler& config) {
const ui64 resolution = config.HasResolution() ? config.GetResolution() : 1024;
Y_DEBUG_ABORT_UNLESS((resolution & (resolution - 1)) == 0); // resolution must be power of 2
const ui64 spinThreshold = config.HasSpinThreshold() ? config.GetSpinThreshold() : 0;
const ui64 progressThreshold = config.HasProgressThreshold() ? config.GetProgressThreshold() : 10000;
const bool useSchedulerActor = config.HasUseSchedulerActor() ? config.GetUseSchedulerActor() : false;

return NActors::TSchedulerConfig(resolution, spinThreshold, progressThreshold, useSchedulerActor);
}

} // namespace NActorSystemConfigHelpers

} // namespace NKikimr
18 changes: 18 additions & 0 deletions ydb/core/driver_lib/run/config_helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <ydb/core/protos/config.pb.h>

#include <ydb/library/actors/core/config.h>


namespace NKikimr {

namespace NActorSystemConfigHelpers {

void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, NMonitoring::TDynamicCounterPtr counters);

NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler& config);

} // namespace NActorSystemConfigHelpers

} // namespace NKikimr
101 changes: 4 additions & 97 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "auto_config_initializer.h"
#include "config_helpers.h"
#include "config.h"
#include "kikimr_services_initializers.h"
#include "service_initializer.h"
Expand Down Expand Up @@ -277,42 +278,6 @@ IKikimrServicesInitializer::IKikimrServicesInitializer(const TKikimrRunConfig& r

// TBasicServicesInitializer

template <class TConfig>
static TCpuMask ParseAffinity(const TConfig& cfg) {
TCpuMask result;
if (cfg.GetCpuList()) {
result = TCpuMask(cfg.GetCpuList());
} else if (cfg.GetX().size() > 0) {
result = TCpuMask(cfg.GetX().data(), cfg.GetX().size());
} else { // use all processors
TAffinity available;
available.Current();
result = available;
}
if (cfg.GetExcludeCpuList()) {
result = result - TCpuMask(cfg.GetExcludeCpuList());
}
return result;
}

TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) {
return systemConfig.HasSelfPingInterval()
? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval())
: TDuration::MilliSeconds(10);
}


NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) {
switch (profile) {
case NKikimrConfig::TActorSystemConfig::DEFAULT:
return NActors::EASProfile::Default;
case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION:
return NActors::EASProfile::LowCpuConsumption;
case NKikimrConfig::TActorSystemConfig::LOW_LATENCY:
return NActors::EASProfile::LowLatency;
}
}

void AddExecutorPool(
TCpuManagerConfig& cpuManager,
const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig,
Expand All @@ -321,55 +286,7 @@ void AddExecutorPool(
const NKikimr::TAppData* appData)
{
const auto counters = GetServiceCounters(appData->Counters, "utils");
switch (poolConfig.GetType()) {
case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: {
TBasicExecutorPoolConfig basic;
basic.PoolId = poolId;
basic.PoolName = poolConfig.GetName();
if (poolConfig.HasMaxAvgPingDeviation()) {
auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName);
auto &poolInfo = cpuManager.PingInfoByPool[poolId];
poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false);
poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false);
TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation());
poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds();
}
basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads());
basic.SpinThreshold = poolConfig.GetSpinThreshold();
basic.Affinity = ParseAffinity(poolConfig.GetAffinity());
basic.RealtimePriority = poolConfig.GetRealtimePriority();
basic.HasSharedThread = poolConfig.GetHasSharedThread();
if (poolConfig.HasTimePerMailboxMicroSecs()) {
basic.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs());
} else if (systemConfig.HasTimePerMailboxMicroSecs()) {
basic.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs());
}
if (poolConfig.HasEventsPerMailbox()) {
basic.EventsPerMailbox = poolConfig.GetEventsPerMailbox();
} else if (systemConfig.HasEventsPerMailbox()) {
basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox();
}
basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile());
Y_ABORT_UNLESS(basic.EventsPerMailbox != 0);
basic.MinThreadCount = poolConfig.GetMinThreads();
basic.MaxThreadCount = poolConfig.GetMaxThreads();
basic.DefaultThreadCount = poolConfig.GetThreads();
basic.Priority = poolConfig.GetPriority();
cpuManager.Basic.emplace_back(std::move(basic));
break;
}
case NKikimrConfig::TActorSystemConfig::TExecutor::IO: {
TIOExecutorPoolConfig io;
io.PoolId = poolId;
io.PoolName = poolConfig.GetName();
io.Threads = poolConfig.GetThreads();
io.Affinity = ParseAffinity(poolConfig.GetAffinity());
cpuManager.IO.emplace_back(std::move(io));
break;
}
default:
Y_ABORT();
}
NActorSystemConfigHelpers::AddExecutorPool(cpuManager, poolConfig, systemConfig, poolId, counters);
}

static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config,
Expand All @@ -383,16 +300,6 @@ static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSyste
return cpuManager;
}

static TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler &config) {
const ui64 resolution = config.HasResolution() ? config.GetResolution() : 1024;
Y_DEBUG_ABORT_UNLESS((resolution & (resolution - 1)) == 0); // resolution must be power of 2
const ui64 spinThreshold = config.HasSpinThreshold() ? config.GetSpinThreshold() : 0;
const ui64 progressThreshold = config.HasProgressThreshold() ? config.GetProgressThreshold() : 10000;
const bool useSchedulerActor = config.HasUseSchedulerActor() ? config.GetUseSchedulerActor() : false;

return TSchedulerConfig(resolution, spinThreshold, progressThreshold, useSchedulerActor);
}

static bool IsServiceInitialized(NActors::TActorSystemSetup* setup, TActorId service)
{
for (auto &pr : setup->LocalServices)
Expand Down Expand Up @@ -601,7 +508,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
setup->CpuManager = CreateCpuManagerConfig(systemConfig, appData);
setup->MonitorStuckActors = systemConfig.GetMonitorStuckActors();

auto schedulerConfig = CreateSchedulerConfig(systemConfig.GetScheduler());
auto schedulerConfig = NActorSystemConfigHelpers::CreateSchedulerConfig(systemConfig.GetScheduler());
schedulerConfig.MonCounters = GetServiceCounters(counters, "utils");
setup->Scheduler.Reset(CreateSchedulerThread(schedulerConfig));
setup->LocalServices.emplace_back(MakeIoDispatcherActorId(), TActorSetupCmd(CreateIoDispatcherActor(
Expand Down Expand Up @@ -1265,7 +1172,7 @@ void TSchedulerActorInitializer::InitializeServices(
NActors::TActorSystemSetup* setup,
const NKikimr::TAppData* appData) {
auto& systemConfig = Config.GetActorSystemConfig();
NActors::IActor *schedulerActor = CreateSchedulerActor(CreateSchedulerConfig(systemConfig.GetScheduler()));
NActors::IActor *schedulerActor = CreateSchedulerActor(NActorSystemConfigHelpers::CreateSchedulerConfig(systemConfig.GetScheduler()));
if (schedulerActor) {
NActors::TActorSetupCmd schedulerActorCmd(schedulerActor, NActors::TMailboxType::ReadAsFilled, appData->SystemPoolId);
setup->LocalServices.emplace_back(MakeSchedulerActorId(), std::move(schedulerActorCmd));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ SRCS(
auto_config_initializer.cpp
config.cpp
config.h
config_helpers.cpp
config_parser.cpp
config_parser.h
driver.h
Expand Down
20 changes: 19 additions & 1 deletion ydb/core/testlib/actors/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <ydb/library/actors/core/executor_pool_basic.h>
#include <ydb/library/actors/core/executor_pool_io.h>
#include <ydb/library/actors/core/scheduler_basic.h>
#include <ydb/library/actors/interconnect/interconnect_impl.h>

#include <ydb/core/protos/datashard_config.pb.h>
Expand Down Expand Up @@ -49,6 +50,11 @@ namespace NActors {
NeedStatsCollectors = true;
}

void TTestActorRuntime::SetupActorSystemConfig(const TActorSystemSetupConfig& config, const TActorSystemPools& pools) {
ActorSystemSetupConfig = config;
ActorSystemPools = pools;
}

TTestActorRuntime::TTestActorRuntime(THeSingleSystemEnv d)
: TPortManager(false)
, TTestActorRuntimeBase{d}
Expand Down Expand Up @@ -131,7 +137,7 @@ namespace NActors {
node->ActorSystem = MakeActorSystem(nodeIndex, node);
node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
} else {
node->AppData0.reset(new NKikimr::TAppData(0, 1, 2, 3, { }, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr));
node->AppData0.reset(new NKikimr::TAppData(ActorSystemPools.SystemPoolId, ActorSystemPools.UserPoolId, ActorSystemPools.IOPoolId, ActorSystemPools.BatchPoolId, ActorSystemPools.ServicePools, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr));
node->ActorSystem = MakeActorSystem(nodeIndex, node);
}
node->LogSettings->MessagePrefix = " node " + ToString(nodeId);
Expand Down Expand Up @@ -219,6 +225,18 @@ namespace NActors {
}

void TTestActorRuntime::InitActorSystemSetup(TActorSystemSetup& setup, TNodeDataBase* node) {
if (ActorSystemSetupConfig) {
setup.Executors.Reset();
setup.ExecutorsCount = 0;

setup.CpuManager = ActorSystemSetupConfig->CpuManagerConfig;
setup.MonitorStuckActors = ActorSystemSetupConfig->MonitorStuckActors;

auto schedulerConfig = ActorSystemSetupConfig->SchedulerConfig;
schedulerConfig.MonCounters = NKikimr::GetServiceCounters(node->DynamicCounters, "utils");
setup.Scheduler.Reset(CreateSchedulerThread(schedulerConfig));
}

if (NeedMonitoring && NeedStatsCollectors) {
NActors::IActor* statsCollector = NKikimr::CreateStatsCollector(1, setup, node->DynamicCounters);
setup.LocalServices.push_back({
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/testlib/actors/test_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ namespace NActors {
std::vector<TIntrusivePtr<NKikimr::TControlBoard>> Icb;
};

struct TActorSystemSetupConfig {
TCpuManagerConfig CpuManagerConfig;
TSchedulerConfig SchedulerConfig;
bool MonitorStuckActors = false;
};

struct TActorSystemPools {
ui32 SystemPoolId = 0;
ui32 UserPoolId = 1;
ui32 IOPoolId = 2;
ui32 BatchPoolId = 3;
TMap<TString, ui32> ServicePools = {};
};

TTestActorRuntime(THeSingleSystemEnv d);
TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount);
Expand All @@ -63,6 +77,7 @@ namespace NActors {
void AddAppDataInit(std::function<void(ui32, NKikimr::TAppData&)> callback);
virtual void Initialize(TEgg);
void SetupStatsCollectors();
void SetupActorSystemConfig(const TActorSystemSetupConfig& config, const TActorSystemPools& pools);

ui16 GetMonPort(ui32 nodeIndex = 0) const;

Expand Down Expand Up @@ -125,5 +140,7 @@ namespace NActors {
TActorId SleepEdgeActor;
TVector<std::function<void(ui32, NKikimr::TAppData&)>> AppDataInit_;
bool NeedStatsCollectors = false;
std::optional<TActorSystemSetupConfig> ActorSystemSetupConfig;
TActorSystemPools ActorSystemPools;
};
} // namespace NActors
Loading
Loading