@@ -12,6 +12,7 @@ class ISourcesCollection {
1212 virtual std::shared_ptr<IDataSource> DoExtractNext () = 0;
1313 virtual bool DoCheckInFlightLimits () const = 0;
1414 virtual void DoOnSourceFinished (const std::shared_ptr<IDataSource>& source) = 0;
15+ virtual void DoOnIntervalResult (const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) = 0;
1516 virtual void DoClear () = 0;
1617
1718 TPositiveControlInteger SourcesInFlightCount;
@@ -30,6 +31,10 @@ class ISourcesCollection {
3031 return DoBuildCursor (source, readyRecords);
3132 }
3233
34+ void OnIntervalResult (const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) {
35+ return DoOnIntervalResult (table, source);
36+ }
37+
3338 TString DebugString () const {
3439 return DoDebugString ();
3540 }
@@ -87,6 +92,8 @@ class TNotSortedCollection: public ISourcesCollection {
8792 virtual bool DoCheckInFlightLimits () const override {
8893 return InFlightCount < InFlightLimit;
8994 }
95+ virtual void DoOnIntervalResult (const std::shared_ptr<arrow::Table>& /* table*/ , const std::shared_ptr<IDataSource>& /* source*/ ) override {
96+ }
9097 virtual void DoOnSourceFinished (const std::shared_ptr<IDataSource>& source) override {
9198 if (!source->GetResultRecordsCount () && InFlightLimit * 2 < GetMaxInFlight ()) {
9299 InFlightLimit *= 2 ;
@@ -103,8 +110,7 @@ class TNotSortedCollection: public ISourcesCollection {
103110 TNotSortedCollection (const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
104111 const std::shared_ptr<IScanCursor>& cursor, const std::optional<ui32> limit)
105112 : TBase(context)
106- , Limit(limit)
107- {
113+ , Limit(limit) {
108114 if (Limit) {
109115 InFlightLimit = 1 ;
110116 } else {
@@ -141,6 +147,8 @@ class TSortedFullScanCollection: public ISourcesCollection {
141147 virtual std::shared_ptr<IScanCursor> DoBuildCursor (const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
142148 return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch (), source->GetSourceId (), readyRecords);
143149 }
150+ virtual void DoOnIntervalResult (const std::shared_ptr<arrow::Table>& /* table*/ , const std::shared_ptr<IDataSource>& /* source*/ ) override {
151+ }
144152 virtual std::shared_ptr<IDataSource> DoExtractNext () override {
145153 AFL_VERIFY (HeapSources.size ());
146154 auto result = HeapSources.front ().Construct (Context);
@@ -192,6 +200,13 @@ class TScanWithLimitCollection: public ISourcesCollection {
192200 , SourceId(source->GetSourceId ())
193201 , SourceIdx(source->GetSourceIdx ()) {
194202 }
203+
204+ TFinishedDataSource (const std::shared_ptr<IDataSource>& source, const ui32 partSize)
205+ : RecordsCount(partSize)
206+ , SourceId(source->GetSourceId ())
207+ , SourceIdx(source->GetSourceIdx ()) {
208+ AFL_VERIFY (partSize < source->GetResultRecordsCount ());
209+ }
195210 };
196211
197212 std::deque<TSourceConstructor> HeapSources;
@@ -203,6 +218,7 @@ class TScanWithLimitCollection: public ISourcesCollection {
203218 std::map<TCompareKeyForScanSequence, TFinishedDataSource> FinishedSources;
204219 std::set<TCompareKeyForScanSequence> FetchingInFlightSources;
205220
221+ virtual void DoOnIntervalResult (const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) override ;
206222 virtual std::shared_ptr<IScanCursor> DoBuildCursor (const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
207223 return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch (), source->GetSourceId (), readyRecords);
208224 }
@@ -214,7 +230,8 @@ class TScanWithLimitCollection: public ISourcesCollection {
214230 }
215231 virtual std::shared_ptr<IDataSource> DoExtractNext () override ;
216232 virtual bool DoCheckInFlightLimits () const override {
217- return (FetchingInFlightCount < GetMaxInFlight ()) && (FullIntervalsFetchingCount < InFlightLimit);
233+ return (FetchingInFlightCount < InFlightLimit);
234+ // &&(FullIntervalsFetchingCount < InFlightLimit);
218235 }
219236 virtual void DoOnSourceFinished (const std::shared_ptr<IDataSource>& source) override ;
220237 ui32 GetInFlightIntervalsCount (const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const ;
0 commit comments