16
16
17
17
namespace NYql ::NDqs::NExecutionHelpers {
18
18
19
+ struct TQueueItem {
20
+ TQueueItem (NDq::TDqSerializedBatch&& data, const TString& messageId)
21
+ : Data(std::move(data))
22
+ , MessageId(messageId)
23
+ , SentProcessedEvent(false )
24
+ , IsFinal(false )
25
+ , Size(Data.Size())
26
+ {
27
+ }
28
+
29
+ static TQueueItem Final () {
30
+ TQueueItem item ({}, " FinalMessage" );
31
+ item.SentProcessedEvent = true ;
32
+ item.IsFinal = true ;
33
+ return item;
34
+ }
35
+
36
+ NDq::TDqSerializedBatch Data;
37
+ const TString MessageId;
38
+ bool SentProcessedEvent = false ;
39
+ bool IsFinal = false ;
40
+ ui64 Size = 0 ;
41
+ };
42
+
43
+ struct TWriteQueue {
44
+ TQueue<TQueueItem> Queue;
45
+ ui64 ByteSize = 0 ;
46
+
47
+ template < class ... Args >
48
+ decltype (auto ) emplace( Args&&... args) {
49
+ Queue.emplace (std::forward<Args>(args)...);
50
+ ByteSize += Queue.back ().Size ;
51
+ }
52
+
53
+ auto & front () {
54
+ return Queue.front ();
55
+ }
56
+
57
+ auto & back () {
58
+ return Queue.back ();
59
+ }
60
+
61
+ auto pop () {
62
+ ByteSize -= Queue.front ().Size ;
63
+ return Queue.pop ();
64
+ }
65
+
66
+ auto empty () const {
67
+ return Queue.empty ();
68
+ }
69
+
70
+ void clear () {
71
+ Queue.clear ();
72
+ ByteSize = 0 ;
73
+ }
74
+ };
75
+
19
76
template <class TDerived >
20
77
class TResultActorBase : public NYql ::TSynchronizableRichActor<TDerived>, public NYql::TCounters {
21
78
protected:
@@ -76,7 +133,6 @@ namespace NYql::NDqs::NExecutionHelpers {
76
133
}
77
134
78
135
WriteQueue.emplace (std::move (data), messageId);
79
- InflightBytes += WriteQueue.back ().Size ;
80
136
if (FullResultTableEnabled && FullResultWriterID) {
81
137
TryWriteToFullResultTable ();
82
138
} else {
@@ -181,6 +237,10 @@ namespace NYql::NDqs::NExecutionHelpers {
181
237
}
182
238
}
183
239
240
+ ui64 InflightBytes () {
241
+ return WriteQueue.ByteSize ;
242
+ }
243
+
184
244
private:
185
245
void OnQueryResult (TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) {
186
246
YQL_LOG_CTX_ROOT_SESSION_SCOPE (TraceId);
@@ -215,7 +275,6 @@ namespace NYql::NDqs::NExecutionHelpers {
215
275
} else {
216
276
WaitingAckFromFRW = false ;
217
277
WriteQueue.clear ();
218
- InflightBytes = 0 ;
219
278
Y_ABORT_UNLESS (ev->Get ()->Record .GetStatusCode () != NYql::NDqProto::StatusIds::SUCCESS);
220
279
TBase::Send (ExecuterID, ev->Release ().Release ());
221
280
}
@@ -236,7 +295,6 @@ namespace NYql::NDqs::NExecutionHelpers {
236
295
if (!WriteQueue.front ().SentProcessedEvent ) { // messages, received before limits exceeded, are already been reported
237
296
TBase::Send (TBase::SelfId (), MakeHolder<TEvMessageProcessed>(WriteQueue.front ().MessageId ));
238
297
}
239
- InflightBytes -= WriteQueue.back ().Size ;
240
298
WriteQueue.pop ();
241
299
242
300
if (WriteQueue.empty ()) {
@@ -352,44 +410,18 @@ namespace NYql::NDqs::NExecutionHelpers {
352
410
TBase::Send (FullResultWriterID, std::move (req));
353
411
}
354
412
355
- private:
356
- struct TQueueItem {
357
- TQueueItem (NDq::TDqSerializedBatch&& data, const TString& messageId)
358
- : Data(std::move(data))
359
- , MessageId(messageId)
360
- , SentProcessedEvent(false )
361
- , IsFinal(false )
362
- , Size(Data.Size())
363
- {
364
- }
365
-
366
- static TQueueItem Final () {
367
- TQueueItem item ({}, " FinalMessage" );
368
- item.SentProcessedEvent = true ;
369
- item.IsFinal = true ;
370
- return item;
371
- }
372
-
373
- NDq::TDqSerializedBatch Data;
374
- const TString MessageId;
375
- bool SentProcessedEvent = false ;
376
- bool IsFinal = false ;
377
- ui64 Size = 0 ;
378
- };
379
-
380
413
protected:
381
414
const NActors::TActorId ExecuterID;
382
415
const TString TraceId;
383
416
TDqConfiguration::TPtr Settings;
384
417
bool FinishCalled;
385
418
bool EarlyFinish;
386
- ui64 InflightBytes = 0 ;
387
419
388
420
private:
389
421
const bool FullResultTableEnabled;
390
422
const NActors::TActorId GraphExecutionEventsId;
391
423
const bool Discard;
392
- TQueue<TQueueItem> WriteQueue;
424
+ TWriteQueue WriteQueue;
393
425
ui64 SizeLimit;
394
426
TMaybe<ui64> RowsLimit;
395
427
ui64 Rows;
0 commit comments