@@ -18,7 +18,7 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
1818 Context->GetCommonContext ()->GetCounters ().OnSourceFinished (
1919 source->GetRecordsCount (), source->GetUsedRawBytes (), tableExt ? tableExt->num_rows () : 0 );
2020
21- if ((!tableExt || !tableExt->num_rows ()) && Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && InFlightLimit < MaxInFlight) {
21+ if (/* (!tableExt || !tableExt->num_rows()) &&*/ Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && InFlightLimit < MaxInFlight) {
2222 InFlightLimit = 2 * InFlightLimit;
2323 }
2424 source->MutableStageResult ().SetResultChunk (std::move (tableExt), startIndex, recordsCount);
@@ -59,10 +59,9 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
5959 AFL_VERIFY (FetchingSourcesByIdx.erase (frontSource->GetSourceIdx ()));
6060 FetchingSources.pop_front ();
6161 frontSource->ClearResult ();
62- if (Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && FetchingSources .size () && frontSource->GetResultRecordsCount ()) {
62+ if (Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && SortedSources .size () && frontSource->GetResultRecordsCount ()) {
6363 FinishedSources.emplace (frontSource);
64- while (FinishedSources.size () && (*FinishedSources.begin ())->GetFinish () < FetchingSources.front ()->GetStart ()) {
65- auto fetchingSource = FetchingSources.front ();
64+ while (FinishedSources.size () && (*FinishedSources.begin ())->GetFinish () < SortedSources.front ()->GetStart ()) {
6665 auto finishedSource = *FinishedSources.begin ();
6766 FetchedCount += finishedSource->GetResultRecordsCount ();
6867 FinishedSources.erase (FinishedSources.begin ());
0 commit comments