Skip to content

Commit 99c5efb

Browse files
authored
Fix memory consumption restriction in stream RPC calls (#10758)
1 parent 965a1e5 commit 99c5efb

File tree

4 files changed

+105
-117
lines changed

4 files changed

+105
-117
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#pragma once
2+
#include <util/generic/queue.h>
3+
#include <util/generic/yexception.h>
4+
#include <util/system/types.h>
5+
6+
namespace NKikimr::NGRpcService {
7+
8+
class TRpcFlowControlState {
9+
public:
10+
TRpcFlowControlState(ui64 inflightLimitBytes)
11+
: InflightLimitBytes_(inflightLimitBytes) {}
12+
13+
void PushResponse(ui64 responseSizeBytes) {
14+
ResponseSizeQueue_.push(responseSizeBytes);
15+
TotalResponsesSize_ += responseSizeBytes;
16+
}
17+
18+
void PopResponse() {
19+
Y_ENSURE(!ResponseSizeQueue_.empty());
20+
TotalResponsesSize_ -= ResponseSizeQueue_.front();
21+
ResponseSizeQueue_.pop();
22+
}
23+
24+
size_t QueueSize() const {
25+
return ResponseSizeQueue_.size();
26+
}
27+
28+
i64 FreeSpaceBytes() const { // Negative value temporarily stops data evaluation in DQ graph
29+
return static_cast<i64>(InflightLimitBytes_) - static_cast<i64>(TotalResponsesSize_);
30+
}
31+
32+
ui64 InflightBytes() const {
33+
return TotalResponsesSize_;
34+
}
35+
36+
ui64 InflightLimitBytes() const {
37+
return InflightLimitBytes_;
38+
}
39+
40+
private:
41+
const ui64 InflightLimitBytes_;
42+
43+
TQueue<ui64> ResponseSizeQueue_;
44+
ui64 TotalResponsesSize_ = 0;
45+
};
46+
47+
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 5 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#include "service_query.h"
2-
32
#include <ydb/core/actorlib_impl/long_timer.h>
43
#include <ydb/core/base/appdata.h>
54
#include <ydb/core/grpc_services/audit_dml_operations.h>
65
#include <ydb/core/grpc_services/base/base.h>
6+
#include <ydb/core/grpc_services/base/flow_control.h>
77
#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
88
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
99
#include <ydb/core/grpc_services/rpc_kqp_base.h>
@@ -24,51 +24,10 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
2424

2525
struct TProducerState {
2626
TMaybe<ui64> LastSeqNo;
27-
ui64 AckedFreeSpaceBytes = 0;
27+
i64 AckedFreeSpaceBytes = 0;
2828
TActorId ActorId;
2929
};
3030

31-
class TRpcFlowControlState {
32-
public:
33-
TRpcFlowControlState(ui64 inflightLimitBytes)
34-
: InflightLimitBytes_(inflightLimitBytes) {}
35-
36-
void PushResponse(ui64 responseSizeBytes) {
37-
ResponseSizeQueue_.push(responseSizeBytes);
38-
TotalResponsesSize_ += responseSizeBytes;
39-
}
40-
41-
void PopResponse() {
42-
Y_ENSURE(!ResponseSizeQueue_.empty());
43-
TotalResponsesSize_ -= ResponseSizeQueue_.front();
44-
ResponseSizeQueue_.pop();
45-
}
46-
47-
size_t QueueSize() const {
48-
return ResponseSizeQueue_.size();
49-
}
50-
51-
ui64 FreeSpaceBytes() const {
52-
return TotalResponsesSize_ < InflightLimitBytes_
53-
? InflightLimitBytes_ - TotalResponsesSize_
54-
: 0;
55-
}
56-
57-
ui64 InflightBytes() const {
58-
return TotalResponsesSize_;
59-
}
60-
61-
ui64 InflightLimitBytes() const {
62-
return InflightLimitBytes_;
63-
}
64-
65-
private:
66-
const ui64 InflightLimitBytes_;
67-
68-
TQueue<ui64> ResponseSizeQueue_;
69-
ui64 TotalResponsesSize_ = 0;
70-
};
71-
7231
bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to,
7332
NYql::TIssues& issues)
7433
{
@@ -328,13 +287,13 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
328287
FlowControl_.PopResponse();
329288
}
330289

331-
ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
290+
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
332291

333292
for (auto& pair : StreamChannels_) {
334293
const auto& channelId = pair.first;
335294
auto& channel = pair.second;
336295

337-
if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes == 0) {
296+
if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes <= 0) {
338297
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
339298
<< ", channel: " << channelId
340299
<< ", seqNo: " << channel.LastSeqNo
@@ -363,7 +322,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
363322
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
364323

365324
FlowControl_.PushResponse(out.size());
366-
auto freeSpaceBytes = FlowControl_.FreeSpaceBytes();
325+
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
367326

368327
Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);
369328

ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "service_table.h"
22
#include <ydb/core/grpc_services/base/base.h>
3+
#include <ydb/core/grpc_services/base/flow_control.h>
34

45
#include "rpc_kqp_base.h"
56
#include "service_table.h"
@@ -154,7 +155,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
154155

155156
TStreamExecuteScanQueryRPC(TEvStreamExecuteScanQueryRequest* request, ui64 rpcBufferSize)
156157
: Request_(request)
157-
, RpcBufferSize_(rpcBufferSize) {}
158+
, FlowControl_(rpcBufferSize) {}
158159

159160
void Bootstrap(const TActorContext &ctx) {
160161
this->Become(&TStreamExecuteScanQueryRPC::StateWork);
@@ -249,32 +250,30 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
249250
void Handle(TRpcServices::TEvGrpcNextReply::TPtr& ev, const TActorContext& ctx) {
250251
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " NextReply"
251252
<< ", left: " << ev->Get()->LeftInQueue
252-
<< ", queue: " << GRpcResponsesSizeQueue_.size()
253-
<< ", used memory: " << GRpcResponsesSize_
254-
<< ", buffer size: " << RpcBufferSize_);
253+
<< ", queue: " << FlowControl_.QueueSize()
254+
<< ", inflight bytes: " << FlowControl_.InflightBytes()
255+
<< ", limit bytes: " << FlowControl_.InflightLimitBytes());
255256

256-
while (GRpcResponsesSizeQueue_.size() > ev->Get()->LeftInQueue) {
257-
GRpcResponsesSize_ -= GRpcResponsesSizeQueue_.front();
258-
GRpcResponsesSizeQueue_.pop();
257+
while (FlowControl_.QueueSize() > ev->Get()->LeftInQueue) {
258+
FlowControl_.PopResponse();
259259
}
260-
Y_DEBUG_ABORT_UNLESS(GRpcResponsesSizeQueue_.empty() == (GRpcResponsesSize_ == 0));
261-
LastDataStreamTimestamp_ = TAppData::TimeProvider->Now();
262260

263-
if (WaitOnSeqNo_ && RpcBufferSize_ > GRpcResponsesSize_) {
264-
ui64 freeSpace = RpcBufferSize_ - GRpcResponsesSize_;
261+
LastDataStreamTimestamp_ = TAppData::TimeProvider->Now();
265262

263+
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
264+
if (freeSpaceBytes > 0 && LastSeqNo_ && AckedFreeSpaceBytes_ <= 0) {
266265
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
267-
<< ", seqNo: " << *WaitOnSeqNo_
268-
<< ", freeSpace: " << freeSpace
266+
<< ", seqNo: " << *LastSeqNo_
267+
<< ", freeSpace: " << freeSpaceBytes
269268
<< ", to: " << ExecuterActorId_);
270269

271270
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
272-
resp->Record.SetSeqNo(*WaitOnSeqNo_);
273-
resp->Record.SetFreeSpace(freeSpace);
271+
resp->Record.SetSeqNo(*LastSeqNo_);
272+
resp->Record.SetFreeSpace(freeSpaceBytes);
274273

275274
ctx.Send(ExecuterActorId_, resp.Release());
276275

277-
WaitOnSeqNo_.Clear();
276+
AckedFreeSpaceBytes_ = freeSpaceBytes;
278277
}
279278
}
280279

@@ -348,28 +347,22 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
348347
TString out;
349348
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
350349

351-
GRpcResponsesSizeQueue_.push(out.size());
352-
GRpcResponsesSize_ += out.size();
350+
FlowControl_.PushResponse(out.size());
351+
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
352+
LastSeqNo_ = ev->Get()->Record.GetSeqNo();
353+
AckedFreeSpaceBytes_ = freeSpaceBytes;
353354

354355
Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);
355356

356-
ui64 freeSpace = GRpcResponsesSize_ < RpcBufferSize_
357-
? RpcBufferSize_ - GRpcResponsesSize_
358-
: 0;
359-
360-
if (freeSpace == 0) {
361-
WaitOnSeqNo_ = ev->Get()->Record.GetSeqNo();
362-
}
363-
364357
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
365358
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
366-
<< ", freeSpace: " << freeSpace
359+
<< ", freeSpace: " << freeSpaceBytes
367360
<< ", to: " << ev->Sender
368-
<< ", queue: " << GRpcResponsesSizeQueue_.size());
361+
<< ", queue: " << FlowControl_.QueueSize());
369362

370363
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
371364
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
372-
resp->Record.SetFreeSpace(freeSpace);
365+
resp->Record.SetFreeSpace(freeSpaceBytes);
373366

374367
ctx.Send(ev->Sender, resp.Release());
375368
}
@@ -410,9 +403,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
410403
TInstant now = TAppData::TimeProvider->Now();
411404
TDuration timeout;
412405
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Got timeout event, InactiveClientTimeout: " << InactiveClientTimeout_
413-
<< " GRpcResponsesSizeQueue: " << GRpcResponsesSizeQueue_.size());
406+
<< " GRpcResponsesSizeQueue: " << FlowControl_.QueueSize());
414407

415-
if (InactiveClientTimeout_ && GRpcResponsesSizeQueue_.size() > 0) {
408+
if (InactiveClientTimeout_ && FlowControl_.QueueSize() > 0) {
416409
TDuration processTime = now - LastDataStreamTimestamp_;
417410
if (processTime >= InactiveClientTimeout_) {
418411
auto message = TStringBuilder() << this->SelfId() << " Client cannot process data in " << processTime
@@ -476,13 +469,12 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
476469

477470
private:
478471
std::shared_ptr<TEvStreamExecuteScanQueryRequest> Request_;
479-
const ui64 RpcBufferSize_;
472+
TRpcFlowControlState FlowControl_;
473+
TMaybe<ui64> LastSeqNo_;
474+
i64 AckedFreeSpaceBytes_ = 0;
480475

481476
TDuration InactiveClientTimeout_;
482-
TQueue<ui64> GRpcResponsesSizeQueue_;
483-
ui64 GRpcResponsesSize_ = 0;
484477
TInstant LastDataStreamTimestamp_;
485-
TMaybe<ui64> WaitOnSeqNo_;
486478

487479
TSchedulerCookieHolder TimeoutTimerCookieHolder_;
488480

0 commit comments

Comments
 (0)