Skip to content

Commit 443f78a

Browse files
committed
Don't run operation input if sections have nodes to calculate
1 parent a458b35 commit 443f78a

File tree

1 file changed

+21
-2
lines changed

1 file changed

+21
-2
lines changed

ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
9494
TYtMerge::CallableName(),
9595
TYtMapReduce::CallableName(),
9696
},
97-
RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}),
97+
RequireForTransientOp(),
9898
Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>)
9999
);
100100
AddHandler(
@@ -105,7 +105,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
105105
RequireFirst(),
106106
Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>)
107107
);
108-
AddHandler({TYtReduce::CallableName()}, RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandleReduce));
108+
AddHandler({TYtReduce::CallableName()}, RequireForTransientOp(), Hndl(&TYtDataSinkExecTransformer::HandleReduce));
109109
AddHandler({TYtOutput::CallableName()}, RequireFirst(), Pass());
110110
AddHandler({TYtPublish::CallableName()}, RequireAllOf({TYtPublish::idx_World, TYtPublish::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandlePublish));
111111
AddHandler({TYtDropTable::CallableName()}, RequireFirst(), Hndl(&TYtDataSinkExecTransformer::HandleDrop));
@@ -124,6 +124,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
124124
TExecTransformerBase::Rewind();
125125
}
126126

127+
static TExecTransformerBase::TPrerequisite RequireForTransientOp() {
128+
return [] (const TExprNode::TPtr& input) {
129+
auto status = RequireChild(*input, TYtTransientOpBase::idx_World);
130+
// We have to run input only if it has no settings to calculate.
131+
// Otherwise, we first of all wait world completion.
132+
// Then begins node execution, which run settings calculation.
133+
// And after that, starts input execution
134+
// See YQL-19303
135+
if (!HasNodesToCalculate(input->ChildPtr(TYtTransientOpBase::idx_Input))) {
136+
status = status.Combine(RequireChild(*input, TYtTransientOpBase::idx_Input));
137+
}
138+
return status;
139+
};
140+
}
141+
127142
private:
128143
static void PushHybridStats(const TYtState::TPtr& state, TStringBuf statName, TStringBuf opName, const TStringBuf& folderName = "") {
129144
with_lock(state->StatisticsMutex) {
@@ -190,6 +205,10 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
190205
return CalculateNodes(State_, input, cluster, needCalc, ctx);
191206
}
192207

208+
if (auto opInput = op.Maybe<TYtTransientOpBase>().Input()) {
209+
YQL_ENSURE(opInput.Ref().GetState() == TExprNode::EState::ExecutionComplete);
210+
}
211+
193212
auto outSection = op.Output();
194213

195214
size_t outWithoutName = 0;

0 commit comments

Comments
 (0)