Skip to content

Commit cf6914e

Browse files
authored
Merge 00bdff1 into 8bb48d2
2 parents 8bb48d2 + 00bdff1 commit cf6914e

File tree

7 files changed

+113
-12
lines changed

7 files changed

+113
-12
lines changed

ydb/library/actors/actor_type/common.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,17 @@ enum class EInternalActorType {
2929
NAMESERVICE,
3030
DNS_RESOLVER,
3131
INTERCONNECT_PROXY_WRAPPER,
32-
ACTOR_COROUTINE
32+
ACTOR_COROUTINE,
3333
};
34+
35+
enum class EInternalActorSystemActivity {
36+
ACTOR_SYSTEM_SPIN,
37+
ACTOR_SYSTEM_HARMONIZER,
38+
ACTOR_SYSTEM_SEND,
39+
ACTOR_SYSTEM_REGISTER,
40+
ACTOR_SYSTEM_GET_ACTIVATION,
41+
ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE,
42+
ACTOR_SYSTEM_WAKE_UP,
43+
};
44+
3445
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#pragma once
2+
3+
#include "thread_context.h"
4+
#include "worker_context.h"
5+
6+
#include <ydb/library/actors/actor_type/common.h>
7+
#include <ydb/library/actors/actor_type/index_constructor.h>
8+
#include <ydb/library/actors/util/local_process_key.h>
9+
10+
namespace NActors {
11+
12+
13+
template <EInternalActorSystemActivity type>
14+
class TInternalActorTypeGuard {
15+
public:
16+
TInternalActorTypeGuard() {
17+
if (Enabled && TlsThreadContext) {
18+
NHPTimer::STime hpnow = GetCycleCountFast();
19+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
20+
NextIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
21+
TlsThreadContext->WorkerCtx->AddElapsedCycles(NextIndex, hpnow - hpprev);
22+
}
23+
}
24+
25+
TInternalActorTypeGuard(ui32 nextIndex)
26+
: NextIndex(nextIndex)
27+
{
28+
if (Enabled && TlsThreadContext) {
29+
NHPTimer::STime hpnow = GetCycleCountFast();
30+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
31+
ui32 prevIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
32+
TlsThreadContext->WorkerCtx->AddElapsedCycles(prevIndex, hpnow - hpprev);
33+
}
34+
}
35+
36+
TInternalActorTypeGuard(NHPTimer::STime hpnow) {
37+
if (Enabled && TlsThreadContext) {
38+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
39+
NextIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
40+
TlsThreadContext->WorkerCtx->AddElapsedCycles(NextIndex, hpnow - hpprev);
41+
}
42+
}
43+
44+
TInternalActorTypeGuard(NHPTimer::STime hpnow, ui32 nextIndex)
45+
: NextIndex(nextIndex)
46+
{
47+
if (Enabled && TlsThreadContext) {
48+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
49+
ui32 prevIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
50+
TlsThreadContext->WorkerCtx->AddElapsedCycles(prevIndex, hpnow - hpprev);
51+
}
52+
}
53+
54+
~TInternalActorTypeGuard() {
55+
if (Enabled && TlsThreadContext) {
56+
NHPTimer::STime hpnow = GetCycleCountFast();
57+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
58+
TlsThreadContext->ElapsingActorActivity.store(NextIndex, std::memory_order_release);
59+
TlsThreadContext->WorkerCtx->AddElapsedCycles(Index, hpnow - hpprev);
60+
}
61+
}
62+
63+
64+
private:
65+
static constexpr bool Enabled = false;
66+
static ui32 Index;
67+
ui32 NextIndex = 0;
68+
};
69+
70+
template <EInternalActorSystemActivity type>
71+
ui32 TInternalActorTypeGuard<type>::Index = TEnumProcessKey<TActorActivityTag, EInternalActorSystemActivity>::GetIndex(type);
72+
73+
}

ydb/library/actors/core/actorsystem.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "defs.h"
2+
#include "activity_guard.h"
23
#include "actorsystem.h"
34
#include "callstack.h"
45
#include "cpu_manager.h"
@@ -96,6 +97,7 @@ namespace NActors {
9697
if (Y_UNLIKELY(!ev))
9798
return false;
9899

100+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_SEND> activityGuard;
99101
#ifdef USE_ACTOR_CALLSTACK
100102
ev->Callstack.TraceIfEmpty();
101103
#endif

ydb/library/actors/core/executor_pool_base.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ namespace NActors {
149149

150150
TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) {
151151
NHPTimer::STime hpstart = GetCycleCountFast();
152+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_REGISTER> activityGuard(hpstart);
152153
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
153154
ui32 at = actor->GetActivityType();
154155
Y_DEBUG_ABORT_UNLESS(at < Stats.ActorsAliveByActivity.size());
@@ -236,6 +237,7 @@ namespace NActors {
236237

237238
TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) {
238239
NHPTimer::STime hpstart = GetCycleCountFast();
240+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_REGISTER> activityGuard(hpstart);
239241
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
240242
ui32 at = actor->GetActivityType();
241243
if (at >= Stats.MaxActivityType())

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ namespace NActors {
200200
}
201201

202202
ui32 TBasicExecutorPool::GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingCounter) {
203+
NHPTimer::STime hpnow = GetCycleCountFast();
204+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION> activityGuard(hpnow);
205+
203206
TWorkerId workerId = wctx.WorkerId;
204207
Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount);
205208

@@ -209,10 +212,9 @@ namespace NActors {
209212
Y_ABORT_UNLESS(wctx.SharedThread);
210213
wctx.SharedThread->UnsetWork();
211214
}
212-
213215
if (Harmonizer) {
214216
LWPROBE(TryToHarmonize, PoolId, PoolName);
215-
Harmonizer->Harmonize(TlsThreadContext->StartOfProcessingEventTS.load(std::memory_order_relaxed));
217+
Harmonizer->Harmonize(hpnow);
216218
}
217219

218220
TAtomic x = AtomicGet(Semaphore);
@@ -232,6 +234,7 @@ namespace NActors {
232234
}
233235
}
234236
} else {
237+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE> activityGuard;
235238
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
236239
if (workerId >= 0) {
237240
Threads[workerId].SetWork();
@@ -711,6 +714,7 @@ namespace NActors {
711714
}
712715

713716
bool TExecutorThreadCtx::WakeUp() {
717+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_WAKE_UP> activityGuard;
714718
for (ui32 i = 0; i < 2; ++i) {
715719
EThreadState state = GetState<EThreadState>();
716720
switch (state) {
@@ -739,6 +743,7 @@ namespace NActors {
739743
}
740744

741745
bool TSharedExecutorThreadCtx::WakeUp() {
746+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE> activityGuard;
742747
i64 requestsForWakeUp = RequestsForWakeUp.fetch_add(1, std::memory_order_acq_rel);
743748
if (requestsForWakeUp >= 0) {
744749
return false;

ydb/library/actors/core/executor_thread_ctx.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include "defs.h"
4+
#include "activity_guard.h"
45
#include "executor_thread.h"
56
#include "thread_context.h"
67

@@ -60,11 +61,13 @@ namespace NActors {
6061

6162
template <typename TDerived, typename TWaitState>
6263
void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
63-
ui64 start = GetCycleCountFast();
6464
bool doSpin = true;
65+
NHPTimer::STime start = GetCycleCountFast();
66+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_SPIN> activityGuard(start);
6567
while (true) {
6668
for (ui32 j = 0; doSpin && j < 12; ++j) {
67-
if (GetCycleCountFast() >= (start + spinThresholdCycles)) {
69+
NHPTimer::STime hpnow = GetCycleCountFast();
70+
if (hpnow >= i64(start + spinThresholdCycles)) {
6871
doSpin = false;
6972
break;
7073
}
@@ -100,8 +103,8 @@ namespace NActors {
100103

101104
NHPTimer::STime hpnow = GetCycleCountFast();
102105
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
103-
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
104-
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
106+
ui32 prevActivity = TlsThreadContext->ElapsingActorActivity.exchange(Max<ui64>(), std::memory_order_acq_rel);
107+
TlsThreadContext->WorkerCtx->AddElapsedCycles(prevActivity, hpnow - hpprev);
105108
do {
106109
if (WaitingPad.Park()) // interrupted
107110
return true;

ydb/library/actors/core/harmonizer.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "executor_thread.h"
55
#include "probes.h"
66

7+
#include "activity_guard.h"
78
#include "actorsystem.h"
89
#include "executor_pool_basic.h"
910
#include "executor_pool_basic_feature_flags.h"
@@ -758,12 +759,16 @@ void THarmonizer::Harmonize(ui64 ts) {
758759
ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
759760
LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);
760761

761-
if (PriorityOrder.empty()) {
762-
CalculatePriorityOrder();
763-
}
762+
{
763+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_HARMONIZER> activityGuard;
764+
765+
if (PriorityOrder.empty()) {
766+
CalculatePriorityOrder();
767+
}
764768

765-
PullStats(ts);
766-
HarmonizeImpl(ts);
769+
PullStats(ts);
770+
HarmonizeImpl(ts);
771+
}
767772

768773
Lock.Release();
769774
}

0 commit comments

Comments
 (0)