Skip to content

Commit 10720d0

Browse files
fixed bug in S3 Physical Optimizer 2 (#13391)
1 parent 7091202 commit 10720d0

File tree

2 files changed

+53
-11
lines changed

2 files changed

+53
-11
lines changed

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2366,6 +2366,21 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
23662366

23672367
auto db = kikimr->GetQueryClient();
23682368

2369+
{
2370+
const TString sql = fmt::format(R"(
2371+
INSERT INTO {destination}
2372+
SELECT key, value FROM {source};)",
2373+
"destination"_a = table1,
2374+
"source"_a = olapTable);
2375+
2376+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
2377+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
2378+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
2379+
2380+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
2381+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
2382+
}
2383+
23692384
{
23702385
const TString sql = fmt::format(R"(
23712386
INSERT INTO {destination}
@@ -2381,6 +2396,21 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
23812396
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
23822397
}
23832398

2399+
{
2400+
const TString sql = fmt::format(R"(
2401+
INSERT INTO {destination}
2402+
SELECT key, value, "2024" AS year FROM {source};)",
2403+
"destination"_a = table2,
2404+
"source"_a = olapTable);
2405+
2406+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
2407+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
2408+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
2409+
2410+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
2411+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
2412+
}
2413+
23842414
{
23852415
const TString sql = fmt::format(R"(
23862416
INSERT INTO {destination}

ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -199,17 +199,31 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
199199
}
200200
}
201201

202-
if (!FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TDqCnUnionAll::CallableName()); })) {
202+
if (!FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
203203
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as stage with sink.";
204+
auto shouldBePassedAsInput = FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TDqStage::CallableName()); });
205+
206+
auto stageInputs = Build<TExprList>(ctx, writePos);
207+
auto toFlow = Build<TCoToFlow>(ctx, writePos);
208+
TVector<TCoArgument> args;
209+
210+
if (shouldBePassedAsInput) {
211+
auto arg = Build<TCoArgument>(ctx, writePos).Name("in").Done();
212+
stageInputs.Add(input);
213+
args.push_back(arg);
214+
toFlow.Input(arg);
215+
}
216+
else {
217+
toFlow.Input(input);
218+
}
219+
204220
return keys.empty() ?
205221
Build<TDqStage>(ctx, writePos)
206-
.Inputs().Build()
222+
.Inputs(stageInputs.Done())
207223
.Program<TCoLambda>()
208-
.Args({})
224+
.Args(args)
209225
.Body<TS3SinkOutput>()
210-
.Input<TCoToFlow>()
211-
.Input(input)
212-
.Build()
226+
.Input(toFlow.Done())
213227
.Format(target.Format())
214228
.KeyColumns().Build()
215229
.Settings(sinkOutputSettingsBuilder.Done())
@@ -237,12 +251,10 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
237251
.Add<TDqCnHashShuffle>()
238252
.Output<TDqOutput>()
239253
.Stage<TDqStage>()
240-
.Inputs().Build()
254+
.Inputs(stageInputs.Done())
241255
.Program<TCoLambda>()
242-
.Args({})
243-
.Body<TCoToFlow>()
244-
.Input(input)
245-
.Build()
256+
.Args(args)
257+
.Body(toFlow.Done())
246258
.Build()
247259
.Settings().Build()
248260
.Build()

0 commit comments

Comments
 (0)