Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion ydb/library/actors/actor_type/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ enum class EInternalActorType {
NAMESERVICE,
DNS_RESOLVER,
INTERCONNECT_PROXY_WRAPPER,
ACTOR_COROUTINE
ACTOR_COROUTINE,
};

enum class EInternalActorSystemActivity {
ACTOR_SYSTEM_SPIN,
ACTOR_SYSTEM_HARMONIZER,
ACTOR_SYSTEM_SEND,
ACTOR_SYSTEM_REGISTER,
ACTOR_SYSTEM_GET_ACTIVATION,
ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE,
ACTOR_SYSTEM_WAKE_UP,
};

}
73 changes: 73 additions & 0 deletions ydb/library/actors/core/activity_guard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include "thread_context.h"
#include "worker_context.h"

#include <ydb/library/actors/actor_type/common.h>
#include <ydb/library/actors/actor_type/index_constructor.h>
#include <ydb/library/actors/util/local_process_key.h>

namespace NActors {


template <EInternalActorSystemActivity type>
class TInternalActorTypeGuard {
public:
TInternalActorTypeGuard() {
if (Enabled && TlsThreadContext) {
NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
NextIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddElapsedCycles(NextIndex, hpnow - hpprev);
}
}

TInternalActorTypeGuard(ui32 nextIndex)
: NextIndex(nextIndex)
{
if (Enabled && TlsThreadContext) {
NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
ui32 prevIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddElapsedCycles(prevIndex, hpnow - hpprev);
}
}

TInternalActorTypeGuard(NHPTimer::STime hpnow) {
if (Enabled && TlsThreadContext) {
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
NextIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddElapsedCycles(NextIndex, hpnow - hpprev);
}
}

TInternalActorTypeGuard(NHPTimer::STime hpnow, ui32 nextIndex)
: NextIndex(nextIndex)
{
if (Enabled && TlsThreadContext) {
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
ui32 prevIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddElapsedCycles(prevIndex, hpnow - hpprev);
}
}

~TInternalActorTypeGuard() {
if (Enabled && TlsThreadContext) {
NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(NextIndex, std::memory_order_release);
TlsThreadContext->WorkerCtx->AddElapsedCycles(Index, hpnow - hpprev);
}
}


private:
static constexpr bool Enabled = false;
static ui32 Index;
ui32 NextIndex = 0;
};

template <EInternalActorSystemActivity type>
ui32 TInternalActorTypeGuard<type>::Index = TEnumProcessKey<TActorActivityTag, EInternalActorSystemActivity>::GetIndex(type);

}
2 changes: 2 additions & 0 deletions ydb/library/actors/core/actorsystem.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "defs.h"
#include "activity_guard.h"
#include "actorsystem.h"
#include "callstack.h"
#include "cpu_manager.h"
Expand Down Expand Up @@ -96,6 +97,7 @@ namespace NActors {
if (Y_UNLIKELY(!ev))
return false;

TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_SEND> activityGuard;
#ifdef USE_ACTOR_CALLSTACK
ev->Callstack.TraceIfEmpty();
#endif
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/core/executor_pool_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ namespace NActors {

TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) {
NHPTimer::STime hpstart = GetCycleCountFast();
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_REGISTER> activityGuard(hpstart);
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
ui32 at = actor->GetActivityType();
Y_DEBUG_ABORT_UNLESS(at < Stats.ActorsAliveByActivity.size());
Expand Down Expand Up @@ -236,6 +237,7 @@ namespace NActors {

TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) {
NHPTimer::STime hpstart = GetCycleCountFast();
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_REGISTER> activityGuard(hpstart);
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
ui32 at = actor->GetActivityType();
if (at >= Stats.MaxActivityType())
Expand Down
9 changes: 7 additions & 2 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ namespace NActors {
}

ui32 TBasicExecutorPool::GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingCounter) {
NHPTimer::STime hpnow = GetCycleCountFast();
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION> activityGuard(hpnow);

TWorkerId workerId = wctx.WorkerId;
Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount);

Expand All @@ -209,10 +212,9 @@ namespace NActors {
Y_ABORT_UNLESS(wctx.SharedThread);
wctx.SharedThread->UnsetWork();
}

if (Harmonizer) {
LWPROBE(TryToHarmonize, PoolId, PoolName);
Harmonizer->Harmonize(TlsThreadContext->StartOfProcessingEventTS.load(std::memory_order_relaxed));
Harmonizer->Harmonize(hpnow);
}

TAtomic x = AtomicGet(Semaphore);
Expand All @@ -232,6 +234,7 @@ namespace NActors {
}
}
} else {
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE> activityGuard;
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
Threads[workerId].SetWork();
Expand Down Expand Up @@ -711,6 +714,7 @@ namespace NActors {
}

bool TExecutorThreadCtx::WakeUp() {
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_WAKE_UP> activityGuard;
for (ui32 i = 0; i < 2; ++i) {
EThreadState state = GetState<EThreadState>();
switch (state) {
Expand Down Expand Up @@ -739,6 +743,7 @@ namespace NActors {
}

bool TSharedExecutorThreadCtx::WakeUp() {
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE> activityGuard;
i64 requestsForWakeUp = RequestsForWakeUp.fetch_add(1, std::memory_order_acq_rel);
if (requestsForWakeUp >= 0) {
return false;
Expand Down
11 changes: 7 additions & 4 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "defs.h"
#include "activity_guard.h"
#include "executor_thread.h"
#include "thread_context.h"

Expand Down Expand Up @@ -60,11 +61,13 @@ namespace NActors {

template <typename TDerived, typename TWaitState>
void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
ui64 start = GetCycleCountFast();
bool doSpin = true;
NHPTimer::STime start = GetCycleCountFast();
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_SPIN> activityGuard(start);
while (true) {
for (ui32 j = 0; doSpin && j < 12; ++j) {
if (GetCycleCountFast() >= (start + spinThresholdCycles)) {
NHPTimer::STime hpnow = GetCycleCountFast();
if (hpnow >= i64(start + spinThresholdCycles)) {
doSpin = false;
break;
}
Expand Down Expand Up @@ -100,8 +103,8 @@ namespace NActors {

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
ui32 prevActivity = TlsThreadContext->ElapsingActorActivity.exchange(Max<ui64>(), std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddElapsedCycles(prevActivity, hpnow - hpprev);
do {
if (WaitingPad.Park()) // interrupted
return true;
Expand Down
15 changes: 10 additions & 5 deletions ydb/library/actors/core/harmonizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "executor_thread.h"
#include "probes.h"

#include "activity_guard.h"
#include "actorsystem.h"
#include "executor_pool_basic.h"
#include "executor_pool_basic_feature_flags.h"
Expand Down Expand Up @@ -758,12 +759,16 @@ void THarmonizer::Harmonize(ui64 ts) {
ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);

if (PriorityOrder.empty()) {
CalculatePriorityOrder();
}
{
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_HARMONIZER> activityGuard;

if (PriorityOrder.empty()) {
CalculatePriorityOrder();
}

PullStats(ts);
HarmonizeImpl(ts);
PullStats(ts);
HarmonizeImpl(ts);
}

Lock.Release();
}
Expand Down