-
Notifications
You must be signed in to change notification settings - Fork 694
Fix busywait on adding to full async input buffer #14522
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5b30a8a
f09ceb1
c73b4ce
ba8b5fd
fd18923
c2c2949
0cf5c08
3571e9d
ba5062a
f0b65a1
6e15f6b
2934e2c
66f2e27
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -696,7 +696,20 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC | |
} | ||
|
||
void DoExecuteImpl() override { | ||
PollAsyncInput(); | ||
LastPollResult = PollAsyncInput(); | ||
|
||
if (LastPollResult && *LastPollResult != EResumeSource::CAPollAsyncNoSpace) { | ||
// When (some) source buffers was not full, and (some) was successfully polled, | ||
// initiate next DoExecute run immediately; | ||
// If only reason for continuing was lack on space on all source | ||
// buffers, only continue execution after run completed, | ||
// (some) sources was consumed and compute waits for input | ||
// (Otherwise we enter busy-poll, and there are especially bad scenario | ||
// when compute is delayed by rate-limiter, we enter busy-poll here, | ||
// this spends cpu, ratelimiter delays compute execution even more)) | ||
ContinueExecute(*std::exchange(LastPollResult, {})); | ||
} | ||
|
||
if (ProcessSourcesState.Inflight == 0) { | ||
auto req = GetCheckpointRequest(); | ||
CA_LOG_T("DoExecuteImpl: " << (bool) req); | ||
|
@@ -1185,6 +1198,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC | |
CA_LOG_T("AsyncCheckRunStatus: TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size()); | ||
return; | ||
} | ||
if (ProcessOutputsState.LastRunStatus == ERunStatus::PendingInput && LastPollResult) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Как я понял тут именно CAPollAsyncNoSpace. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Да, тут ожидается только NoSpace; других вариантов, кмк, тут уже быть не может. |
||
ContinueExecute(*LastPollResult); | ||
} | ||
TBase::CheckRunStatus(); | ||
} | ||
|
||
|
@@ -1233,6 +1249,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC | |
NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay; | ||
NMonitoring::TDynamicCounters::TCounterPtr CpuTime; | ||
NDqProto::TEvComputeActorState ComputeActorState; | ||
TMaybe<EResumeSource> LastPollResult; | ||
}; | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Зачем еще раз запрашивать новый цикл? Он же сюда же в итоге придет. Данные сорсов синхронно пушаться в TaskRunner (
TaskRunnerActor->AsyncInputPush
), поэтому можно/нужно сразу вызыватьAskContinueRun()
.Т.е. тут даже можно прокинуть EResumeSource и анализировать: если пришли по CANewAsyncInput, то делаем (
PollAsyncInput()
); если пришли по другой причине, то безусловано делаемAskContinueRun()
.Это всё неточно(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Если
то мы можем продолжать их вычитывать, не дожидаясь квоты
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Я не вполне уверен, что от всей этой асинхронности много толку, с высокой вероятностью данные всё равно будут оседать в каких-то буферах, но я предпочёл заткнуть очевидную дырку ( busywait) и как можно меньше менять работу остальной части