File tree Expand file tree Collapse file tree 1 file changed +6
-1
lines changed
ydb/library/yql/dq/runtime Expand file tree Collapse file tree 1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change @@ -433,6 +433,10 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
433433 return CurrBlockIndex_ >= BlockLen_;
434434 }
435435
436+ bool IsFinished () const {
437+ return IsFinished_;
438+ }
439+
436440 void NextRow () {
437441 Y_DEBUG_ABORT_UNLESS (!IsEmpty ());
438442 ++CurrBlockIndex_;
@@ -645,7 +649,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
645649 input.NextRow ();
646650 InputRows_.pop_back ();
647651 if (input.IsEmpty ()) {
648- auto status = input. FetchNext ( );
652+ auto status = FetchInput (inputIndex );
649653 if (status == NUdf::EFetchStatus::Yield) {
650654 StartInputIndex_ = inputIndex;
651655 return status;
@@ -657,6 +661,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
657661 }
658662
659663 if (!OutputBlockLen_) {
664+ YQL_ENSURE (AllOf (InputData_, [](const TDqInputBatch& input) { return input.IsEmpty () && input.IsFinished (); }));
660665 return NUdf::EFetchStatus::Finish;
661666 }
662667
You can’t perform that action at this time.
0 commit comments