1111
1212#include < ydb/core/kqp/common/kqp_ru_calc.h>
1313#include < ydb/core/kqp/common/kqp_lwtrace_probes.h>
14+ #include < ydb/core/kqp/runtime/kqp_transport.h>
1415
1516#include < ydb/core/actorlib_impl/long_timer.h>
1617#include < ydb/core/base/appdata.h>
3435
3536#include < ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
3637#include < ydb/library/yql/dq/runtime/dq_transport.h>
38+ #include < ydb/library/yql/dq/common/dq_serialized_batch.h>
3739#include < ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
3840#include < ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
3941#include < ydb/library/yql/public/issue/yql_issue.h>
4547#include < ydb/library/actors/core/hfunc.h>
4648#include < ydb/library/actors/core/log.h>
4749
50+
4851#include < util/generic/size_literals.h>
4952
5053
@@ -119,7 +122,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
119122 const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
120123 const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
121124 TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
122- ui64 spanVerbosity = 0 , TString spanName = " KqpExecuterBase" )
125+ ui64 spanVerbosity = 0 , TString spanName = " KqpExecuterBase" , bool streamResult = false )
123126 : Request(std::move(request))
124127 , Database(database)
125128 , UserToken(userToken)
@@ -130,6 +133,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
130133 , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
131134 , AggregationSettings(aggregation)
132135 , HasOlapTable(false )
136+ , StreamResult(streamResult)
133137 {
134138 TasksGraph.GetMeta ().Snapshot = IKqpGateway::TKqpSnapshot (Request.Snapshot .Step , Request.Snapshot .TxId );
135139 TasksGraph.GetMeta ().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
@@ -234,6 +238,130 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
234238 return true ;
235239 }
236240
241+ struct TEvComputeChannelDataOOB {
242+ NYql::NDqProto::TEvComputeChannelData Proto;
243+ TRope Payload;
244+
245+ size_t Size () const {
246+ return Proto.GetChannelData ().GetData ().GetRaw ().size () + Payload.size ();
247+ }
248+
249+ ui32 RowCount () const {
250+ return Proto.GetChannelData ().GetData ().GetRows ();
251+ }
252+ };
253+
254+ void HandleChannelData (NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
255+ auto & record = ev->Get ()->Record ;
256+ auto & channelData = record.GetChannelData ();
257+ auto & channel = TasksGraph.GetChannel (channelData.GetChannelId ());
258+ auto & task = TasksGraph.GetTask (channel.SrcTask );
259+ const TActorId channelComputeActorId = ev->Sender ;
260+
261+ auto & txResult = ResponseEv->TxResults [channel.DstInputIndex ];
262+ ui32 queryResultIndex = txResult.QueryResultIndex .Defined () ? *txResult.QueryResultIndex : 0 ;
263+ auto [it, _] = ResultChannelToComputeActor.emplace (queryResultIndex, std::make_pair (ev->Sender , channel.Id ));
264+ YQL_ENSURE (it->second .first == channelComputeActorId);
265+
266+ if (StreamResult && txResult.IsStream && txResult.QueryResultIndex .Defined ()) {
267+
268+ TEvComputeChannelDataOOB computeData;
269+ computeData.Proto = std::move (ev->Get ()->Record );
270+ if (computeData.Proto .GetChannelData ().GetData ().HasPayloadId ()) {
271+ computeData.Payload = ev->Get ()->GetPayload (computeData.Proto .GetChannelData ().GetData ().GetPayloadId ());
272+ }
273+
274+ const bool trailingResults = (
275+ computeData.Proto .GetChannelData ().GetFinished () &&
276+ (ResponseEv->TxResults .size () == 1 ) &&
277+ Request.IsTrailingResultsAllowed ());
278+
279+ TVector<NYql::NDq::TDqSerializedBatch> batches (1 );
280+ auto & batch = batches.front ();
281+
282+ batch.Proto = std::move (*computeData.Proto .MutableChannelData ()->MutableData ());
283+ batch.Payload = std::move (computeData.Payload );
284+
285+ TKqpProtoBuilder protoBuilder{*AppData ()->FunctionRegistry };
286+ auto resultSet = protoBuilder.BuildYdbResultSet (std::move (batches), txResult.MkqlItemType , txResult.ColumnOrder );
287+
288+ if (!trailingResults) {
289+ auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
290+ streamEv->Record .SetSeqNo (computeData.Proto .GetSeqNo ());
291+ streamEv->Record .SetQueryResultIndex (*txResult.QueryResultIndex );
292+ streamEv->Record .MutableResultSet ()->Swap (&resultSet);
293+
294+ LOG_D (" Send TEvStreamData to " << Target << " , seqNo: " << streamEv->Record .GetSeqNo ()
295+ << " , nRows: " << streamEv->Record .GetResultSet ().rows ().size ());
296+
297+ this ->Send (Target, streamEv.Release ());
298+
299+ } else {
300+ auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
301+ ackEv->Record .SetSeqNo (computeData.Proto .GetSeqNo ());
302+ ackEv->Record .SetChannelId (channel.Id );
303+ ackEv->Record .SetFreeSpace (50_MB);
304+ this ->Send (channelComputeActorId, ackEv.Release (), /* TODO: undelivery */ 0 , /* cookie */ channel.Id );
305+ txResult.TrailingResult .Swap (&resultSet);
306+ txResult.HasTrailingResult = true ;
307+ LOG_D (" staging TEvStreamData to " << Target << " , seqNo: " << computeData.Proto .GetSeqNo ()
308+ << " , nRows: " << txResult.TrailingResult .rows ().size ());
309+ }
310+
311+ return ;
312+ }
313+
314+ NYql::NDq::TDqSerializedBatch batch;
315+ batch.Proto = std::move (*record.MutableChannelData ()->MutableData ());
316+ if (batch.Proto .HasPayloadId ()) {
317+ batch.Payload = ev->Get ()->GetPayload (batch.Proto .GetPayloadId ());
318+ }
319+
320+ YQL_ENSURE (channel.DstTask == 0 );
321+
322+ if (Stats) {
323+ Stats->ResultBytes += batch.Size ();
324+ Stats->ResultRows += batch.RowCount ();
325+ }
326+
327+ LOG_T (" Got result, channelId: " << channel.Id << " , shardId: " << task.Meta .ShardId
328+ << " , inputIndex: " << channel.DstInputIndex << " , from: " << ev->Sender
329+ << " , finished: " << channelData.GetFinished ());
330+
331+ ResponseEv->TakeResult (channel.DstInputIndex , std::move (batch));
332+ LOG_T (" Send ack to channelId: " << channel.Id << " , seqNo: " << record.GetSeqNo () << " , to: " << ev->Sender );
333+
334+ auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
335+ ackEv->Record .SetSeqNo (record.GetSeqNo ());
336+ ackEv->Record .SetChannelId (channel.Id );
337+ ackEv->Record .SetFreeSpace (50_MB);
338+ this ->Send (channelComputeActorId, ackEv.Release (), /* TODO: undelivery */ 0 , /* cookie */ channel.Id );
339+ }
340+
341+ void HandleStreamAck (TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) {
342+ ui64 queryResultIndex = ev->Get ()->Record .GetQueryResultIndex ();
343+ auto it = ResultChannelToComputeActor.find (queryResultIndex);
344+ YQL_ENSURE (it != ResultChannelToComputeActor.end ());
345+ const auto [channelComputeActorId, channelId] = it->second ;
346+
347+ ui64 seqNo = ev->Get ()->Record .GetSeqNo ();
348+ i64 freeSpace = ev->Get ()->Record .GetFreeSpace ();
349+
350+ LOG_ERROR_S (*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, " TxId: " << TxId
351+ << " , send ack to channelId: " << channelId
352+ << " , seqNo: " << seqNo
353+ << " , enough: " << ev->Get ()->Record .GetEnough ()
354+ << " , freeSpace: " << freeSpace
355+ << " , to: " << channelComputeActorId);
356+
357+ auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
358+ ackEv->Record .SetSeqNo (seqNo);
359+ ackEv->Record .SetChannelId (channelId);
360+ ackEv->Record .SetFreeSpace (freeSpace);
361+ ackEv->Record .SetFinish (ev->Get ()->Record .GetEnough ());
362+ this ->Send (channelComputeActorId, ackEv.Release (), /* TODO: undelivery */ 0 , /* cookie */ channelId);
363+ }
364+
237365 void HandleComputeStats (NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
238366 TActorId computeActor = ev->Sender ;
239367 auto & state = ev->Get ()->Record ;
@@ -1594,16 +1722,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
15941722 return true ;
15951723 }
15961724
1597- void InitializeChannelProxies () {
1598- for (const auto & channel: TasksGraph.GetChannels ()) {
1599- if (channel.DstTask ) {
1600- continue ;
1601- }
1602-
1603- CreateChannelProxy (channel);
1604- }
1605- }
1606-
16071725 const IKqpGateway::TKqpSnapshot& GetSnapshot () const {
16081726 return TasksGraph.GetMeta ().Snapshot ;
16091727 }
@@ -1753,8 +1871,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17531871 const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
17541872 TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
17551873 bool HasOlapTable = false ;
1874+ bool StreamResult = false ;
17561875 bool HasDatashardSourceScan = false ;
17571876 bool UnknownAffectedShardCount = false ;
1877+
1878+ THashMap<ui64, std::pair<TActorId, ui64>> ResultChannelToComputeActor;
17581879 THashMap<NYql::NDq::TStageId, THashMap<ui64, TShardInfo>> SourceScanStageIdToParititions;
17591880
17601881private:
0 commit comments