Skip to content

Commit 27aff17

Browse files
committed
add preemption event
1 parent 68a09a0 commit 27aff17

File tree

4 files changed

+65
-6
lines changed

4 files changed

+65
-6
lines changed

ydb/library/actors/core/events.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ namespace NActors {
106106
CoroTimeout,
107107
InvokeQuery,
108108
Wilson,
109+
Preemption,
109110
End,
110111

111112
// Compatibility section
@@ -128,6 +129,14 @@ namespace NActors {
128129
const ui64 Tag = 0;
129130
};
130131

132+
struct TEvPreemption: public TEventLocal<TEvPreemption, TSystem::Preemption> {
133+
bool ByEventCount = false;
134+
bool ByCycles = false;
135+
bool ByTailSend = false;
136+
ui32 EventCount = 0;
137+
ui64 Cycles = 0;
138+
};
139+
131140
struct TEvSubscribe: public TEventLocal<TEvSubscribe, TSystem::Subscribe> {
132141
};
133142

ydb/library/actors/core/executor_thread.cpp

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,26 @@ namespace NActors {
181181
SafeTypeName(actorType));
182182
}
183183

184+
ui32 TGenericExecutorThread::GetOverwrittenEventsPerMailbox() const {
185+
return Ctx.OverwrittenEventsPerMailbox;
186+
}
187+
188+
void TGenericExecutorThread::SetOverwrittenEventsPerMailbox(ui32 value) {
189+
Ctx.OverwrittenEventsPerMailbox = Max(value, Ctx.EventsPerMailbox);
190+
}
191+
192+
ui64 TGenericExecutorThread::GetOverwrittenTimePerMailboxTs() const {
193+
return Ctx.OverwrittenTimePerMailboxTs;
194+
}
195+
196+
void TGenericExecutorThread::SetOverwrittenTimePerMailboxTs(ui64 value) {
197+
Ctx.OverwrittenTimePerMailboxTs = Max(value, Ctx.TimePerMailboxTs);
198+
}
199+
200+
void TGenericExecutorThread::SubscribeToPreemption(TActorId actorId) {
201+
Ctx.PreemptionSubscribed.push_back(actorId);
202+
}
203+
184204
TGenericExecutorThread::TProcessingResult TGenericExecutorThread::Execute(TMailbox* mailbox, bool isTailExecution) {
185205
Y_DEBUG_ABORT_UNLESS(DyingActors.empty());
186206

@@ -194,16 +214,20 @@ namespace NActors {
194214
ui32 prevActivityType = std::numeric_limits<ui32>::max();
195215
TActorId recipient;
196216
bool firstEvent = true;
197-
bool preempted = false;
217+
bool preemptedByEventCount = false;
218+
bool preemptedByCycles = false;
219+
bool preemptedByTailSend = false;
198220
bool wasWorking = false;
199221
NHPTimer::STime hpnow = Ctx.HPStart;
200222
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
201223
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
202224
NHPTimer::STime eventStart = Ctx.HPStart;
203225
TlsThreadContext->ActivationStartTS.store(Ctx.HPStart, std::memory_order_release);
204226

227+
Ctx.OverwrittenEventsPerMailbox = Ctx.EventsPerMailbox;
228+
Ctx.OverwrittenTimePerMailboxTs = Ctx.TimePerMailboxTs;
205229
bool drained = false;
206-
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
230+
for (; Ctx.ExecutedEvents < Ctx.OverwrittenEventsPerMailbox; ++Ctx.ExecutedEvents) {
207231
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
208232
recipient = evExt->GetRecipientRewrite();
209233
TActorContext ctx(*mailbox, *this, eventStart, recipient);
@@ -302,6 +326,7 @@ namespace NActors {
302326
Ctx.WorkerId,
303327
recipient.ToString(),
304328
SafeTypeName(actorType));
329+
preemptedByTailSend = true;
305330
break;
306331
}
307332

@@ -317,7 +342,7 @@ namespace NActors {
317342
Ctx.WorkerId,
318343
recipient.ToString(),
319344
SafeTypeName(actorType));
320-
preempted = true;
345+
preemptedByCycles = true;
321346
break;
322347
}
323348

@@ -333,7 +358,7 @@ namespace NActors {
333358
Ctx.WorkerId,
334359
recipient.ToString(),
335360
SafeTypeName(actorType));
336-
preempted = true;
361+
preemptedByCycles = true;
337362
break;
338363
}
339364

@@ -348,7 +373,7 @@ namespace NActors {
348373
Ctx.WorkerId,
349374
recipient.ToString(),
350375
SafeTypeName(actorType));
351-
preempted = true;
376+
preemptedByEventCount = true;
352377
break;
353378
}
354379
} else {
@@ -367,6 +392,22 @@ namespace NActors {
367392
break; // empty queue, leave
368393
}
369394
}
395+
if (Ctx.PreemptionSubscribed.size()) {
396+
std::unique_ptr<TEvents::TEvPreemption> event = std::make_unique<TEvents::TEvPreemption>();
397+
event->ByEventCount = preemptedByEventCount;
398+
event->ByCycles = preemptedByCycles;
399+
event->ByTailSend = preemptedByTailSend;
400+
event->EventCount = Ctx.ExecutedEvents;
401+
event->Cycles = hpnow - Ctx.HPStart;
402+
TAutoPtr<IEventHandle> ev = new IEventHandle(TActorId(), TActorId(), event.release());
403+
for (const auto& actorId : Ctx.PreemptionSubscribed) {
404+
IActor *actor = mailbox->FindActor(actorId.LocalId());
405+
if (actor) {
406+
actor->Receive(ev);
407+
}
408+
}
409+
Ctx.PreemptionSubscribed.clear();
410+
}
370411
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
371412
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
372413

@@ -378,7 +419,7 @@ namespace NActors {
378419
} else {
379420
mailbox->Unlock(Ctx.Executor, hpnow, RevolvingWriteCounter);
380421
}
381-
return {preempted, wasWorking};
422+
return {preemptedByEventCount || preemptedByCycles, wasWorking};
382423
}
383424

384425
TThreadId TGenericExecutorThread::GetThreadId() const {

ydb/library/actors/core/executor_thread.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ namespace NActors {
7575
TThreadId GetThreadId() const; // blocks, must be called after Start()
7676
TWorkerId GetWorkerId() const;
7777

78+
void SubscribeToPreemption(TActorId actorId);
79+
ui32 GetOverwrittenEventsPerMailbox() const;
80+
void SetOverwrittenEventsPerMailbox(ui32 value);
81+
ui64 GetOverwrittenTimePerMailboxTs() const;
82+
void SetOverwrittenTimePerMailboxTs(ui64 value);
83+
7884
protected:
7985
TProcessingResult ProcessExecutorPool(IExecutorPool *pool);
8086

ydb/library/actors/core/worker_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ namespace NActors {
4444
bool IsNeededToWaitNextActivation = true;
4545
i64 HPStart = 0;
4646
ui32 ExecutedEvents = 0;
47+
ui32 OverwrittenEventsPerMailbox = 0;
48+
ui64 OverwrittenTimePerMailboxTs = 0;
4749
TSharedExecutorThreadCtx *SharedThread = nullptr;
4850
TCpuSensor CpuSensor;
51+
TStackVec<TActorId, 1> PreemptionSubscribed;
4952

5053
TMailboxCache MailboxCache;
5154

0 commit comments

Comments
 (0)