@@ -18,9 +18,6 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
1818 Context->GetCommonContext ()->GetCounters ().OnSourceFinished (
1919 source->GetRecordsCount (), source->GetUsedRawBytes (), tableExt ? tableExt->num_rows () : 0 );
2020
21- if ((!tableExt || !tableExt->num_rows ()) && Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && InFlightLimit < MaxInFlight) {
22- InFlightLimit = 2 * InFlightLimit;
23- }
2421 source->MutableStageResult ().SetResultChunk (std::move (tableExt), startIndex, recordsCount);
2522 while (FetchingSources.size ()) {
2623 auto frontSource = FetchingSources.front ();
@@ -59,13 +56,18 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
5956 AFL_VERIFY (FetchingSourcesByIdx.erase (frontSource->GetSourceIdx ()));
6057 FetchingSources.pop_front ();
6158 frontSource->ClearResult ();
62- if (Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && FetchingSources .size () && frontSource->GetResultRecordsCount ()) {
63- FinishedSources. emplace (frontSource);
64- while (FinishedSources.size () && (*FinishedSources. begin ())-> GetFinish () < FetchingSources. front ()-> GetStart ()) {
65- auto fetchingSource = FetchingSources .front ();
59+ if (Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && SortedSources .size () && frontSource->GetResultRecordsCount ()) {
60+ AFL_VERIFY (FetchingInFlightSources. erase (frontSource) );
61+ AFL_VERIFY (FinishedSources.emplace (frontSource). second );
62+ while (FinishedSources. size () && (*FinishedSources. begin ())-> GetFinish () < SortedSources .front ()-> GetStart ()) {
6663 auto finishedSource = *FinishedSources.begin ();
64+ if (!finishedSource->GetResultRecordsCount () && Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () &&
65+ InFlightLimit < MaxInFlight) {
66+ InFlightLimit = 2 * InFlightLimit;
67+ }
6768 FetchedCount += finishedSource->GetResultRecordsCount ();
6869 FinishedSources.erase (FinishedSources.begin ());
70+ --IntervalsInFlightCount;
6971 AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " source_finished" )(" source_id" , finishedSource->GetSourceId ())(
7072 " source_idx" , finishedSource->GetSourceIdx ())(" limit" , Context->GetCommonContext ()->GetReadMetadata ()->GetLimitRobust ())(
7173 " fetched" , finishedSource->GetResultRecordsCount ());
@@ -119,14 +121,38 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
119121 if (!Context->IsActive ()) {
120122 return false ;
121123 }
124+ if (InFlightLimit <= IntervalsInFlightCount) {
125+ return false ;
126+ }
122127 bool changed = false ;
123- while (SortedSources.size () && FetchingSources.size () < InFlightLimit) {
128+ ui32 inFlightCountLocal = 0 ;
129+ if (SortedSources.size ()) {
130+ for (auto it = FetchingInFlightSources.begin (); it != FetchingInFlightSources.end (); ++it) {
131+ if ((*it)->GetFinish () < SortedSources.front ()->GetStart ()) {
132+ ++inFlightCountLocal;
133+ }
134+ }
135+ }
136+ AFL_VERIFY (IntervalsInFlightCount == inFlightCountLocal)(" count_global" , IntervalsInFlightCount)(" count_local" , inFlightCountLocal);
137+ while (SortedSources.size () && inFlightCountLocal < InFlightLimit) {
124138 SortedSources.front ()->StartProcessing (SortedSources.front ());
125139 FetchingSources.emplace_back (SortedSources.front ());
126140 FetchingSourcesByIdx.emplace (SortedSources.front ()->GetSourceIdx (), SortedSources.front ());
141+ AFL_VERIFY (FetchingInFlightSources.emplace (SortedSources.front ()).second );
127142 SortedSources.pop_front ();
143+ if (SortedSources.size ()) {
144+ ui32 inFlightCountLocalNew = 0 ;
145+ for (auto it = FetchingInFlightSources.begin (); it != FetchingInFlightSources.end (); ++it) {
146+ if ((*it)->GetFinish () < SortedSources.front ()->GetStart ()) {
147+ ++inFlightCountLocalNew;
148+ }
149+ }
150+ AFL_VERIFY (inFlightCountLocal <= inFlightCountLocalNew);
151+ inFlightCountLocal = inFlightCountLocalNew;
152+ }
128153 changed = true ;
129154 }
155+ IntervalsInFlightCount = inFlightCountLocal;
130156 return changed;
131157}
132158
0 commit comments