Skip to content

Commit 5ad7d5a

Browse files
authored
Merge c73b4ce into c15923e
2 parents c15923e + c73b4ce commit 5ad7d5a

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,13 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
696696
}
697697

698698
void DoExecuteImpl() override {
699-
PollAsyncInput();
699+
bool isFull = false;
700+
if (ProcessOutputsState.Inflight == 0 && ProcessOutputsState.HasDataToSend && !ProcessOutputsState.DataWasSent && ProcessOutputsState.LastRunStatus == ERunStatus::PendingOutput) {
701+
// FIXME is this condition correct?
702+
// we need to stop polling when output is full and all input buffers are full
703+
isFull = true;
704+
}
705+
PollAsyncInput(!isFull);
700706
if (ProcessSourcesState.Inflight == 0) {
701707
auto req = GetCheckpointRequest();
702708
CA_LOG_T("DoExecuteImpl: " << (bool) req);

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,7 +1465,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14651465
}
14661466
}
14671467

1468-
void PollAsyncInput() {
1468+
void PollAsyncInput(bool continueExecuteOnFull = true) {
14691469
if (!Running) {
14701470
CA_LOG_T("Skip polling inputs and sources because not running");
14711471
return;
@@ -1474,7 +1474,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14741474
CA_LOG_T("Poll inputs");
14751475
for (auto& [inputIndex, transform] : InputTransformsMap) {
14761476
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1477-
ContinueExecute(*resume);
1477+
if (*resume != EResumeSource::CAPollAsyncNoSpace || continueExecuteOnFull) {
1478+
ContinueExecute(*resume);
1479+
}
14781480
}
14791481
}
14801482

@@ -1487,7 +1489,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14871489
CA_LOG_T("Poll sources");
14881490
for (auto& [inputIndex, source] : SourcesMap) {
14891491
if (auto resume = source.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1490-
ContinueExecute(*resume);
1492+
if (*resume != EResumeSource::CAPollAsyncNoSpace || continueExecuteOnFull) {
1493+
ContinueExecute(*resume);
1494+
}
14911495
}
14921496
}
14931497
}

0 commit comments

Comments
 (0)