Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ydb/library/actors/core/actor_benchmark_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ struct TActorBenchmark {
, InFlight(params.InFlight)
{}

~TSendReceiveActor() {
}

void StoreCounters(std::vector<NThreading::TPadded<std::atomic<ui64>>> &dest) {
for (ui32 idx = 0; idx < dest.size(); ++idx) {
dest[idx].store(SharedCounters->Counters[idx]);
Expand Down Expand Up @@ -258,14 +261,14 @@ struct TActorBenchmark {
ui32 ReceiveTurn = 0;
};

static void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent, i16 sharedExecutorsCount) {
static void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent, bool hasSharedThread) {
TBasicExecutorPoolConfig basic;
basic.PoolId = setup->GetExecutorsCount();
basic.PoolName = TStringBuilder() << "b" << basic.PoolId;
basic.Threads = threads;
basic.SpinThreshold = TSettings::DefaultSpinThreshold;
basic.TimePerMailbox = TDuration::Hours(1);
basic.SharedExecutorsCount = sharedExecutorsCount;
basic.HasSharedThread = hasSharedThread;
basic.SoftProcessingDurationTs = Us2Ts(100);
if (activateEveryEvent) {
basic.EventsPerMailbox = 1;
Expand Down
99 changes: 90 additions & 9 deletions ydb/library/actors/core/actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
using TSettings = TActorBenchmark::TSettings;
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;

Y_UNIT_TEST(WithSharedExecutors) {
return;
Y_UNIT_TEST(WithOnlyOneSharedExecutors) {
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, 1, 0);
TActorBenchmark::AddBasicPool(setup, 2, 1, 1);
TActorBenchmark::AddBasicPool(setup, 1, 1, true);

TActorSystem actorSystem(setup);
actorSystem.Start();
Expand All @@ -37,12 +35,51 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
TAtomic actorsAlive = 0;
THPTimer Timer;

ui64 eventsPerPair = TSettings::TotalEventsAmountPerThread * 4 / 60;
ui64 eventsPerPair = 1000;

Timer.Reset();
for (ui32 i = 0; i < 50; ++i) {
ui32 followerPoolId = 0;
ui32 leaderPoolId = 0;
ui32 followerPoolId = 0;
ui32 leaderPoolId = 0;
TActorId followerId = actorSystem.Register(
new TActorBenchmark::TSendReceiveActor(
TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true}
),
TMailboxType::HTSwap,
followerPoolId
);
THolder<IActor> leader{
new TTestEndDecorator(THolder(new TActorBenchmark::TSendReceiveActor(
TSendReceiveActorParams{.OwnEvents=eventsPerPair / 2, .Receivers={followerId}, .Allocation=true}
)),
&pad,
&actorsAlive)
};
actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);

pad.Park();
auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4);
actorSystem.Stop();

Cerr << "Completed " << 1e9 * elapsedTime << Endl;
}

Y_UNIT_TEST(WithOnlyOneSharedAndOneCommonExecutors) {
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, true, true);

TActorSystem actorSystem(setup);
actorSystem.Start();

TThreadParkPad pad;
TAtomic actorsAlive = 0;
THPTimer Timer;

ui64 eventsPerPair = 1000;

Timer.Reset();
ui32 followerPoolId = 0;
ui32 leaderPoolId = 0;
for (ui32 idx = 0; idx < 50; ++idx) {
TActorId followerId = actorSystem.Register(
new TActorBenchmark::TSendReceiveActor(
TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true}
Expand All @@ -59,6 +96,50 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
};
actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
}

pad.Park();
auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4);
actorSystem.Stop();

Cerr << "Completed " << 1e9 * elapsedTime << Endl;
}

Y_UNIT_TEST(WithSharedExecutors) {
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, 1, false);
TActorBenchmark::AddBasicPool(setup, 2, 1, true);

TActorSystem actorSystem(setup);
actorSystem.Start();

TThreadParkPad pad;
TAtomic actorsAlive = 0;
THPTimer Timer;

ui64 eventsPerPair = TSettings::TotalEventsAmountPerThread * 4 / 60;

Timer.Reset();
for (ui32 i = 0; i < 50; ++i) {
ui32 followerPoolId = 0;
ui32 leaderPoolId = 0;
TActorId followerId = actorSystem.Register(
new TActorBenchmark::TSendReceiveActor(
TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true}
),
TMailboxType::HTSwap,
followerPoolId
);
THolder<IActor> leader{
new TTestEndDecorator(
THolder(new TActorBenchmark::TSendReceiveActor(
TSendReceiveActorParams{.OwnEvents=eventsPerPair / 2, .Receivers={followerId}, .Allocation=true}
)),
&pad,
&actorsAlive
)
};
actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
}
for (ui32 i = 0; i < 10; ++i) {
ui32 followerPoolId = 1;
ui32 leaderPoolId = 1;
Expand All @@ -82,7 +163,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}

pad.Park();
auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4);
auto elapsedTime = Timer.Passed() / (4 * TSettings::TotalEventsAmountPerThread);
actorSystem.Stop();

Cerr << "Completed " << 1e9 * elapsedTime << Endl;
Expand Down
40 changes: 38 additions & 2 deletions ydb/library/actors/core/cpu_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,15 @@ namespace NActors {
}

void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) {
NSchedulerQueue::TReader* readers;
ui32 readersCount = 0;
if (Shared) {
Shared->Prepare(actorSystem, &readers, &readersCount);
for (ui32 i = 0; i != readersCount; ++i, ++readers) {
scheduleReaders.push_back(readers);
}
}
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
NSchedulerQueue::TReader* readers;
ui32 readersCount = 0;
Executors[excIdx]->Prepare(actorSystem, &readers, &readersCount);
for (ui32 i = 0; i != readersCount; ++i, ++readers) {
Expand All @@ -58,6 +65,9 @@ namespace NActors {
}

void TCpuManager::Start() {
if (Shared) {
Shared->Start();
}
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->Start();
}
Expand All @@ -67,6 +77,9 @@ namespace NActors {
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->PrepareStop();
}
if (Shared) {
Shared->PrepareStop();
}
}

void TCpuManager::Shutdown() {
Expand All @@ -81,6 +94,10 @@ namespace NActors {
}
}
}
if (Shared) {
Shared->Shutdown();
Shared->Cleanup();
}
}

void TCpuManager::Cleanup() {
Expand All @@ -93,13 +110,32 @@ namespace NActors {
}
}
}
if (Shared) {
Shared->Cleanup();
}
Executors.Destroy();
if (Shared) {
Shared.reset();
}
}

IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
return new TBasicExecutorPool(cfg, Harmonizer.get());
if (cfg.HasSharedThread) {
cfg.Threads -= 1;
if (cfg.MaxThreadCount) {
cfg.MaxThreadCount -= 1;
}
auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get());
if (pool) {
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
}
return pool;
} else {
return new TBasicExecutorPool(cfg, Harmonizer.get());
}
}
}
for (TIOExecutorPoolConfig& cfg : Config.IO) {
Expand Down
Loading