Skip to content

Commit 27f35fa

Browse files
authored
Fix flow control in run script actor (#11241)
1 parent c2aaa12 commit 27f35fa

File tree

3 files changed

+151
-101
lines changed

3 files changed

+151
-101
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,25 @@ struct TProducerState {
2828
TMaybe<ui64> LastSeqNo;
2929
i64 AckedFreeSpaceBytes = 0;
3030
TActorId ActorId;
31+
ui64 ChannelId = 0;
32+
33+
void SendAck(const NActors::TActorIdentity& actor) const {
34+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
35+
resp->Record.SetSeqNo(*LastSeqNo);
36+
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);
37+
resp->Record.SetChannelId(ChannelId);
38+
39+
actor.Send(ActorId, resp.Release());
40+
}
41+
42+
bool ResumeIfStopped(const NActors::TActorIdentity& actor, i64 freeSpaceBytes) {
43+
if (LastSeqNo && AckedFreeSpaceBytes <= 0) {
44+
AckedFreeSpaceBytes = freeSpaceBytes;
45+
SendAck(actor);
46+
return true;
47+
}
48+
return false;
49+
}
3150
};
3251

3352
bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to,
@@ -292,28 +311,16 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
292311
}
293312

294313
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
295-
296-
for (auto& pair : StreamChannels_) {
297-
const auto& channelId = pair.first;
298-
auto& channel = pair.second;
299-
300-
if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes <= 0) {
301-
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
302-
<< ", channel: " << channelId
303-
<< ", seqNo: " << channel.LastSeqNo
304-
<< ", freeSpace: " << freeSpaceBytes);
305-
306-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
307-
resp->Record.SetSeqNo(*channel.LastSeqNo);
308-
resp->Record.SetFreeSpace(freeSpaceBytes);
309-
resp->Record.SetChannelId(channelId);
310-
311-
ctx.Send(channel.ActorId, resp.Release());
312-
313-
channel.AckedFreeSpaceBytes = freeSpaceBytes;
314+
if (freeSpaceBytes > 0) {
315+
for (auto& [channelId, channel] : StreamChannels_) {
316+
if (channel.ResumeIfStopped(SelfId(), freeSpaceBytes)) {
317+
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
318+
<< ", channel: " << channelId
319+
<< ", seqNo: " << channel.LastSeqNo
320+
<< ", freeSpace: " << freeSpaceBytes);
321+
}
314322
}
315323
}
316-
317324
}
318325

319326
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
@@ -334,19 +341,15 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
334341
channel.ActorId = ev->Sender;
335342
channel.LastSeqNo = ev->Get()->Record.GetSeqNo();
336343
channel.AckedFreeSpaceBytes = freeSpaceBytes;
344+
channel.ChannelId = ev->Get()->Record.GetChannelId();
337345

338346
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack"
339347
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
340348
<< ", freeSpace: " << freeSpaceBytes
341349
<< ", to: " << ev->Sender
342350
<< ", queue: " << FlowControl_.QueueSize());
343351

344-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
345-
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
346-
resp->Record.SetFreeSpace(freeSpaceBytes);
347-
resp->Record.SetChannelId(ev->Get()->Record.GetChannelId());
348-
349-
ctx.Send(channel.ActorId, resp.Release());
352+
channel.SendAck(SelfId());
350353
}
351354

352355
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
@@ -489,7 +492,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
489492
NKikimrKqp::EQueryAction QueryAction;
490493
TRpcFlowControlState FlowControl_;
491494
TMap<ui64, TProducerState> StreamChannels_;
492-
495+
493496
NWilson::TSpan Span_;
494497
};
495498

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
10321032
void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
10331033
if (!ExecutionEntryExists && status == Ydb::StatusIds::SUCCESS) {
10341034
status = Ydb::StatusIds::NOT_FOUND;
1035-
issues.AddIssue("No such execution");
1035+
issues.AddIssue("No such execution");
10361036
}
10371037

10381038
if (status == Ydb::StatusIds::SUCCESS) {
@@ -1796,26 +1796,26 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
17961796
.AddParam("$items");
17971797

17981798
param
1799-
.BeginList();
1799+
.BeginList();
18001800

18011801
auto row = FirstRow;
18021802
for (const auto& rowValue : ResultSet.rows()) {
18031803
auto rowValueSerialized = rowValue.SerializeAsString();
18041804
SavedSize += rowValueSerialized.size();
18051805
param
1806-
.AddListItem()
1807-
.BeginStruct()
1808-
.AddMember("row_id")
1809-
.Int64(row++)
1810-
.AddMember("result_set")
1811-
.String(std::move(rowValueSerialized))
1812-
.AddMember("accumulated_size")
1813-
.Int64(AccumulatedSize + SavedSize)
1814-
.EndStruct();
1806+
.AddListItem()
1807+
.BeginStruct()
1808+
.AddMember("row_id")
1809+
.Int64(row++)
1810+
.AddMember("result_set")
1811+
.String(std::move(rowValueSerialized))
1812+
.AddMember("accumulated_size")
1813+
.Int64(AccumulatedSize + SavedSize)
1814+
.EndStruct();
18151815
}
18161816
param
1817-
.EndList()
1818-
.Build();
1817+
.EndList()
1818+
.Build();
18191819

18201820
RunDataQuery(sql, &params);
18211821
}

0 commit comments

Comments
 (0)