Skip to content

Commit 744cd65

Browse files
authored
Merge 862eeac into e9b6136
2 parents e9b6136 + 862eeac commit 744cd65

File tree

2 files changed

+7
-1
lines changed

2 files changed

+7
-1
lines changed

ydb/library/yql/providers/dq/actors/result_actor_base.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ namespace NYql::NDqs::NExecutionHelpers {
7676
}
7777

7878
WriteQueue.emplace(std::move(data), messageId);
79+
InflightBytes += WriteQueue.back().Size;
7980
if (FullResultTableEnabled && FullResultWriterID) {
8081
TryWriteToFullResultTable();
8182
} else {
@@ -214,6 +215,7 @@ namespace NYql::NDqs::NExecutionHelpers {
214215
} else {
215216
WaitingAckFromFRW = false;
216217
WriteQueue.clear();
218+
InflightBytes = 0;
217219
Y_ABORT_UNLESS(ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS);
218220
TBase::Send(ExecuterID, ev->Release().Release());
219221
}
@@ -234,6 +236,7 @@ namespace NYql::NDqs::NExecutionHelpers {
234236
if (!WriteQueue.front().SentProcessedEvent) { // messages, received before limits exceeded, are already been reported
235237
TBase::Send(TBase::SelfId(), MakeHolder<TEvMessageProcessed>(WriteQueue.front().MessageId));
236238
}
239+
InflightBytes -= WriteQueue.back().Size;
237240
WriteQueue.pop();
238241

239242
if (WriteQueue.empty()) {
@@ -356,6 +359,7 @@ namespace NYql::NDqs::NExecutionHelpers {
356359
, MessageId(messageId)
357360
, SentProcessedEvent(false)
358361
, IsFinal(false)
362+
, Size(Data.Size())
359363
{
360364
}
361365

@@ -370,6 +374,7 @@ namespace NYql::NDqs::NExecutionHelpers {
370374
const TString MessageId;
371375
bool SentProcessedEvent = false;
372376
bool IsFinal = false;
377+
ui64 Size = 0;
373378
};
374379

375380
protected:
@@ -378,6 +383,7 @@ namespace NYql::NDqs::NExecutionHelpers {
378383
TDqConfiguration::TPtr Settings;
379384
bool FinishCalled;
380385
bool EarlyFinish;
386+
ui64 InflightBytes = 0;
381387

382388
private:
383389
const bool FullResultTableEnabled;

ydb/library/yql/providers/dq/actors/result_receiver.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class TResultReceiver: public NYql::NDqs::NExecutionHelpers::TResultActorBase<TR
103103
auto req = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
104104
req->Record.SetChannelId(message->Get()->Record.GetChannelData().GetChannelId());
105105
req->Record.SetSeqNo(message->Get()->Record.GetSeqNo());
106-
req->Record.SetFreeSpace(256_MB);
106+
req->Record.SetFreeSpace(256_MB - InflightBytes);
107107
req->Record.SetFinish(EarlyFinish); // set if premature finish started (when response limit reached and FullResultTable not enabled)
108108

109109
Send(message->Sender, req.Release());

0 commit comments

Comments
 (0)