Skip to content

Commit d3ff3d4

Browse files
authored
Merge 94b8112 into 4fef0c2
2 parents 4fef0c2 + 94b8112 commit d3ff3d4

File tree

3 files changed

+86
-28
lines changed

3 files changed

+86
-28
lines changed

ydb/library/yql/providers/dq/actors/actors_ut.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/yql/providers/dq/actors/events.h>
66

77
#include "result_receiver.h"
8+
#include "result_actor_base.h"
89

910
using namespace NActors;
1011
using namespace NYql;
@@ -33,8 +34,8 @@ Y_UNIT_TEST(ReceiveStatus) {
3334
runtime.Initialize();
3435

3536
auto sender = runtime.AllocateEdgeActor();
36-
auto receiverId = runtime.Register(ResultReceiver().Release());
37-
runtime.Send(new IEventHandle(receiverId, sender, new TEvReadyState(), 0, true));
37+
auto receiverId = runtime.Register(ResultReceiver().Release());
38+
runtime.Send(new IEventHandle(receiverId, sender, new TEvReadyState(), 0, true));
3839
}
3940

4041
Y_UNIT_TEST(ReceiveError) {
@@ -49,5 +50,23 @@ Y_UNIT_TEST(ReceiveError) {
4950
UNIT_ASSERT_EQUAL(response->Record.GetStatusCode(), NYql::NDqProto::StatusIds::UNAVAILABLE);
5051
}
5152

52-
} // Y_UNIT_TEST_SUITE(ResultReceiver)
53+
Y_UNIT_TEST(WriteQueue) {
54+
NYql::NDqs::NExecutionHelpers::TWriteQueue q;
55+
UNIT_ASSERT(q.empty());
5356

57+
NYql::NDqs::NExecutionHelpers::TQueueItem item({}, ""); item.Size = 1000;
58+
q.emplace(item);
59+
UNIT_ASSERT_EQUAL(q.ByteSize, 1000);
60+
61+
item.Size = 11;
62+
q.emplace(item);
63+
UNIT_ASSERT_EQUAL(q.ByteSize, 1011);
64+
65+
q.pop();
66+
UNIT_ASSERT_EQUAL(q.ByteSize, 11);
67+
68+
q.pop();
69+
UNIT_ASSERT_EQUAL(q.ByteSize, 0);
70+
}
71+
72+
} // Y_UNIT_TEST_SUITE(ResultReceiver)

ydb/library/yql/providers/dq/actors/result_actor_base.h

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,64 @@
1616

1717
namespace NYql::NDqs::NExecutionHelpers {
1818

19+
struct TQueueItem {
20+
TQueueItem(NDq::TDqSerializedBatch&& data, const TString& messageId)
21+
: Data(std::move(data))
22+
, MessageId(messageId)
23+
, SentProcessedEvent(false)
24+
, IsFinal(false)
25+
, Size(Data.Size())
26+
{
27+
}
28+
29+
static TQueueItem Final() {
30+
TQueueItem item({}, "FinalMessage");
31+
item.SentProcessedEvent = true;
32+
item.IsFinal = true;
33+
return item;
34+
}
35+
36+
NDq::TDqSerializedBatch Data;
37+
const TString MessageId;
38+
bool SentProcessedEvent = false;
39+
bool IsFinal = false;
40+
ui64 Size = 0;
41+
};
42+
43+
struct TWriteQueue {
44+
TQueue<TQueueItem> Queue;
45+
ui64 ByteSize = 0;
46+
47+
template< class... Args >
48+
decltype(auto) emplace( Args&&... args) {
49+
Queue.emplace(std::forward<Args>(args)...);
50+
ByteSize += Queue.back().Size;
51+
}
52+
53+
auto& front() {
54+
return Queue.front();
55+
}
56+
57+
auto& back() {
58+
return Queue.back();
59+
}
60+
61+
auto pop() {
62+
YQL_ENSURE(ByteSize >= Queue.front().Size);
63+
ByteSize -= Queue.front().Size;
64+
return Queue.pop();
65+
}
66+
67+
auto empty() const {
68+
return Queue.empty();
69+
}
70+
71+
void clear() {
72+
Queue.clear();
73+
ByteSize = 0;
74+
}
75+
};
76+
1977
template <class TDerived>
2078
class TResultActorBase : public NYql::TSynchronizableRichActor<TDerived>, public NYql::TCounters {
2179
protected:
@@ -180,6 +238,10 @@ namespace NYql::NDqs::NExecutionHelpers {
180238
}
181239
}
182240

241+
ui64 InflightBytes() {
242+
return WriteQueue.ByteSize;
243+
}
244+
183245
private:
184246
void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) {
185247
YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
@@ -349,29 +411,6 @@ namespace NYql::NDqs::NExecutionHelpers {
349411
TBase::Send(FullResultWriterID, std::move(req));
350412
}
351413

352-
private:
353-
struct TQueueItem {
354-
TQueueItem(NDq::TDqSerializedBatch&& data, const TString& messageId)
355-
: Data(std::move(data))
356-
, MessageId(messageId)
357-
, SentProcessedEvent(false)
358-
, IsFinal(false)
359-
{
360-
}
361-
362-
static TQueueItem Final() {
363-
TQueueItem item({}, "FinalMessage");
364-
item.SentProcessedEvent = true;
365-
item.IsFinal = true;
366-
return item;
367-
}
368-
369-
NDq::TDqSerializedBatch Data;
370-
const TString MessageId;
371-
bool SentProcessedEvent = false;
372-
bool IsFinal = false;
373-
};
374-
375414
protected:
376415
const NActors::TActorId ExecuterID;
377416
const TString TraceId;
@@ -383,7 +422,7 @@ namespace NYql::NDqs::NExecutionHelpers {
383422
const bool FullResultTableEnabled;
384423
const NActors::TActorId GraphExecutionEventsId;
385424
const bool Discard;
386-
TQueue<TQueueItem> WriteQueue;
425+
TWriteQueue WriteQueue;
387426
ui64 SizeLimit;
388427
TMaybe<ui64> RowsLimit;
389428
ui64 Rows;

ydb/library/yql/providers/dq/actors/result_receiver.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class TResultReceiver: public NYql::NDqs::NExecutionHelpers::TResultActorBase<TR
103103
auto req = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
104104
req->Record.SetChannelId(message->Get()->Record.GetChannelData().GetChannelId());
105105
req->Record.SetSeqNo(message->Get()->Record.GetSeqNo());
106-
req->Record.SetFreeSpace(256_MB);
106+
req->Record.SetFreeSpace((i64)256_MB - (i64)InflightBytes());
107107
req->Record.SetFinish(EarlyFinish); // set if premature finish started (when response limit reached and FullResultTable not enabled)
108108

109109
Send(message->Sender, req.Release());

0 commit comments

Comments
 (0)