Skip to content

Commit e5aa64d

Browse files
authored
Fix busywait on adding to full async input buffer (backport 24-3 #14522) (#15584)
1 parent 027ad44 commit e5aa64d

File tree

7 files changed

+80
-15
lines changed

7 files changed

+80
-15
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,20 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
605605
}
606606

607607
void DoExecuteImpl() override {
608-
PollAsyncInput();
608+
LastPollResult = PollAsyncInput();
609+
610+
if (LastPollResult && *LastPollResult != EResumeSource::CAPollAsyncNoSpace) {
611+
// When (some) source buffers was not full, and (some) was successfully polled,
612+
// initiate next DoExecute run immediately;
613+
// If only reason for continuing was lack on space on all source
614+
// buffers, only continue execution after run completed,
615+
// (some) sources was consumed and compute waits for input
616+
// (Otherwise we enter busy-poll, and there are especially bad scenario
617+
// when compute is delayed by rate-limiter, we enter busy-poll here,
618+
// this spends cpu, ratelimiter delays compute execution even more))
619+
ContinueExecute(*std::exchange(LastPollResult, {}));
620+
}
621+
609622
if (ProcessSourcesState.Inflight == 0) {
610623
auto req = GetCheckpointRequest();
611624
CA_LOG_T("DoExecuteImpl: " << (bool) req);
@@ -1087,6 +1100,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
10871100
CA_LOG_T("AsyncCheckRunStatus: TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size());
10881101
return;
10891102
}
1103+
if (ProcessOutputsState.LastRunStatus == ERunStatus::PendingInput && LastPollResult) {
1104+
ContinueExecute(*LastPollResult);
1105+
}
10901106
TBase::CheckRunStatus();
10911107
}
10921108

@@ -1134,6 +1150,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
11341150
NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay;
11351151
NMonitoring::TDynamicCounters::TCounterPtr CpuTime;
11361152
NDqProto::TEvComputeActorState ComputeActorState;
1153+
TMaybe<EResumeSource> LastPollResult;
11371154
};
11381155

11391156

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,14 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
397397
return;
398398
}
399399

400+
if (status == ERunStatus::PendingInput) {
401+
for (auto& [id, inputTransform] : InputTransformsMap) {
402+
if (!inputTransform.Buffer->Empty()) {
403+
ContinueExecute(EResumeSource::CAPendingInput);
404+
}
405+
}
406+
}
407+
400408
if (status != ERunStatus::Finished) {
401409
// If the incoming channel's buffer was full at the moment when last ChannelDataAck event had been sent,
402410
// there will be no attempts to send a new piece of data from the other side of this channel.
@@ -1389,26 +1397,38 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13891397
}
13901398
}
13911399

1392-
void PollAsyncInput() {
1400+
[[nodiscard]]
1401+
TMaybe<EResumeSource> PollAsyncInput() {
1402+
TMaybe<EResumeSource> pollResult;
1403+
if (!Running) {
1404+
CA_LOG_T("Skip polling inputs and sources because not running");
1405+
return pollResult;
1406+
}
1407+
1408+
CA_LOG_T("Poll inputs");
1409+
for (auto& [inputIndex, transform] : InputTransformsMap) {
1410+
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1411+
if (!pollResult || *pollResult == EResumeSource::CAPollAsyncNoSpace) {
1412+
pollResult = resume;
1413+
}
1414+
}
1415+
}
1416+
13931417
// Don't produce any input from sources if we're about to save checkpoint.
1394-
if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
1418+
if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
13951419
CA_LOG_T("Skip polling sources because of pending checkpoint");
1396-
return;
1420+
return pollResult;
13971421
}
13981422

13991423
CA_LOG_T("Poll sources");
14001424
for (auto& [inputIndex, source] : SourcesMap) {
14011425
if (auto resume = source.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1402-
ContinueExecute(*resume);
1403-
}
1404-
}
1405-
1406-
CA_LOG_T("Poll inputs");
1407-
for (auto& [inputIndex, transform] : InputTransformsMap) {
1408-
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1409-
ContinueExecute(*resume);
1426+
if (!pollResult || *pollResult == EResumeSource::CAPollAsyncNoSpace) {
1427+
pollResult = resume;
1428+
}
14101429
}
14111430
}
1431+
return pollResult;
14121432
}
14131433

14141434
void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
3030
void DoExecuteImpl() override{
3131
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();
3232

33-
TBase::PollAsyncInput();
33+
auto lastPollResult = TBase::PollAsyncInput();
3434
ERunStatus status = TaskRunner->Run();
3535

3636
CA_LOG_T("Resume execution, run status: " << status);
@@ -44,6 +44,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4444
}
4545

4646
TBase::ProcessOutputsImpl(status);
47+
48+
if (lastPollResult && (*lastPollResult != EResumeSource::CAPollAsyncNoSpace || status == ERunStatus::PendingInput)) {
49+
// If only reason for continuing was lack on space on all sources,
50+
// only continue execution when input was consumed;
51+
// otherwise this may result in busy-poll
52+
TBase::ContinueExecute(*lastPollResult);
53+
}
4754
}
4855

4956
void DoTerminateImpl() override {

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ class TInputTransformStreamLookupBase
185185
}
186186
}
187187
finished = IsFinished();
188-
return 0;
188+
return AwaitingQueue.RowCount();
189189
}
190190

191191
TMaybe<google::protobuf::Any> ExtraData() override {

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,18 @@ class TLocalTaskRunnerActor
144144
return false;
145145
}
146146
}
147+
for (const auto transformId: InputTransforms) {
148+
const auto t = TaskRunner->GetInputTransform(transformId);
149+
if (t) {
150+
auto [_, transform] = *t;
151+
if (!transform->Empty()) {
152+
return false;
153+
}
154+
if (transform->IsPending()) {
155+
return false;
156+
}
157+
}
158+
}
147159
return true;
148160
}
149161

@@ -443,6 +455,7 @@ class TLocalTaskRunnerActor
443455
for (auto i = 0; i != inputs.size(); ++i) {
444456
if (auto t = TaskRunner->GetInputTransform(i)) {
445457
inputTransforms[i] = *t;
458+
InputTransforms.emplace(i);
446459
}
447460
}
448461

@@ -490,6 +503,7 @@ class TLocalTaskRunnerActor
490503
const TTxId TxId;
491504
const ui64 TaskId;
492505
THashSet<ui32> Inputs;
506+
THashSet<ui32> InputTransforms;
493507
THashSet<ui32> Sources;
494508
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
495509
THashSet<ui32> InputChannelsWithDisabledCheckpoints;

ydb/library/yql/dq/runtime/dq_async_input.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace NYql::NDq {
66
class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer> {
77
using TBaseImpl = TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer>;
88
friend TBaseImpl;
9+
bool Pending = false;
910
public:
1011
TDqAsyncInputBufferStats PushStats;
1112
TDqInputStats PopStats;
@@ -32,7 +33,7 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
3233
}
3334

3435
void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
35-
Y_ABORT_UNLESS(!batch.empty() || !space);
36+
Pending = space != 0;
3637
if (!batch.empty()) {
3738
AddBatch(std::move(batch), space);
3839
}
@@ -41,6 +42,10 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
4142
virtual void Push(TDqSerializedBatch&&, i64) override {
4243
YQL_ENSURE(!"Unimplemented");
4344
}
45+
46+
bool IsPending() const override {
47+
return Pending;
48+
}
4449
};
4550

4651
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(

ydb/library/yql/dq/runtime/dq_async_input.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ class IDqAsyncInputBuffer : public IDqInput {
2323
virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0;
2424

2525
virtual void Finish() = 0;
26+
27+
virtual bool IsPending() const { return false; };
2628
};
2729

2830
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(ui64 inputIndex, const TString& type, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes,

0 commit comments

Comments
 (0)