Skip to content

Fix busywait on adding to full async input buffer #14522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,20 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}

void DoExecuteImpl() override {
PollAsyncInput();
LastPollResult = PollAsyncInput();

if (LastPollResult && *LastPollResult != EResumeSource::CAPollAsyncNoSpace) {
// When (some) source buffers was not full, and (some) was successfully polled,
// initiate next DoExecute run immediately;
// If only reason for continuing was lack on space on all source
// buffers, only continue execution after run completed,
// (some) sources was consumed and compute waits for input
// (Otherwise we enter busy-poll, and there are especially bad scenario
// when compute is delayed by rate-limiter, we enter busy-poll here,
// this spends cpu, ratelimiter delays compute execution even more))
ContinueExecute(*std::exchange(LastPollResult, {}));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Зачем еще раз запрашивать новый цикл? Он же сюда же в итоге придет. Данные сорсов синхронно пушаться в TaskRunner (TaskRunnerActor->AsyncInputPush), поэтому можно/нужно сразу вызывать AskContinueRun().

Т.е. тут даже можно прокинуть EResumeSource и анализировать: если пришли по CANewAsyncInput, то делаем (PollAsyncInput()); если пришли по другой причине, то безусловано делаем AskContinueRun().

Это всё неточно(

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если

  1. нам не выделили cpu quota;
  2. есть непереполненные входы;
    то мы можем продолжать их вычитывать, не дожидаясь квоты

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я не вполне уверен, что от всей этой асинхронности много толку, с высокой вероятностью данные всё равно будут оседать в каких-то буферах, но я предпочёл заткнуть очевидную дырку ( busywait) и как можно меньше менять работу остальной части

}

if (ProcessSourcesState.Inflight == 0) {
auto req = GetCheckpointRequest();
CA_LOG_T("DoExecuteImpl: " << (bool) req);
Expand Down Expand Up @@ -1185,6 +1198,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
CA_LOG_T("AsyncCheckRunStatus: TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size());
return;
}
if (ProcessOutputsState.LastRunStatus == ERunStatus::PendingInput && LastPollResult) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Как я понял тут именно CAPollAsyncNoSpace.
Еще вариант (теоретический) не использовать вообще CAPollAsyncNoSpace, а по TEvNewAsyncInputDataArrived сохранять какой то флажок, а не опрашивать в цикле

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, тут ожидается только NoSpace; других вариантов, кмк, тут уже быть не может.

ContinueExecute(*LastPollResult);
}
TBase::CheckRunStatus();
}

Expand Down Expand Up @@ -1233,6 +1249,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay;
NMonitoring::TDynamicCounters::TCounterPtr CpuTime;
NDqProto::TEvComputeActorState ComputeActorState;
TMaybe<EResumeSource> LastPollResult;
};


Expand Down
19 changes: 13 additions & 6 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
return;
}

if (status != ERunStatus::Finished) {
if (status == ERunStatus::PendingInput) {
for (auto& [id, inputTransform] : InputTransformsMap) {
if (!inputTransform.Buffer->Empty()) {
ContinueExecute(EResumeSource::CAPendingInput);
Expand Down Expand Up @@ -1462,31 +1462,38 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}
}

void PollAsyncInput() {
[[nodiscard]]
TMaybe<EResumeSource> PollAsyncInput() {
TMaybe<EResumeSource> pollResult;
if (!Running) {
CA_LOG_T("Skip polling inputs and sources because not running");
return;
return pollResult;
}

CA_LOG_T("Poll inputs");
for (auto& [inputIndex, transform] : InputTransformsMap) {
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
ContinueExecute(*resume);
if (!pollResult || *pollResult == EResumeSource::CAPollAsyncNoSpace) {
pollResult = resume;
}
}
}

// Don't produce any input from sources if we're about to save checkpoint.
if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
CA_LOG_T("Skip polling sources because of pending checkpoint");
return;
return pollResult;
}

CA_LOG_T("Poll sources");
for (auto& [inputIndex, source] : SourcesMap) {
if (auto resume = source.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
ContinueExecute(*resume);
if (!pollResult || *pollResult == EResumeSource::CAPollAsyncNoSpace) {
pollResult = resume;
}
}
}
return pollResult;
}

void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
void DoExecuteImpl() override{
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();

TBase::PollAsyncInput();
auto lastPollResult = TBase::PollAsyncInput();
ERunStatus status = TaskRunner->Run();

CA_LOG_T("Resume execution, run status: " << status);
Expand All @@ -44,6 +44,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
}

TBase::ProcessOutputsImpl(status);

if (lastPollResult && (*lastPollResult != EResumeSource::CAPollAsyncNoSpace || status == ERunStatus::PendingInput)) {
// If only reason for continuing was lack on space on all sources,
// only continue execution when input was consumed;
// otherwise this may result in busy-poll
TBase::ContinueExecute(*lastPollResult);
}
}

void DoTerminateImpl() override {
Expand Down
Loading