Skip to content

Commit ffdea0a

Browse files
authored
Merge fd18923 into d1f5b91
2 parents d1f5b91 + fd18923 commit ffdea0a

File tree

2 files changed

+25
-7
lines changed

2 files changed

+25
-7
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
528528
const bool shouldSkipData = Channels->ShouldSkipData(outputChannel.ChannelId);
529529
const bool hasFreeMemory = Channels->HasFreeMemoryInChannel(outputChannel.ChannelId);
530530
UpdateBlocked(outputChannel, !hasFreeMemory);
531+
if (!hasFreeMemory) {
532+
ProcessOutputsState.IsFull = true;
533+
}
531534

532535
if (!shouldSkipData && !outputChannel.EarlyFinish && !hasFreeMemory) {
533536
CA_LOG_T("DrainOutputChannel return because No free memory in channel, channel: " << outputChannel.ChannelId);
@@ -555,6 +558,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
555558

556559
const ui32 allowedOvercommit = AllowedChannelsOvercommit();
557560
const i64 sinkFreeSpaceBeforeSend = sinkInfo.AsyncOutput->GetFreeSpace();
561+
if (sinkFreeSpaceBeforeSend <= 0) {
562+
ProcessOutputsState.IsFull = true;
563+
}
558564

559565
i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit;
560566
CA_LOG_T("About to drain sink " << outputIndex
@@ -696,7 +702,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
696702
}
697703

698704
void DoExecuteImpl() override {
699-
PollAsyncInput();
705+
PollAsyncInput(!ProcessOutputsState.IsFull);
700706
if (ProcessSourcesState.Inflight == 0) {
701707
auto req = GetCheckpointRequest();
702708
CA_LOG_T("DoExecuteImpl: " << (bool) req);
@@ -939,6 +945,10 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
939945
}
940946
}
941947

948+
if (!Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) {
949+
ProcessOutputsState.IsFull = true;
950+
}
951+
942952
ProcessOutputsState.DataWasSent |= asyncData.Changed;
943953

944954
ProcessOutputsState.AllOutputsFinished =
@@ -1018,6 +1028,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
10181028
CA_LOG_T("Drain sink " << outputIndex
10191029
<< ". Free space decreased: " << (sinkInfo.FreeSpaceBeforeSend - sinkInfo.AsyncOutput->GetFreeSpace())
10201030
<< ", sent data from buffer: " << dataSize);
1031+
if (sinkInfo.AsyncOutput->GetFreeSpace() <= 0) {
1032+
ProcessOutputsState.IsFull = true;
1033+
}
10211034

10221035
ProcessOutputsState.DataWasSent |= dataWasSent;
10231036
ProcessOutputsState.AllOutputsFinished =

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
388388
}
389389

390390
void ProcessOutputsImpl(ERunStatus status) {
391-
ProcessOutputsState.LastRunStatus = status;
392-
393391
CA_LOG_T("ProcessOutputsState.Inflight: " << ProcessOutputsState.Inflight);
394392
if (ProcessOutputsState.Inflight == 0) {
395393
ProcessOutputsState = TProcessOutputsState();
396394
}
397395

396+
ProcessOutputsState.LastRunStatus = status;
397+
398398
for (auto& entry : OutputChannelsMap) {
399399
const ui64 channelId = entry.first;
400400
TOutputChannelInfo& outputChannel = entry.second;
@@ -459,7 +459,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
459459
return;
460460
}
461461

462-
if (status != ERunStatus::Finished) {
462+
if (status == ERunStatus::PendingInput) {
463463
for (auto& [id, inputTransform] : InputTransformsMap) {
464464
if (!inputTransform.Buffer->Empty()) {
465465
ContinueExecute(EResumeSource::CAPendingInput);
@@ -1462,7 +1462,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14621462
}
14631463
}
14641464

1465-
void PollAsyncInput() {
1465+
void PollAsyncInput(bool continueExecuteOnFull = true) {
14661466
if (!Running) {
14671467
CA_LOG_T("Skip polling inputs and sources because not running");
14681468
return;
@@ -1471,7 +1471,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14711471
CA_LOG_T("Poll inputs");
14721472
for (auto& [inputIndex, transform] : InputTransformsMap) {
14731473
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1474-
ContinueExecute(*resume);
1474+
if (*resume != EResumeSource::CAPollAsyncNoSpace || continueExecuteOnFull) {
1475+
ContinueExecute(*resume);
1476+
}
14751477
}
14761478
}
14771479

@@ -1484,7 +1486,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14841486
CA_LOG_T("Poll sources");
14851487
for (auto& [inputIndex, source] : SourcesMap) {
14861488
if (auto resume = source.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1487-
ContinueExecute(*resume);
1489+
if (*resume != EResumeSource::CAPollAsyncNoSpace || continueExecuteOnFull) {
1490+
ContinueExecute(*resume);
1491+
}
14881492
}
14891493
}
14901494
}
@@ -2051,6 +2055,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
20512055
bool AllOutputsFinished = true;
20522056
ERunStatus LastRunStatus = ERunStatus::PendingInput;
20532057
bool LastPopReturnedNoData = false;
2058+
bool IsFull = false;
20542059
};
20552060
TProcessOutputsState ProcessOutputsState;
20562061

0 commit comments

Comments
 (0)