Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ydb/library/actors/core/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ namespace NActors {
CoroTimeout,
InvokeQuery,
Wilson,
Preemption,
End,

// Compatibility section
Expand All @@ -128,6 +129,14 @@ namespace NActors {
const ui64 Tag = 0;
};

struct TEvPreemption: public TEventLocal<TEvPreemption, TSystem::Preemption> {
bool ByEventCount = false;
bool ByCycles = false;
bool ByTailSend = false;
ui32 EventCount = 0;
ui64 Cycles = 0;
};

struct TEvSubscribe: public TEventLocal<TEvSubscribe, TSystem::Subscribe> {
};

Expand Down
53 changes: 47 additions & 6 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,26 @@ namespace NActors {
SafeTypeName(actorType));
}

ui32 TGenericExecutorThread::GetOverwrittenEventsPerMailbox() const {
return Ctx.OverwrittenEventsPerMailbox;
}

void TGenericExecutorThread::SetOverwrittenEventsPerMailbox(ui32 value) {
Ctx.OverwrittenEventsPerMailbox = Max(value, Ctx.EventsPerMailbox);
}

ui64 TGenericExecutorThread::GetOverwrittenTimePerMailboxTs() const {
return Ctx.OverwrittenTimePerMailboxTs;
}

void TGenericExecutorThread::SetOverwrittenTimePerMailboxTs(ui64 value) {
Ctx.OverwrittenTimePerMailboxTs = Max(value, Ctx.TimePerMailboxTs);
}

void TGenericExecutorThread::SubscribeToPreemption(TActorId actorId) {
Ctx.PreemptionSubscribed.push_back(actorId);
}

TGenericExecutorThread::TProcessingResult TGenericExecutorThread::Execute(TMailbox* mailbox, bool isTailExecution) {
Y_DEBUG_ABORT_UNLESS(DyingActors.empty());

Expand All @@ -194,16 +214,20 @@ namespace NActors {
ui32 prevActivityType = std::numeric_limits<ui32>::max();
TActorId recipient;
bool firstEvent = true;
bool preempted = false;
bool preemptedByEventCount = false;
bool preemptedByCycles = false;
bool preemptedByTailSend = false;
bool wasWorking = false;
NHPTimer::STime hpnow = Ctx.HPStart;
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
NHPTimer::STime eventStart = Ctx.HPStart;
TlsThreadContext->ActivationStartTS.store(Ctx.HPStart, std::memory_order_release);

Ctx.OverwrittenEventsPerMailbox = Ctx.EventsPerMailbox;
Ctx.OverwrittenTimePerMailboxTs = Ctx.TimePerMailboxTs;
bool drained = false;
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
for (; Ctx.ExecutedEvents < Ctx.OverwrittenEventsPerMailbox; ++Ctx.ExecutedEvents) {
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
recipient = evExt->GetRecipientRewrite();
TActorContext ctx(*mailbox, *this, eventStart, recipient);
Expand Down Expand Up @@ -302,6 +326,7 @@ namespace NActors {
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actorType));
preemptedByTailSend = true;
break;
}

Expand All @@ -317,7 +342,7 @@ namespace NActors {
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actorType));
preempted = true;
preemptedByCycles = true;
break;
}

Expand All @@ -333,7 +358,7 @@ namespace NActors {
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actorType));
preempted = true;
preemptedByCycles = true;
break;
}

Expand All @@ -348,7 +373,7 @@ namespace NActors {
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actorType));
preempted = true;
preemptedByEventCount = true;
break;
}
} else {
Expand All @@ -367,6 +392,22 @@ namespace NActors {
break; // empty queue, leave
}
}
if (Ctx.PreemptionSubscribed.size()) {
std::unique_ptr<TEvents::TEvPreemption> event = std::make_unique<TEvents::TEvPreemption>();
event->ByEventCount = preemptedByEventCount;
event->ByCycles = preemptedByCycles;
event->ByTailSend = preemptedByTailSend;
event->EventCount = Ctx.ExecutedEvents;
event->Cycles = hpnow - Ctx.HPStart;
TAutoPtr<IEventHandle> ev = new IEventHandle(TActorId(), TActorId(), event.release());
for (const auto& actorId : Ctx.PreemptionSubscribed) {
IActor *actor = mailbox->FindActor(actorId.LocalId());
if (actor) {
actor->Receive(ev);
}
}
Ctx.PreemptionSubscribed.clear();
}
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);

Expand All @@ -378,7 +419,7 @@ namespace NActors {
} else {
mailbox->Unlock(Ctx.Executor, hpnow, RevolvingWriteCounter);
}
return {preempted, wasWorking};
return {preemptedByEventCount || preemptedByCycles, wasWorking};
}

TThreadId TGenericExecutorThread::GetThreadId() const {
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/actors/core/executor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ namespace NActors {
TThreadId GetThreadId() const; // blocks, must be called after Start()
TWorkerId GetWorkerId() const;

void SubscribeToPreemption(TActorId actorId);
ui32 GetOverwrittenEventsPerMailbox() const;
void SetOverwrittenEventsPerMailbox(ui32 value);
ui64 GetOverwrittenTimePerMailboxTs() const;
void SetOverwrittenTimePerMailboxTs(ui64 value);

protected:
TProcessingResult ProcessExecutorPool(IExecutorPool *pool);

Expand Down
3 changes: 3 additions & 0 deletions ydb/library/actors/core/worker_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ namespace NActors {
bool IsNeededToWaitNextActivation = true;
i64 HPStart = 0;
ui32 ExecutedEvents = 0;
ui32 OverwrittenEventsPerMailbox = 0;
ui64 OverwrittenTimePerMailboxTs = 0;
TSharedExecutorThreadCtx *SharedThread = nullptr;
TCpuSensor CpuSensor;
TStackVec<TActorId, 1> PreemptionSubscribed;

TMailboxCache MailboxCache;

Expand Down
Loading