@@ -40,6 +40,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
4040 , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
4141 , NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
4242 , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
43+ , LookupStrategy(settings.GetLookupStrategy())
4344 , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
4445 , Counters(counters)
4546 , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), " LookupActor" )
@@ -67,7 +68,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
6768 return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
6869 }
6970
70- void FillExtraStats (NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats*) override {
71+ void FillExtraStats (NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats* mstats ) override {
7172 if (last) {
7273 NYql::NDqProto::TDqTableStats* tableStats = nullptr ;
7374 for (auto & table : *stats->MutableTables ()) {
@@ -81,9 +82,25 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
8182 tableStats->SetTablePath (StreamLookupWorker->GetTablePath ());
8283 }
8384
85+ ui64 rowsReadEstimate = ReadRowsCount;
86+ ui64 bytesReadEstimate = ReadBytesCount;
87+
88+ if (mstats) {
89+ switch (LookupStrategy) {
90+ case NKqpProto::EStreamLookupStrategy::LOOKUP: {
91+ // in lookup case we return as result actual data, that we read from the datashard.
92+ rowsReadEstimate = mstats->Inputs [InputIndex]->RowsConsumed ;
93+ bytesReadEstimate = mstats->Inputs [InputIndex]->BytesConsumed ;
94+ break ;
95+ }
96+ default :
97+ ;
98+ }
99+ }
100+
84101 // TODO: use evread statistics after KIKIMR-16924
85- tableStats->SetReadRows (tableStats->GetReadRows () + ReadRowsCount );
86- tableStats->SetReadBytes (tableStats->GetReadBytes () + ReadBytesCount );
102+ tableStats->SetReadRows (tableStats->GetReadRows () + rowsReadEstimate );
103+ tableStats->SetReadBytes (tableStats->GetReadBytes () + bytesReadEstimate );
87104 tableStats->SetAffectedPartitions (tableStats->GetAffectedPartitions () + ReadsPerShard.size ());
88105
89106 NKqpProto::TKqpTableExtraStats tableExtraStats;
@@ -148,7 +165,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
148165 };
149166
150167 struct TEvRetryRead : public TEventLocal <TEvRetryRead, EvRetryRead> {
151- explicit TEvRetryRead (ui64 readId, ui64 lastSeqNo, bool instantStart = false )
168+ explicit TEvRetryRead (ui64 readId, ui64 lastSeqNo, bool instantStart = false )
152169 : ReadId(readId)
153170 , LastSeqNo(lastSeqNo)
154171 , InstantStart(instantStart) {
@@ -259,7 +276,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
259276 void Handle (TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
260277 CA_LOG_D (" TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath ());
261278 if (ev->Get ()->Request ->ErrorCount > 0 ) {
262- TString errorMsg = TStringBuilder () << " Failed to get partitioning for table: "
279+ TString errorMsg = TStringBuilder () << " Failed to get partitioning for table: "
263280 << StreamLookupWorker->GetTablePath ();
264281 LookupActorStateSpan.EndError (errorMsg);
265282
@@ -419,7 +436,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
419436 auto readIt = Reads.find (ev->Get ()->ReadId );
420437 YQL_ENSURE (readIt != Reads.end (), " Unexpected readId: " << ev->Get ()->ReadId );
421438 auto & read = readIt->second ;
422-
439+
423440 if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get ()->LastSeqNo ) {
424441 if (ev->Get ()->InstantStart ) {
425442 read.SetFinished ();
@@ -566,7 +583,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
566583 keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
567584
568585 Counters->IteratorsShardResolve ->Inc ();
569- LookupActorStateSpan = NWilson::TSpan (TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId (),
586+ LookupActorStateSpan = NWilson::TSpan (TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId (),
570587 " WaitForShardsResolve" , NWilson::EFlags::AUTO_END);
571588
572589 Send (MakeSchemeCacheID (), new TEvTxProxySchemeCache::TEvInvalidateTable (StreamLookupWorker->GetTableId (), {}));
@@ -625,6 +642,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
625642 NActors::TActorId SchemeCacheRequestTimeoutTimer;
626643 TVector<NKikimrDataEvents::TLock> Locks;
627644 TVector<NKikimrDataEvents::TLock> BrokenLocks;
645+ NKqpProto::EStreamLookupStrategy LookupStrategy;
628646 std::unique_ptr<TKqpStreamLookupWorker> StreamLookupWorker;
629647 ui64 ReadId = 0 ;
630648 size_t TotalRetryAttempts = 0 ;
0 commit comments