Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions build/conf/compilers/gnu_compiler.conf
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ when ($MSAN_TRACK_ORIGIN == "yes") {

when ($ARCH_XTENSA == "yes") {
FSTACK=
CFLAGS+=-Wno-c++14-extensions
when ($ARCH_XTENSA_HIFI4 == "yes") {
CFLAGS+=-Wno-c++1z-extensions
}
otherwise {
CFLAGS+=-Wno-c++17-extensions
}
}

when ($OS_EMSCRIPTEN == "yes") {
Expand Down
2 changes: 1 addition & 1 deletion ydb/ci/rightlib.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
23e9865bb938b83e7e32b670ba055c407f75494b
796e6186c6652f49958e68c7eb0f06c52827e702
3 changes: 2 additions & 1 deletion ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
#include <ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>

#include <library/cpp/cache/cache.h>
Expand Down Expand Up @@ -1818,7 +1819,7 @@ class TKqpHost : public IKqpHost {
}

TString sessionId = CreateGuidAsString();
auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, sessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx, NDq::MakeCBOOptimizerFactory());
auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, sessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx, NDq::MakeCBOOptimizerFactory(), MakeDqHelper());

ytState->PassiveExecution = true;
ytState->Gateway->OpenSession(
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ PEERDIR(
yql/essentials/core
yql/essentials/providers/common/codec
ydb/library/yql/dq/opt
ydb/library/yql/providers/dq/helper
ydb/library/yql/providers/common/http_gateway
yql/essentials/providers/common/udf_resolve
yql/essentials/providers/config
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK
auto dataSource = typesCtx.DataSourceMap.FindPtr(dataSourceName);
YQL_ENSURE(dataSource);
if (auto dqIntegration = (*dataSource)->GetDqIntegration()) {
auto newRead = dqIntegration->WrapRead(NYql::TDqSettings(), input.Cast().Ptr(), ctx);
auto newRead = dqIntegration->WrapRead(input.Cast().Ptr(), ctx, {});
if (newRead.Get() != input.Raw()) {
return newRead;
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
// We prepare a lot of partitions and distribute them between these tasks
// Constraint of 1 task per partition is NOT valid anymore
auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
dqIntegration->Partition(NYql::TDqSettings(), maxTasksPerStage, source.Ref(), partitionParams, &clusterName, ctx, false);
IDqIntegration::TPartitionSettings pSettings;
pSettings.MaxPartitions = maxTasksPerStage;
pSettings.CanFallback = false;
dqIntegration->Partition(source.Ref(), partitionParams, &clusterName, ctx, pSettings);
externalSource.SetTaskParamKey(TString(dataSourceCategory));
for (const TString& partitionParam : partitionParams) {
externalSource.AddPartitionedTaskParams(partitionParam);
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& c
return node;
}

IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config) {
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const IDqIntegration::TWrapReadSettings& wrSettings) {
TOptimizeExprSettings settings{&typesCtx};
auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) {
if (auto maybeRead = TMaybeNode<TCoRight>(node).Input()) {
Expand All @@ -345,7 +345,7 @@ IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPt
auto dataSource = typesCtx.DataSourceMap.FindPtr(dataSourceName);
YQL_ENSURE(dataSource);
if (auto dqIntegration = (*dataSource)->GetDqIntegration()) {
auto newRead = dqIntegration->WrapRead(config, maybeRead.Cast().Ptr(), ctx);
auto newRead = dqIntegration->WrapRead(maybeRead.Cast().Ptr(), ctx, wrSettings);
if (newRead.Get() != maybeRead.Raw()) {
return newRead;
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_log.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
#include <yql/essentials/ast/yql_expr.h>
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
#include <yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h>
Expand All @@ -11,7 +12,6 @@
namespace NYql {
class IOptimizationContext;
struct TTypeAnnotationContext;
struct TDqSettings;
struct IProviderContext;
struct TRelOptimizerNode;
struct TOptimizerStatistics;
Expand All @@ -38,7 +38,7 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx);

NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents);

IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config);
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const IDqIntegration::TWrapReadSettings& wrSettings);

NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
return Nothing();
}

TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& ) override {
if (const auto maybeClReadTable = TMaybeNode<TClReadTable>(read)) {
const auto clReadTable = maybeClReadTable.Cast();
const auto token = TString("cluster:default_") += clReadTable.DataSource().Cluster().StringValue();
Expand Down Expand Up @@ -66,7 +66,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
return read;
}

ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
partitions.clear();
NCH::TRange range;
// range.SetRange("limit 42 offset 42 order by ...."); // Possible set range like this.
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/yql/providers/dq/helper/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
LIBRARY()

PEERDIR(
yql/essentials/core/dq_integration
ydb/library/yql/dq/expr_nodes
ydb/library/yql/dq/opt
ydb/library/yql/dq/type_ann
)

SRCS(
yql_dq_helper_impl.cpp
)

END()
93 changes: 93 additions & 0 deletions ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#include "yql_dq_helper_impl.h"

#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/dq/opt/dq_opt.h>
#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>


namespace NYql {

using namespace NNodes;

class TDqHelperImpl: public IDqHelper {
public:
bool IsSingleConsumerConnection(const TExprNode::TPtr& node, const TParentsMap& parentsMap) final {
YQL_ENSURE(TDqCnUnionAll::Match(node.Get()));
return NDq::IsSingleConsumerConnection(TDqCnUnionAll(node), parentsMap);
}

TExprNode::TPtr PushLambdaAndCreateCnResult(const TExprNode::TPtr& conn, const TExprNode::TPtr& lambda, TPositionHandle pos,
TExprContext& ctx, IOptimizationContext& optCtx) final
{
YQL_ENSURE(TDqCnUnionAll::Match(conn.Get()));
YQL_ENSURE(lambda->IsLambda());

auto dqUnion = TDqCnUnionAll(conn);
TMaybeNode<TDqConnection> result;
if (NDq::GetStageOutputsCount(dqUnion.Output().Stage()) > 1) {
result = Build<TDqCnUnionAll>(ctx, pos)
.Output()
.Stage<TDqStage>()
.Inputs()
.Add(dqUnion)
.Build()
.Program(lambda)
.Settings(NDq::TDqStageSettings().BuildNode(ctx, pos))
.Build()
.Index().Build("0")
.Build()
.Done().Ptr();
} else {
result = NDq::DqPushLambdaToStageUnionAll(dqUnion, TCoLambda(lambda), {}, ctx, optCtx);
if (!result) {
return {};
}
}

return Build<TDqCnResult>(ctx, pos)
.Output()
.Stage<TDqStage>()
.Inputs()
.Add(result.Cast())
.Build()
.Program()
.Args({"row"})
.Body("row")
.Build()
.Settings(NDq::TDqStageSettings().BuildNode(ctx, pos))
.Build()
.Index().Build("0")
.Build()
.ColumnHints() // TODO: set column hints
.Build()
.Done().Ptr();
}

TExprNode::TPtr CreateDqStageSettings(bool singleTask, TExprContext& ctx, TPositionHandle pos) final {
NDq::TDqStageSettings settings;
settings.PartitionMode = singleTask ? NDq::TDqStageSettings::EPartitionMode::Single : NDq::TDqStageSettings::EPartitionMode::Default;
return settings.BuildNode(ctx, pos).Ptr();
}

TExprNode::TListType RemoveVariadicDqStageSettings(const TExprNode& settings) final {
TExprNode::TListType res;

for (auto n: settings.Children()) {
if (n->Type() == TExprNode::List
&& n->ChildrenSize() > 0
&& n->Child(0)->Content() == NDq::TDqStageSettings::LogicalIdSettingName) {
continue;
}
res.push_back(n);
}
return res;
}
};


IDqHelper::TPtr MakeDqHelper() {
return std::make_shared<TDqHelperImpl>();
}

} // namespace NYql
12 changes: 12 additions & 0 deletions ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include <util/generic/ptr.h>
#include <util/generic/map.h>

#include <yql/essentials/core/dq_integration/yql_dq_helper.h>

namespace NYql {

IDqHelper::TPtr MakeDqHelper();

} // namespace NYql
10 changes: 8 additions & 2 deletions ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,13 @@ namespace NYql::NDqs {
TVector<TString> parts;
if (auto dqIntegration = (*datasource)->GetDqIntegration()) {
TString clusterName;
_MaxDataSizePerJob = Max(_MaxDataSizePerJob, dqIntegration->Partition(*Settings, maxPartitions, *read, parts, &clusterName, ExprContext, canFallback));
IDqIntegration::TPartitionSettings settings {
.DataSizePerJob = Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob),
.MaxPartitions = maxPartitions,
.EnableComputeActor = Settings->EnableComputeActor.Get(),
.CanFallback = canFallback
};
_MaxDataSizePerJob = Max(_MaxDataSizePerJob, dqIntegration->Partition(*read, parts, &clusterName, ExprContext, settings));
TMaybe<::google::protobuf::Any> sourceSettings;
TString sourceType;
if (dqSource) {
Expand Down Expand Up @@ -585,7 +591,7 @@ namespace NYql::NDqs {
YQL_ENSURE(dataSource);
auto dqIntegration = (*dataSource)->GetDqIntegration();
YQL_ENSURE(dqIntegration);

google::protobuf::Any providerSpecificLookupSourceSettings;
TString sourceType;
dqIntegration->FillLookupSourceSettings(*rightInput.Raw(), providerSpecificLookupSourceSettings, sourceType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ class TDqsRecaptureTransformer : public TSyncTransformerBase {

State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default);

IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, *State_->Settings);
IDqIntegration::TWrapReadSettings wrSettings {
.WatermarksMode = State_->Settings->WatermarksMode.Get(),
.WatermarksGranularityMs = State_->Settings->WatermarksGranularityMs.Get(),
.WatermarksLateArrivalDelayMs = State_->Settings->WatermarksLateArrivalDelayMs.Get(),
.WatermarksEnableIdlePartitions = State_->Settings->WatermarksEnableIdlePartitions.Get()
};
IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, wrSettings);
if (input != output) {
YQL_CLOG(INFO, ProviderDq) << "DqsRecapture";
// TODO: Add before/after recapture transformers
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ RECURSE(
counters
expr_nodes
global_worker_manager
helper
interface
metrics
mkql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
UNIT_ASSERT(genericDataSource != Types->DataSourceMap.end());
auto dqIntegration = genericDataSource->second->GetDqIntegration();
UNIT_ASSERT(dqIntegration);
auto newRead = dqIntegration->WrapRead(TDqSettings(), input.Ptr(), ctx);
auto newRead = dqIntegration->WrapRead(input.Ptr(), ctx, IDqIntegration::TWrapReadSettings{});
BuildSettings(newRead, dqIntegration, ctx);
return newRead;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace NYql {
return Nothing();
}

TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override {
if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) {
const auto genReadTable = maybeGenReadTable.Cast();
YQL_ENSURE(genReadTable.Ref().GetTypeAnn(), "No type annotation for node " << genReadTable.Ref().Content());
Expand Down Expand Up @@ -106,8 +106,7 @@ namespace NYql {
return read;
}

ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&,
bool) override {
ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
partitions.clear();
Generic::TRange range;
partitions.emplace_back();
Expand Down
Loading
Loading