@@ -24,6 +24,7 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
2424struct TProducerState {
2525 TMaybe<ui64> LastSeqNo;
2626 ui64 AckedFreeSpaceBytes = 0 ;
27+ TActorId ActorId;
2728};
2829
2930class TRpcFlowControlState {
@@ -244,8 +245,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
244245 const auto traceId = Request_->GetTraceId ();
245246
246247 NYql::TIssues issues;
247- NKikimrKqp::EQueryAction queryAction;
248- if (!ParseQueryAction (*req, queryAction, issues)) {
248+ if (!ParseQueryAction (*req, QueryAction, issues)) {
249249 return ReplyFinishStream (Ydb::StatusIds::BAD_REQUEST, std::move (issues));
250250 }
251251
@@ -274,7 +274,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
274274 cachePolicy->set_keep_in_cache (true );
275275
276276 auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
277- queryAction ,
277+ QueryAction ,
278278 queryType,
279279 SelfId (),
280280 Request_,
@@ -288,7 +288,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
288288 nullptr , // operationParams
289289 false , // keepSession
290290 false , // useCancelAfter
291- syntax);
291+ syntax,
292+ true );
292293
293294 if (!ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release ())) {
294295 NYql::TIssues issues;
@@ -322,23 +323,24 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
322323
323324 ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes ();
324325
325- for (auto & pair : StreamProducers_ ) {
326- const auto & producerId = pair.first ;
327- auto & producer = pair.second ;
326+ for (auto & pair : StreamChannels_ ) {
327+ const auto & channelId = pair.first ;
328+ auto & channel = pair.second ;
328329
329- if (freeSpaceBytes > 0 && producer .LastSeqNo && producer .AckedFreeSpaceBytes == 0 ) {
330+ if (freeSpaceBytes > 0 && channel .LastSeqNo && channel .AckedFreeSpaceBytes == 0 ) {
330331 LOG_DEBUG_S (ctx, NKikimrServices::RPC_REQUEST, this ->SelfId () << " Resume execution, "
331- << " , producer : " << producerId
332- << " , seqNo: " << producer .LastSeqNo
332+ << " , channel : " << channelId
333+ << " , seqNo: " << channel .LastSeqNo
333334 << " , freeSpace: " << freeSpaceBytes);
334335
335336 auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
336- resp->Record .SetSeqNo (*producer .LastSeqNo );
337+ resp->Record .SetSeqNo (*channel .LastSeqNo );
337338 resp->Record .SetFreeSpace (freeSpaceBytes);
339+ resp->Record .SetChannelId (channelId);
338340
339- ctx.Send (producerId , resp.Release ());
341+ ctx.Send (channel. ActorId , resp.Release ());
340342
341- producer .AckedFreeSpaceBytes = freeSpaceBytes;
343+ channel .AckedFreeSpaceBytes = freeSpaceBytes;
342344 }
343345 }
344346
@@ -358,9 +360,10 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
358360
359361 Request_->SendSerializedResult (std::move (out), Ydb::StatusIds::SUCCESS);
360362
361- auto & producer = StreamProducers_[ev->Sender ];
362- producer.LastSeqNo = ev->Get ()->Record .GetSeqNo ();
363- producer.AckedFreeSpaceBytes = freeSpaceBytes;
363+ auto & channel = StreamChannels_[ev->Get ()->Record .GetChannelId ()];
364+ channel.ActorId = ev->Sender ;
365+ channel.LastSeqNo = ev->Get ()->Record .GetSeqNo ();
366+ channel.AckedFreeSpaceBytes = freeSpaceBytes;
364367
365368 LOG_DEBUG_S (ctx, NKikimrServices::RPC_REQUEST, this ->SelfId () << " Send stream data ack"
366369 << " , seqNo: " << ev->Get ()->Record .GetSeqNo ()
@@ -371,8 +374,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
371374 auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
372375 resp->Record .SetSeqNo (ev->Get ()->Record .GetSeqNo ());
373376 resp->Record .SetFreeSpace (freeSpaceBytes);
377+ resp->Record .SetChannelId (ev->Get ()->Record .GetChannelId ());
374378
375- ctx.Send (ev-> Sender , resp.Release ());
379+ ctx.Send (channel. ActorId , resp.Release ());
376380 }
377381
378382 void Handle (NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
@@ -381,14 +385,30 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
381385 const auto & issueMessage = record.GetResponse ().GetQueryIssues ();
382386
383387 bool hasTrailingMessage = false ;
384-
388+
389+ auto & kqpResponse = record.GetResponse ();
390+ if (kqpResponse.GetYdbResults ().size () > 1 ) {
391+ auto issue = MakeIssue (NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
392+ " Unexpected trailing message with multiple result sets." );
393+ ReplyFinishStream (Ydb::StatusIds::INTERNAL_ERROR, issue);
394+ return ;
395+ }
396+
385397 if (record.GetYdbStatus () == Ydb::StatusIds::SUCCESS) {
386398 Request_->SetRuHeader (record.GetConsumedRu ());
387399
388400 auto & kqpResponse = record.GetResponse ();
389401
390402 Ydb::Query::ExecuteQueryResponsePart response;
391403
404+ if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
405+ for (int i = 0 ; i < kqpResponse.GetYdbResults ().size (); i++) {
406+ hasTrailingMessage = true ;
407+ response.set_result_set_index (i);
408+ response.mutable_result_set ()->Swap (record.MutableResponse ()->MutableYdbResults (i));
409+ }
410+ }
411+
392412 AuditContextAppend (Request_.get (), *Request_->GetProtoRequest (), response);
393413
394414 if (kqpResponse.HasTxMeta ()) {
@@ -492,8 +512,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
492512private:
493513 std::shared_ptr<TEvExecuteQueryRequest> Request_;
494514
515+ NKikimrKqp::EQueryAction QueryAction;
495516 TRpcFlowControlState FlowControl_;
496- TMap<TActorId , TProducerState> StreamProducers_ ;
517+ TMap<ui64 , TProducerState> StreamChannels_ ;
497518};
498519
499520} // namespace
0 commit comments