Skip to content

Commit b451140

Browse files
authored
Fix busywait on adding to full async input buffer (backport #14522) (#15582)
1 parent 50d9f86 commit b451140

File tree

3 files changed

+39
-8
lines changed

3 files changed

+39
-8
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,20 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
696696
}
697697

698698
void DoExecuteImpl() override {
699-
PollAsyncInput();
699+
LastPollResult = PollAsyncInput();
700+
701+
if (LastPollResult && *LastPollResult != EResumeSource::CAPollAsyncNoSpace) {
702+
// When (some) source buffers were not full, and (some) were successfully polled,
703+
// initiate next DoExecute run immediately;
704+
// If the only reason for continuing was lack of space on all source
705+
// buffers, only continue execution after run completed,
706+
// (some) sources were consumed and compute waits for input
707+
// (Otherwise we enter busy-poll, and there is an especially bad scenario
708+
// when compute is delayed by rate-limiter, we enter busy-poll here,
709+
// this spends CPU, and the rate-limiter delays compute execution even more)
710+
ContinueExecute(*std::exchange(LastPollResult, {}));
711+
}
712+
700713
if (ProcessSourcesState.Inflight == 0) {
701714
auto req = GetCheckpointRequest();
702715
CA_LOG_T("DoExecuteImpl: " << (bool) req);
@@ -1194,6 +1207,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
11941207
CA_LOG_T("AsyncCheckRunStatus: TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size());
11951208
return;
11961209
}
1210+
if (ProcessOutputsState.LastRunStatus == ERunStatus::PendingInput && LastPollResult) {
1211+
ContinueExecute(*LastPollResult);
1212+
}
11971213
TBase::CheckRunStatus();
11981214
}
11991215

@@ -1242,6 +1258,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
12421258
NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay;
12431259
NMonitoring::TDynamicCounters::TCounterPtr CpuTime;
12441260
NDqProto::TEvComputeActorState ComputeActorState;
1261+
TMaybe<EResumeSource> LastPollResult;
12451262
};
12461263

12471264

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
459459
return;
460460
}
461461

462-
if (status != ERunStatus::Finished) {
462+
if (status == ERunStatus::PendingInput) {
463463
for (auto& [id, inputTransform] : InputTransformsMap) {
464464
if (!inputTransform.Buffer->Empty()) {
465465
ContinueExecute(EResumeSource::CAPendingInput);
@@ -1462,31 +1462,38 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14621462
}
14631463
}
14641464

1465-
void PollAsyncInput() {
1465+
[[nodiscard]]
1466+
TMaybe<EResumeSource> PollAsyncInput() {
1467+
TMaybe<EResumeSource> pollResult;
14661468
if (!Running) {
14671469
CA_LOG_T("Skip polling inputs and sources because not running");
1468-
return;
1470+
return pollResult;
14691471
}
14701472

14711473
CA_LOG_T("Poll inputs");
14721474
for (auto& [inputIndex, transform] : InputTransformsMap) {
14731475
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1474-
ContinueExecute(*resume);
1476+
if (!pollResult || *pollResult == EResumeSource::CAPollAsyncNoSpace) {
1477+
pollResult = resume;
1478+
}
14751479
}
14761480
}
14771481

14781482
// Don't produce any input from sources if we're about to save checkpoint.
14791483
if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
14801484
CA_LOG_T("Skip polling sources because of pending checkpoint");
1481-
return;
1485+
return pollResult;
14821486
}
14831487

14841488
CA_LOG_T("Poll sources");
14851489
for (auto& [inputIndex, source] : SourcesMap) {
14861490
if (auto resume = source.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1487-
ContinueExecute(*resume);
1491+
if (!pollResult || *pollResult == EResumeSource::CAPollAsyncNoSpace) {
1492+
pollResult = resume;
1493+
}
14881494
}
14891495
}
1496+
return pollResult;
14901497
}
14911498

14921499
void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
3030
void DoExecuteImpl() override{
3131
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();
3232

33-
TBase::PollAsyncInput();
33+
auto lastPollResult = TBase::PollAsyncInput();
3434
ERunStatus status = TaskRunner->Run();
3535

3636
CA_LOG_T("Resume execution, run status: " << status);
@@ -44,6 +44,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4444
}
4545

4646
TBase::ProcessOutputsImpl(status);
47+
48+
if (lastPollResult && (*lastPollResult != EResumeSource::CAPollAsyncNoSpace || status == ERunStatus::PendingInput)) {
49+
// If the only reason for continuing was lack of space on all sources,
50+
// only continue execution when input was consumed;
51+
// otherwise this may result in busy-poll
52+
TBase::ContinueExecute(*lastPollResult);
53+
}
4754
}
4855

4956
void DoTerminateImpl() override {

0 commit comments

Comments
 (0)