Skip to content

Commit 155303f

Browse files
Merge c069549 into 61e223d
2 parents 61e223d + c069549 commit 155303f

File tree

70 files changed

+1780
-261
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1780
-261
lines changed

build/conf/compilers/gnu_compiler.conf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ when ($MSAN_TRACK_ORIGIN == "yes") {
8484

8585
when ($ARCH_XTENSA == "yes") {
8686
FSTACK=
87+
CFLAGS+=-Wno-c++14-extensions
88+
when ($ARCH_XTENSA_HIFI4 == "yes") {
89+
CFLAGS+=-Wno-c++1z-extensions
90+
}
91+
otherwise {
92+
CFLAGS+=-Wno-c++17-extensions
93+
}
8794
}
8895

8996
when ($OS_EMSCRIPTEN == "yes") {

ydb/ci/rightlib.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
23e9865bb938b83e7e32b670ba055c407f75494b
1+
796e6186c6652f49958e68c7eb0f06c52827e702

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
2727
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
2828
#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
29+
#include <ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h>
2930
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
3031

3132
#include <library/cpp/cache/cache.h>
@@ -1818,7 +1819,7 @@ class TKqpHost : public IKqpHost {
18181819
}
18191820

18201821
TString sessionId = CreateGuidAsString();
1821-
auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, sessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx, NDq::MakeCBOOptimizerFactory());
1822+
auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, sessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx, NDq::MakeCBOOptimizerFactory(), MakeDqHelper());
18221823

18231824
ytState->PassiveExecution = true;
18241825
ytState->Gateway->OpenSession(

ydb/core/kqp/host/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ PEERDIR(
2525
yql/essentials/core
2626
yql/essentials/providers/common/codec
2727
ydb/library/yql/dq/opt
28+
ydb/library/yql/providers/dq/helper
2829
ydb/library/yql/providers/common/http_gateway
2930
yql/essentials/providers/common/udf_resolve
3031
yql/essentials/providers/config

ydb/core/kqp/opt/kqp_opt_kql.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,7 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK
10171017
auto dataSource = typesCtx.DataSourceMap.FindPtr(dataSourceName);
10181018
YQL_ENSURE(dataSource);
10191019
if (auto dqIntegration = (*dataSource)->GetDqIntegration()) {
1020-
auto newRead = dqIntegration->WrapRead(NYql::TDqSettings(), input.Cast().Ptr(), ctx);
1020+
auto newRead = dqIntegration->WrapRead(input.Cast().Ptr(), ctx, {});
10211021
if (newRead.Get() != input.Raw()) {
10221022
return newRead;
10231023
}

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10921092
// We prepare a lot of partitions and distribute them between these tasks
10931093
// Constraint of 1 task per partition is NOT valid anymore
10941094
auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
1095-
dqIntegration->Partition(NYql::TDqSettings(), maxTasksPerStage, source.Ref(), partitionParams, &clusterName, ctx, false);
1095+
IDqIntegration::TPartitionSettings pSettings;
1096+
pSettings.MaxPartitions = maxTasksPerStage;
1097+
pSettings.CanFallback = false;
1098+
dqIntegration->Partition(source.Ref(), partitionParams, &clusterName, ctx, pSettings);
10961099
externalSource.SetTaskParamKey(TString(dataSourceCategory));
10971100
for (const TString& partitionParam : partitionParams) {
10981101
externalSource.AddPartitionedTaskParams(partitionParam);

ydb/library/yql/dq/opt/dq_opt_log.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& c
336336
return node;
337337
}
338338

339-
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config) {
339+
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const IDqIntegration::TWrapReadSettings& wrSettings) {
340340
TOptimizeExprSettings settings{&typesCtx};
341341
auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) {
342342
if (auto maybeRead = TMaybeNode<TCoRight>(node).Input()) {
@@ -345,7 +345,7 @@ IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPt
345345
auto dataSource = typesCtx.DataSourceMap.FindPtr(dataSourceName);
346346
YQL_ENSURE(dataSource);
347347
if (auto dqIntegration = (*dataSource)->GetDqIntegration()) {
348-
auto newRead = dqIntegration->WrapRead(config, maybeRead.Cast().Ptr(), ctx);
348+
auto newRead = dqIntegration->WrapRead(maybeRead.Cast().Ptr(), ctx, wrSettings);
349349
if (newRead.Get() != maybeRead.Raw()) {
350350
return newRead;
351351
}

ydb/library/yql/dq/opt/dq_opt_log.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
34
#include <yql/essentials/ast/yql_expr.h>
45
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
56
#include <yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h>
@@ -11,7 +12,6 @@
1112
namespace NYql {
1213
class IOptimizationContext;
1314
struct TTypeAnnotationContext;
14-
struct TDqSettings;
1515
struct IProviderContext;
1616
struct TRelOptimizerNode;
1717
struct TOptimizerStatistics;
@@ -38,7 +38,7 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx);
3838

3939
NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents);
4040

41-
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config);
41+
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const IDqIntegration::TWrapReadSettings& wrSettings);
4242

4343
NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);
4444

ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
3333
return Nothing();
3434
}
3535

36-
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
36+
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& ) override {
3737
if (const auto maybeClReadTable = TMaybeNode<TClReadTable>(read)) {
3838
const auto clReadTable = maybeClReadTable.Cast();
3939
const auto token = TString("cluster:default_") += clReadTable.DataSource().Cluster().StringValue();
@@ -66,7 +66,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
6666
return read;
6767
}
6868

69-
ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
69+
ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
7070
partitions.clear();
7171
NCH::TRange range;
7272
// range.SetRange("limit 42 offset 42 order by ...."); // Possible set range like this.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
LIBRARY()
2+
3+
PEERDIR(
4+
yql/essentials/core/dq_integration
5+
ydb/library/yql/dq/expr_nodes
6+
ydb/library/yql/dq/opt
7+
ydb/library/yql/dq/type_ann
8+
)
9+
10+
SRCS(
11+
yql_dq_helper_impl.cpp
12+
)
13+
14+
END()
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
#include "yql_dq_helper_impl.h"
2+
3+
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
4+
#include <ydb/library/yql/dq/opt/dq_opt.h>
5+
#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
6+
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
7+
8+
9+
namespace NYql {
10+
11+
using namespace NNodes;
12+
13+
class TDqHelperImpl: public IDqHelper {
14+
public:
15+
bool IsSingleConsumerConnection(const TExprNode::TPtr& node, const TParentsMap& parentsMap) final {
16+
YQL_ENSURE(TDqCnUnionAll::Match(node.Get()));
17+
return NDq::IsSingleConsumerConnection(TDqCnUnionAll(node), parentsMap);
18+
}
19+
20+
TExprNode::TPtr PushLambdaAndCreateCnResult(const TExprNode::TPtr& conn, const TExprNode::TPtr& lambda, TPositionHandle pos,
21+
TExprContext& ctx, IOptimizationContext& optCtx) final
22+
{
23+
YQL_ENSURE(TDqCnUnionAll::Match(conn.Get()));
24+
YQL_ENSURE(lambda->IsLambda());
25+
26+
auto dqUnion = TDqCnUnionAll(conn);
27+
TMaybeNode<TDqConnection> result;
28+
if (NDq::GetStageOutputsCount(dqUnion.Output().Stage()) > 1) {
29+
result = Build<TDqCnUnionAll>(ctx, pos)
30+
.Output()
31+
.Stage<TDqStage>()
32+
.Inputs()
33+
.Add(dqUnion)
34+
.Build()
35+
.Program(lambda)
36+
.Settings(NDq::TDqStageSettings().BuildNode(ctx, pos))
37+
.Build()
38+
.Index().Build("0")
39+
.Build()
40+
.Done().Ptr();
41+
} else {
42+
result = NDq::DqPushLambdaToStageUnionAll(dqUnion, TCoLambda(lambda), {}, ctx, optCtx);
43+
if (!result) {
44+
return {};
45+
}
46+
}
47+
48+
return Build<TDqCnResult>(ctx, pos)
49+
.Output()
50+
.Stage<TDqStage>()
51+
.Inputs()
52+
.Add(result.Cast())
53+
.Build()
54+
.Program()
55+
.Args({"row"})
56+
.Body("row")
57+
.Build()
58+
.Settings(NDq::TDqStageSettings().BuildNode(ctx, pos))
59+
.Build()
60+
.Index().Build("0")
61+
.Build()
62+
.ColumnHints() // TODO: set column hints
63+
.Build()
64+
.Done().Ptr();
65+
}
66+
67+
TExprNode::TPtr CreateDqStageSettings(bool singleTask, TExprContext& ctx, TPositionHandle pos) final {
68+
NDq::TDqStageSettings settings;
69+
settings.PartitionMode = singleTask ? NDq::TDqStageSettings::EPartitionMode::Single : NDq::TDqStageSettings::EPartitionMode::Default;
70+
return settings.BuildNode(ctx, pos).Ptr();
71+
}
72+
73+
TExprNode::TListType RemoveVariadicDqStageSettings(const TExprNode& settings) final {
74+
TExprNode::TListType res;
75+
76+
for (auto n: settings.Children()) {
77+
if (n->Type() == TExprNode::List
78+
&& n->ChildrenSize() > 0
79+
&& n->Child(0)->Content() == NDq::TDqStageSettings::LogicalIdSettingName) {
80+
continue;
81+
}
82+
res.push_back(n);
83+
}
84+
return res;
85+
}
86+
};
87+
88+
89+
IDqHelper::TPtr MakeDqHelper() {
90+
return std::make_shared<TDqHelperImpl>();
91+
}
92+
93+
} // namespace NYql
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
#include <util/generic/ptr.h>
4+
#include <util/generic/map.h>
5+
6+
#include <yql/essentials/core/dq_integration/yql_dq_helper.h>
7+
8+
namespace NYql {
9+
10+
IDqHelper::TPtr MakeDqHelper();
11+
12+
} // namespace NYql

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,13 @@ namespace NYql::NDqs {
544544
TVector<TString> parts;
545545
if (auto dqIntegration = (*datasource)->GetDqIntegration()) {
546546
TString clusterName;
547-
_MaxDataSizePerJob = Max(_MaxDataSizePerJob, dqIntegration->Partition(*Settings, maxPartitions, *read, parts, &clusterName, ExprContext, canFallback));
547+
IDqIntegration::TPartitionSettings settings {
548+
.DataSizePerJob = Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob),
549+
.MaxPartitions = maxPartitions,
550+
.EnableComputeActor = Settings->EnableComputeActor.Get(),
551+
.CanFallback = canFallback
552+
};
553+
_MaxDataSizePerJob = Max(_MaxDataSizePerJob, dqIntegration->Partition(*read, parts, &clusterName, ExprContext, settings));
548554
TMaybe<::google::protobuf::Any> sourceSettings;
549555
TString sourceType;
550556
if (dqSource) {
@@ -585,7 +591,7 @@ namespace NYql::NDqs {
585591
YQL_ENSURE(dataSource);
586592
auto dqIntegration = (*dataSource)->GetDqIntegration();
587593
YQL_ENSURE(dqIntegration);
588-
594+
589595
google::protobuf::Any providerSpecificLookupSourceSettings;
590596
TString sourceType;
591597
dqIntegration->FillLookupSourceSettings(*rightInput.Raw(), providerSpecificLookupSourceSettings, sourceType);

ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,13 @@ class TDqsRecaptureTransformer : public TSyncTransformerBase {
9797

9898
State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default);
9999

100-
IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, *State_->Settings);
100+
IDqIntegration::TWrapReadSettings wrSettings {
101+
.WatermarksMode = State_->Settings->WatermarksMode.Get(),
102+
.WatermarksGranularityMs = State_->Settings->WatermarksGranularityMs.Get(),
103+
.WatermarksLateArrivalDelayMs = State_->Settings->WatermarksLateArrivalDelayMs.Get(),
104+
.WatermarksEnableIdlePartitions = State_->Settings->WatermarksEnableIdlePartitions.Get()
105+
};
106+
IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, wrSettings);
101107
if (input != output) {
102108
YQL_CLOG(INFO, ProviderDq) << "DqsRecapture";
103109
// TODO: Add before/after recapture transformers

ydb/library/yql/providers/dq/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ RECURSE(
44
counters
55
expr_nodes
66
global_worker_manager
7+
helper
78
interface
89
metrics
910
mkql

ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
162162
UNIT_ASSERT(genericDataSource != Types->DataSourceMap.end());
163163
auto dqIntegration = genericDataSource->second->GetDqIntegration();
164164
UNIT_ASSERT(dqIntegration);
165-
auto newRead = dqIntegration->WrapRead(TDqSettings(), input.Ptr(), ctx);
165+
auto newRead = dqIntegration->WrapRead(input.Ptr(), ctx, IDqIntegration::TWrapReadSettings{});
166166
BuildSettings(newRead, dqIntegration, ctx);
167167
return newRead;
168168
}

ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ namespace NYql {
6363
return Nothing();
6464
}
6565

66-
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
66+
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override {
6767
if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) {
6868
const auto genReadTable = maybeGenReadTable.Cast();
6969
YQL_ENSURE(genReadTable.Ref().GetTypeAnn(), "No type annotation for node " << genReadTable.Ref().Content());
@@ -106,8 +106,7 @@ namespace NYql {
106106
return read;
107107
}
108108

109-
ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&,
110-
bool) override {
109+
ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
111110
partitions.clear();
112111
Generic::TRange range;
113112
partitions.emplace_back();

0 commit comments

Comments
 (0)