Skip to content

Add shared executor thread, KIKIMR-18440 #903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/testlib/actor_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ TActorSystemStub::TActorSystemStub() {
THolder<NActors::TActorSystemSetup> setup(new NActors::TActorSystemSetup);
System.Reset(new NActors::TActorSystem(setup));
Mailbox.Reset(new NActors::TMailboxHeader(NActors::TMailboxType::Simple));
ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, nullptr, nullptr, "thread"));
ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, nullptr, "thread"));
Ctx.Reset(new NActors::TActorContext(*Mailbox, *ExecutorThread, GetCycleCountFast(), SelfID));
PrevCtx = NActors::TlsActivationContext;
NActors::TlsActivationContext = Ctx.Get();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/testlib/actors/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ namespace NActors {
node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
node->MailboxTable.Reset(new TMailboxTable());
node->ActorSystem = MakeActorSystem(nodeIndex, node);
node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), nullptr, "TestExecutor"));
node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
} else {
node->AppData0.reset(new NKikimr::TAppData(0, 1, 2, 3, { }, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr));
node->ActorSystem = MakeActorSystem(nodeIndex, node);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/util/testactorsys.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class TTestActorSystem {
info.ActorSystem = std::make_unique<TActorSystem>(setup, &AppData, LoggerSettings_);
info.MailboxTable = std::make_unique<TMailboxTable>();
info.ExecutorThread = std::make_unique<TExecutorThread>(0, 0, info.ActorSystem.get(), pool,
info.MailboxTable.get(), nullptr, "TestExecutor");
info.MailboxTable.get(), "TestExecutor");
}

void StartNode(ui32 nodeId) {
Expand Down
16 changes: 8 additions & 8 deletions ydb/library/actors/core/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace NActors {
class TMailboxTable;
struct TMailboxHeader;

class TExecutorThread;
class TGenericExecutorThread;
class IActor;
class ISchedulerCookie;
class IExecutorPool;
Expand Down Expand Up @@ -45,11 +45,11 @@ namespace NActors {
struct TActivationContext {
public:
TMailboxHeader& Mailbox;
TExecutorThread& ExecutorThread;
TGenericExecutorThread& ExecutorThread;
const NHPTimer::STime EventStart;

protected:
explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart)
explicit TActivationContext(TMailboxHeader& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart)
: Mailbox(mailbox)
, ExecutorThread(executorThread)
, EventStart(eventStart)
Expand Down Expand Up @@ -133,7 +133,7 @@ namespace NActors {
struct TActorContext: public TActivationContext {
const TActorId SelfID;
using TEventFlags = IEventHandle::TEventFlags;
explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
explicit TActorContext(TMailboxHeader& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
: TActivationContext(mailbox, executorThread, eventStart)
, SelfID(selfID)
{
Expand Down Expand Up @@ -350,7 +350,7 @@ namespace NActors {
TMonotonic LastReceiveTimestamp;
size_t StuckIndex = Max<size_t>();
friend class TExecutorPoolBaseMailboxed;
friend class TExecutorThread;
friend class TGenericExecutorThread;

IActor(const ui32 activityType)
: SelfActorId(TActorId())
Expand Down Expand Up @@ -836,7 +836,7 @@ namespace NActors {


template <ESendingType SendingType>
bool TExecutorThread::Send(TAutoPtr<IEventHandle> ev) {
bool TGenericExecutorThread::Send(TAutoPtr<IEventHandle> ev) {
#ifdef USE_ACTOR_CALLSTACK
do {
(ev)->Callstack = TCallstack::GetTlsCallstack();
Expand All @@ -848,7 +848,7 @@ namespace NActors {
}

template <ESendingType SendingType>
TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId,
TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId,
TActorId parentId)
{
if (!parentId) {
Expand All @@ -871,7 +871,7 @@ namespace NActors {
}

template <ESendingType SendingType>
TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
if (!parentId) {
parentId = CurrentRecipient;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/core/actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;

Y_UNIT_TEST(WithSharedExecutors) {
return;
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, 1, 0);
TActorBenchmark::AddBasicPool(setup, 2, 1, 1);
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/actors/core/executor_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace NActors {
TMutex StuckObserverMutex;
std::vector<IActor*> Actors;
mutable std::vector<std::tuple<ui32, double>> DeadActorsUsage;
friend class TExecutorThread;
friend class TGenericExecutorThread;
friend class TSharedExecutorThread;
void RecalculateStuckActors(TExecutorThreadStats& stats) const;
#endif
TAtomic RegisterRevolvingCounter = 0;
Expand Down
94 changes: 49 additions & 45 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ namespace NActors {
}

if (workerId >= 0) {
Threads[workerId].ExchangeState(EThreadState::None);
Threads[workerId].UnsetWork();
}

TAtomic x = AtomicGet(Semaphore);
Expand All @@ -192,7 +192,7 @@ namespace NActors {
} else {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
Threads[workerId].ExchangeState(EThreadState::Work);
Threads[workerId].SetWork();
}
AtomicDecrement(Semaphore);
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
Expand Down Expand Up @@ -244,36 +244,14 @@ namespace NActors {

inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
for (i16 i = 0;;) {
TExecutorThreadCtx& threadCtx = Threads[i];
EThreadState state = threadCtx.GetState<EThreadState>();
switch (state) {
case EThreadState::None:
case EThreadState::Work:
if (++i >= MaxThreadCount - SharedExecutorsCount) {
i = 0;
}
break;
case EThreadState::Spin:
case EThreadState::Sleep:
if (threadCtx.ReplaceState<EThreadState>(state, EThreadState::None)) {
if (state == EThreadState::Sleep) {
ui64 beforeUnpark = GetCycleCountFast();
threadCtx.StartWakingTs = beforeUnpark;
if (TlsThreadContext && TlsThreadContext->WaitingStats) {
threadCtx.WaitingPad.Unpark();
TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
} else {
threadCtx.WaitingPad.Unpark();
}
}
if (i >= currentThreadCount) {
AtomicIncrement(WrongWakenedThreadCount);
}
return;
}
break;
default:
Y_ABORT();
if (Threads[i].WakeUp()) {
if (i >= currentThreadCount) {
AtomicIncrement(WrongWakenedThreadCount);
}
return;
}
if (++i >= MaxThreadCount - SharedExecutorsCount) {
i = 0;
}
}
}
Expand Down Expand Up @@ -400,19 +378,7 @@ namespace NActors {
actorSystem,
this,
MailboxTable.Get(),
&Threads[i],
PoolName,
TimePerMailbox,
EventsPerMailbox));
} else {
Threads[i].Thread.Reset(
new TExecutorThread(
i,
actorSystem,
&Threads[i],
0,
PoolName,
SoftProcessingDurationTs,
TimePerMailbox,
EventsPerMailbox));
}
Expand All @@ -438,7 +404,7 @@ namespace NActors {
StopFlag.store(true, std::memory_order_release);
for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Thread->StopFlag.store(true, std::memory_order_release);
Threads[i].WaitingPad.Interrupt();
Threads[i].Interrupt();
}
}

Expand Down Expand Up @@ -611,4 +577,42 @@ namespace NActors {
return Sleep(stopFlag);
}

bool TSharedExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin);
Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state));
if (spinThresholdCycles > 0) {
// spin configured period
Spin(spinThresholdCycles, stopFlag);
}
return Sleep(stopFlag);
}

bool TExecutorThreadCtx::WakeUp() {
for (ui32 i = 0; i < 2; ++i) {
EThreadState state = GetState<EThreadState>();
switch (state) {
case EThreadState::None:
case EThreadState::Work:
return false;
case EThreadState::Spin:
case EThreadState::Sleep:
if (ReplaceState<EThreadState>(state, EThreadState::None)) {
if (state == EThreadState::Sleep) {
ui64 beforeUnpark = GetCycleCountFast();
StartWakingTs = beforeUnpark;
WaitingPad.Unpark();
if (TlsThreadContext && TlsThreadContext->WaitingStats) {
TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
}
}
return true;
}
break;
default:
Y_ABORT();
}
}
return false;
}

}
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ namespace NActors {
ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());

for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), &Threads[i], PoolName));
Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), PoolName));
}

*scheduleReaders = &ScheduleQueue->Reader;
Expand Down
Loading