Skip to content

Commit 44088a1

Browse files
committed
[yt provider] Don't run operation input if sections have nodes to calculate (ydb-platform#11947)
1 parent 1530903 commit 44088a1

File tree

1 file changed

+21
-2
lines changed

1 file changed

+21
-2
lines changed

ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
9393
TYtMerge::CallableName(),
9494
TYtMapReduce::CallableName(),
9595
},
96-
RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}),
96+
RequireForTransientOp(),
9797
Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>)
9898
);
9999
AddHandler(
@@ -104,7 +104,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
104104
RequireFirst(),
105105
Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>)
106106
);
107-
AddHandler({TYtReduce::CallableName()}, RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandleReduce));
107+
AddHandler({TYtReduce::CallableName()}, RequireForTransientOp(), Hndl(&TYtDataSinkExecTransformer::HandleReduce));
108108
AddHandler({TYtOutput::CallableName()}, RequireFirst(), Pass());
109109
AddHandler({TYtPublish::CallableName()}, RequireAllOf({TYtPublish::idx_World, TYtPublish::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandlePublish));
110110
AddHandler({TYtDropTable::CallableName()}, RequireFirst(), Hndl(&TYtDataSinkExecTransformer::HandleDrop));
@@ -123,6 +123,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
123123
TExecTransformerBase::Rewind();
124124
}
125125

126+
static TExecTransformerBase::TPrerequisite RequireForTransientOp() {
127+
return [] (const TExprNode::TPtr& input) {
128+
auto status = RequireChild(*input, TYtTransientOpBase::idx_World);
129+
// We have to run input only if it has no settings to calculate.
130+
// Otherwise, we first of all wait world completion.
131+
// Then begins node execution, which run settings calculation.
132+
// And after that, starts input execution
133+
// See YQL-19303
134+
if (!HasNodesToCalculate(input->ChildPtr(TYtTransientOpBase::idx_Input))) {
135+
status = status.Combine(RequireChild(*input, TYtTransientOpBase::idx_Input));
136+
}
137+
return status;
138+
};
139+
}
140+
126141
private:
127142
static void PushHybridStats(const TYtState::TPtr& state, TStringBuf statName, TStringBuf opName, const TStringBuf& folderName = "") {
128143
with_lock(state->StatisticsMutex) {
@@ -189,6 +204,10 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
189204
return CalculateNodes(State_, input, cluster, needCalc, ctx);
190205
}
191206

207+
if (auto opInput = op.Maybe<TYtTransientOpBase>().Input()) {
208+
YQL_ENSURE(opInput.Ref().GetState() == TExprNode::EState::ExecutionComplete);
209+
}
210+
192211
auto outSection = op.Output();
193212

194213
size_t outWithoutName = 0;

0 commit comments

Comments
 (0)