
YQ-3000 fixed count for YT provider in KQP #4175

Merged: 9 commits, May 15, 2024
2 changes: 1 addition & 1 deletion ydb/core/external_sources/external_source_factory.cpp
@@ -53,7 +53,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
},
{
ToString(NYql::EDatabaseType::YT),
CreateExternalDataSource(TString{NYql::YtProviderName}, {"TOKEN"}, {}, hostnamePatternsRegEx)
CreateExternalDataSource(TString{NYql::YtProviderName}, {"NONE", "TOKEN"}, {}, hostnamePatternsRegEx)
}
});
}
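
Note: this hunk adds "NONE" to the set of auth methods accepted for YT external data sources (previously only "TOKEN"), so a YT source can now be declared without a service token; the properties list and hostname-pattern validation passed to CreateExternalDataSource are unchanged.
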
10 changes: 10 additions & 0 deletions ydb/core/kqp/common/kqp_yql.cpp
@@ -2,6 +2,8 @@

#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>

namespace NYql {

@@ -454,4 +456,12 @@ TString PrintKqpStageOnly(const TDqStageBase& stage, TExprContext& ctx) {
return KqpExprToPrettyString(TExprBase(newStage), ctx);
}

TAutoPtr<IGraphTransformer> GetDqIntegrationPeepholeTransformer(bool beforeDqTransforms, TIntrusivePtr<TTypeAnnotationContext> typesCtx) {
TTransformationPipeline dqIntegrationPeepholePipeline(typesCtx);
for (auto* dqIntegration : GetUniqueIntegrations(*typesCtx)) {
dqIntegration->ConfigurePeepholePipeline(beforeDqTransforms, {}, &dqIntegrationPeepholePipeline);
}
return dqIntegrationPeepholePipeline.Build();
}

} // namespace NYql
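
The new GetDqIntegrationPeepholeTransformer helper assembles the provider-specific peephole stages into a single transformer: it walks every distinct DQ integration known to the type annotation context and lets each one contribute transforms via ConfigurePeepholePipeline. A minimal usage sketch (mirroring the kqp_host.cpp and kqp_runner.cpp hunks below; the pipeline wiring is illustrative, not part of this PR):

    // Sketch only: assumes typesCtx is a TIntrusivePtr<TTypeAnnotationContext>
    // whose data sources and sinks are already registered (as in TKqpHost).
    TAutoPtr<NYql::IGraphTransformer> peephole =
        NYql::GetDqIntegrationPeepholeTransformer(/*beforeDqTransforms=*/true, typesCtx);
    // The result is then added to a transformation pipeline under a stage name,
    // e.g. .Add(peephole, "DqIntegrationPeephole") as in the KQP pipelines below.
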
4 changes: 4 additions & 0 deletions ydb/core/kqp/common/kqp_yql.h
@@ -124,4 +124,8 @@ TString KqpExprToPrettyString(const NNodes::TExprBase& expr, TExprContext& ctx);

TString PrintKqpStageOnly(const NNodes::TDqStageBase& stage, TExprContext& ctx);

class IGraphTransformer;
struct TTypeAnnotationContext;
TAutoPtr<IGraphTransformer> GetDqIntegrationPeepholeTransformer(bool beforeDqTransforms, TIntrusivePtr<TTypeAnnotationContext> typesCtx);

} // namespace NYql
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/ya.make
@@ -36,8 +36,10 @@ PEERDIR(
ydb/library/yql/dq/expr_nodes
ydb/library/aclib
ydb/library/yql/core/issue
ydb/library/yql/core/services
ydb/library/yql/dq/actors
ydb/library/yql/dq/common
ydb/library/yql/dq/integration
ydb/library/yql/parser/pg_wrapper/interface
ydb/public/lib/operation_id
ydb/public/lib/operation_id/protos
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
@@ -1830,6 +1830,7 @@ class TKqpHost : public IKqpHost {
.Add(TCollectParametersTransformer::Sync(SessionCtx->QueryPtr()), "CollectParameters")
.AddPostTypeAnnotation()
.AddOptimization(true, false)
.Add(GetDqIntegrationPeepholeTransformer(true, TypesCtx), "DqIntegrationPeephole")
.Add(TLogExprTransformer::Sync("Optimized expr"), "LogExpr")
.AddRun(&NullProgressWriter)
.Build();
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_runner.cpp
@@ -322,6 +322,7 @@ class TKqpRunner : public IKqpRunner {
.Add(Log("PhysicalPeephole"), "LogPhysicalPeephole")
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
.AddPostTypeAnnotation()
.Add(GetDqIntegrationPeepholeTransformer(false, typesCtx), "DqIntegrationPeephole")
.Add(
CreateKqpTxsPeepholeTransformer(
CreateTypeAnnotationTransformer(
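
The stage is registered twice in KQP: once in the host-level optimization pipeline (kqp_host.cpp, with beforeDqTransforms=true) and once in the physical peephole pipeline (kqp_runner.cpp, with beforeDqTransforms=false), so each provider can hook in both before and after the DQ-specific transforms.
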
23 changes: 15 additions & 8 deletions ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -91,6 +91,7 @@ struct TKiExploreTxResults {
TVector<TExprBase> Sync;
TVector<TKiQueryBlock> QueryBlocks;
bool HasExecute;
bool HasErrors;

THashSet<const TExprNode*> GetSyncSet() const {
THashSet<const TExprNode*> syncSet;
@@ -280,10 +281,11 @@ }
}

TKiExploreTxResults()
: HasExecute(false) {}
: HasExecute(false)
, HasErrors(false) {}
};

bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize = true) {
bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize, bool* hasErrors = nullptr) {
if (node.Ref().ChildrenSize() <= 1) {
return false;
}
@@ -294,12 +296,17 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory);
if (dataSourceProviderIt != types.DataSourceMap.end()) {
if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) {
if (dqIntegration->CanRead(*node.Ptr(), ctx) &&
(!estimateReadSize || dqIntegration->EstimateReadSize(
if (!dqIntegration->CanRead(*node.Ptr(), ctx)) {
if (!node.Ref().IsCallable(ConfigureName) && hasErrors) {
*hasErrors = true;
}
return false;
}
if (!estimateReadSize || dqIntegration->EstimateReadSize(
TDqSettings::TDefault::DataSizePerJob,
TDqSettings::TDefault::MaxTasksPerStage,
{node.Raw()},
ctx))) {
ctx)) {
return true;
}
}
@@ -388,7 +395,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return result;
}

if (IsDqRead(node, ctx, types)) {
if (IsDqRead(node, ctx, types, true, &txRes.HasErrors)) {
txRes.Ops.insert(node.Raw());
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
@@ -849,8 +856,8 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK

TKiExploreTxResults txExplore;
txExplore.ConcurrentResults = concurrentResults;
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types)) {
return node.Ptr();
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types) || txExplore.HasErrors) {
return txExplore.HasErrors ? nullptr : node.Ptr();
}

if (txExplore.HasExecute) {
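
Taken together, these hunks change how a rejected read is reported: IsDqRead now sets the new hasErrors out-parameter when a provider's CanRead refuses a node that is not a Configure! callable, ExploreTx forwards that flag through TKiExploreTxResults::HasErrors, and KiBuildQuery returns nullptr (abort the rewrite and surface the issues) instead of returning the node unchanged. A condensed sketch of the resulting control flow (names taken from the hunks above; the comments are interpretation, not PR code):

    bool hasErrors = false;
    if (IsDqRead(node, ctx, types, /*estimateReadSize=*/true, &hasErrors)) {
        // the node is treated as an external (DQ) read
    } else if (hasErrors) {
        // CanRead() is expected to have raised error issues into ctx already;
        // KiBuildQuery then aborts by returning nullptr instead of node.Ptr().
    }
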
14 changes: 2 additions & 12 deletions ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
@@ -231,18 +231,8 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
return TRuntimeNode();
});

std::unordered_set<TString> usedProviders;
for (const auto& provider : typesCtx.DataSources) {
if (auto* dqIntegration = provider->GetDqIntegration()) {
dqIntegration->RegisterMkqlCompiler(*compiler);
usedProviders.emplace(provider->GetName());
}
}

for (const auto& provider : typesCtx.DataSinks) {
if (auto* dqIntegration = provider->GetDqIntegration(); dqIntegration && !usedProviders.contains(TString(provider->GetName()))) {
dqIntegration->RegisterMkqlCompiler(*compiler);
}
for (auto* dqIntegration : GetUniqueIntegrations(typesCtx)) {
dqIntegration->RegisterMkqlCompiler(*compiler);
}

compiler->AddCallable(TKqpWideReadTable::CallableName(),
23 changes: 23 additions & 0 deletions ydb/library/yql/dq/integration/yql_dq_integration.cpp
@@ -1 +1,24 @@
#include "yql_dq_integration.h"

#include <ydb/library/yql/core/yql_type_annotation.h>

namespace NYql {

std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx) {
std::unordered_set<IDqIntegration*> uniqueIntegrations(typesCtx.DataSources.size() + typesCtx.DataSinks.size());
for (const auto& provider : typesCtx.DataSources) {
if (auto* dqIntegration = provider->GetDqIntegration()) {
uniqueIntegrations.emplace(dqIntegration);
}
}

for (const auto& provider : typesCtx.DataSinks) {
if (auto* dqIntegration = provider->GetDqIntegration()) {
uniqueIntegrations.emplace(dqIntegration);
}
}

return uniqueIntegrations;
}

} // namespace NYql
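
GetUniqueIntegrations gathers the IDqIntegration pointers of all registered data sources and data sinks into one de-duplicated set (a provider may expose the same integration object on both sides), replacing the hand-rolled loops previously duplicated in kqp_mkql_compiler.cpp, dqs_mkql_compiler.cpp and TDqsPipelineConfigurator. A sketch of the typical call pattern, as in the MKQL compiler hunks:

    // Sketch only: typesCtx is a TTypeAnnotationContext with providers registered,
    // compiler is an NCommon::TMkqlCallableCompilerBase& (assumed names).
    for (NYql::IDqIntegration* dqIntegration : NYql::GetUniqueIntegrations(typesCtx)) {
        // Each distinct integration registers its MKQL compilers exactly once.
        dqIntegration->RegisterMkqlCompiler(compiler);
    }
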
2 changes: 2 additions & 0 deletions ydb/library/yql/dq/integration/yql_dq_integration.h
@@ -77,4 +77,6 @@ class IDqIntegration {
virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0;
};

std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx);

} // namespace NYql
12 changes: 1 addition & 11 deletions ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp
@@ -16,17 +16,7 @@ void RegisterDqsMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, cons
return TRuntimeNode();
});

std::unordered_set<IDqIntegration*> integrations(ctx.DataSources.size() + ctx.DataSinks.size());
for (const auto& ds: ctx.DataSources) {
if (const auto dq = ds->GetDqIntegration()) {
integrations.emplace(dq);
}
}
for (const auto& ds: ctx.DataSinks) {
if (const auto dq = ds->GetDqIntegration()) {
integrations.emplace(dq);
}
}
auto integrations = GetUniqueIntegrations(ctx);
std::for_each(integrations.cbegin(), integrations.cend(), std::bind(&IDqIntegration::RegisterMkqlCompiler, std::placeholders::_1, std::ref(compiler)));
}

@@ -266,18 +266,9 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator {
TDqsPipelineConfigurator(const TDqStatePtr& state, const THashMap<TString, TString>& providerParams)
: State_(state)
, ProviderParams_(providerParams)
{
for (const auto& ds: State_->TypeCtx->DataSources) {
if (const auto dq = ds->GetDqIntegration()) {
UniqIntegrations_.emplace(dq);
}
}
for (const auto& ds: State_->TypeCtx->DataSinks) {
if (const auto dq = ds->GetDqIntegration()) {
UniqIntegrations_.emplace(dq);
}
}
}
, UniqIntegrations_(GetUniqueIntegrations(*State_->TypeCtx))
{}

private:
void AfterCreate(TTransformationPipeline*) const final {}

@@ -191,7 +191,7 @@ def run(self, test_name: str, script: str, generic_settings: GenericSettings) ->
result_path = artifacts.make_path(test_name=test_name, artifact_name='result.json')

# For debug add option --trace-opt to args
cmd = f'{self.kqprun_path} -s {scheme_path} -p {script_path} --app-config={app_conf_path} --result-file={result_path} --result-format=full'
cmd = f'{self.kqprun_path} -s {scheme_path} -p {script_path} --app-config={app_conf_path} --result-file={result_path} --result-format=full-json'

output = None
data_out = None
59 changes: 37 additions & 22 deletions ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -118,7 +118,7 @@ }
}

if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); canFallback && chunksCount > maxChunks) {
throw TFallbackError() << "DQ cannot execute the query. Cause: table with too many chunks";
throw TFallbackError() << DqFallbackErrorMessageWrap("table with too many chunks");
}

if (hasErasure) {
@@ -162,7 +162,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
.Config(State_->Configuration->Snapshot())
.Paths(std::move(paths)));
if (!res.Success()) {
const auto message = TStringBuilder() << "DQ cannot execute the query. Cause: failed to partition table";
const auto message = DqFallbackErrorMessageWrap("failed to partition table");
YQL_CLOG(ERROR, ProviderDq) << message;
auto issue = YqlIssue(TPosition(), TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, message);
for (auto& subIssue: res.Issues()) {
@@ -193,7 +193,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes);
ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob;
if (canFallback && hasErasure && parts > maxTasks) {
std::string_view message = "DQ cannot execute the query. Cause: too big table with erasure codec";
auto message = DqFallbackErrorMessageWrap("too big table with erasure codec");
YQL_CLOG(INFO, ProviderDq) << message;
throw TFallbackError() << message;
}
@@ -242,13 +242,24 @@ class TYtDqIntegration: public TDqIntegrationBase {
return maxDataSizePerJob;
}

void AddInfo(TExprContext& ctx, const TString& message, bool skipIssues) {
if (!skipIssues) {
void AddMessage(TExprContext& ctx, const TString& message, bool skipIssues, bool riseError) {
if (skipIssues && !riseError) {
return;
}

TIssue issue(DqFallbackErrorMessageWrap(message));
if (riseError) {
YQL_CLOG(ERROR, ProviderDq) << message;
issue.Severity = TSeverityIds::S_ERROR;
} else {
YQL_CLOG(INFO, ProviderDq) << message;
TIssue info("DQ cannot execute the query. Cause: " + message);
info.Severity = TSeverityIds::S_INFO;
ctx.IssueManager.RaiseIssue(info);
issue.Severity = TSeverityIds::S_INFO;
}
ctx.IssueManager.RaiseIssue(issue);
}

void AddInfo(TExprContext& ctx, const TString& message, bool skipIssues) {
AddMessage(ctx, message, skipIssues, false);
}

bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override {
@@ -293,7 +304,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
} else if (auto maybeRead = TMaybeNode<TYtReadTable>(&node)) {
auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
if (!State_->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) {
AddInfo(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues);
AddMessage(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues, State_->PassiveExecution);
return false;
}
const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
@@ -309,45 +320,45 @@ }
}
}
}
AddInfo(ctx, info, skipIssues);
AddMessage(ctx, info, skipIssues, State_->PassiveExecution);
return false;
}
auto sampleSetting = GetSetting(section.Settings().Ref(), EYtSettingType::Sample);
if (sampleSetting && sampleSetting->Child(1)->Child(0)->Content() == "system") {
AddInfo(ctx, "system sampling", skipIssues);
AddMessage(ctx, "system sampling", skipIssues, State_->PassiveExecution);
return false;
}
for (auto path: section.Paths()) {
if (!path.Table().Maybe<TYtTable>()) {
AddInfo(ctx, "non-table path", skipIssues);
AddMessage(ctx, "non-table path", skipIssues, State_->PassiveExecution);
return false;
} else {
auto pathInfo = TYtPathInfo(path);
auto tableInfo = pathInfo.Table;
auto epoch = TEpochInfo::Parse(path.Table().Maybe<TYtTable>().CommitEpoch().Ref());
if (!tableInfo->Stat) {
AddInfo(ctx, "table without statistics", skipIssues);
AddMessage(ctx, "table without statistics", skipIssues, State_->PassiveExecution);
return false;
} else if (!tableInfo->RowSpec) {
AddInfo(ctx, "table without row spec", skipIssues);
AddMessage(ctx, "table without row spec", skipIssues, State_->PassiveExecution);
return false;
} else if (!tableInfo->Meta) {
AddInfo(ctx, "table without meta", skipIssues);
AddMessage(ctx, "table without meta", skipIssues, State_->PassiveExecution);
return false;
} else if (tableInfo->IsAnonymous) {
AddInfo(ctx, "anonymous table", skipIssues);
AddMessage(ctx, "anonymous table", skipIssues, State_->PassiveExecution);
return false;
} else if ((!epoch.Empty() && *epoch.Get() > 0)) {
AddInfo(ctx, "table with non-empty epoch", skipIssues);
AddMessage(ctx, "table with non-empty epoch", skipIssues, State_->PassiveExecution);
return false;
} else if (NYql::HasSetting(tableInfo->Settings.Ref(), EYtSettingType::WithQB)) {
AddInfo(ctx, "table with QB2 premapper", skipIssues);
AddMessage(ctx, "table with QB2 premapper", skipIssues, State_->PassiveExecution);
return false;
} else if (pathInfo.Ranges && !canUseYtPartitioningApi) {
AddInfo(ctx, "table with ranges", skipIssues);
AddMessage(ctx, "table with ranges", skipIssues, State_->PassiveExecution);
return false;
} else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) {
AddInfo(ctx, "dynamic table", skipIssues);
AddMessage(ctx, "dynamic table", skipIssues, State_->PassiveExecution);
return false;
}

@@ -356,7 +367,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
}
}
if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); chunksCount > maxChunks) {
AddInfo(ctx, "table with too many chunks", skipIssues);
AddMessage(ctx, "table with too many chunks", skipIssues, State_->PassiveExecution);
return false;
}
return true;
@@ -505,7 +516,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
}

void AddErrorWrap(TExprContext& ctx, const NYql::TPositionHandle& where, const TString& cause) {
ctx.AddError(YqlIssue(ctx.GetPosition(where), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "DQ cannot execute the query. Cause: " << cause));
ctx.AddError(YqlIssue(ctx.GetPosition(where), TIssuesIds::DQ_OPTIMIZE_ERROR, DqFallbackErrorMessageWrap(cause)));
}

TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
@@ -710,6 +721,10 @@ class TYtDqIntegration: public TDqIntegrationBase {
pipeline->Add(CreateYtPeepholeTransformer(TYtState::TPtr(State_), providerParams), "YtPeepHole", TIssuesIds::DEFAULT_ERROR);
}

static TString DqFallbackErrorMessageWrap(const TString& message) {
return "DQ cannot execute the query. Cause: " + message;
}

private:
TYtState* State_;
};
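
On the YT side, the old AddInfo helper is generalized into AddMessage: when riseError is true (every changed call site passes State_->PassiveExecution, which, judging by this PR, marks the case where the YT provider is driven from KQP rather than the standalone DQ gateway), the reason is logged at ERROR level and raised as an S_ERROR issue; otherwise the previous S_INFO behaviour is kept. The new DqFallbackErrorMessageWrap helper keeps the "DQ cannot execute the query. Cause: ..." prefix identical across AddMessage, AddErrorWrap and the TFallbackError throw sites.
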
@@ -3487,7 +3487,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
return node;
}

auto count = CleanupWorld(countBase.Count(), ctx);
auto count = State_->PassiveExecution ? countBase.Count() : CleanupWorld(countBase.Count(), ctx);
if (!count) {
return {};
}
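
This last hunk appears to be the fix the PR title refers to: when the YT provider runs in passive mode (the KQP case), the Count expression is taken as-is instead of being passed through CleanupWorld, which presumably is not applicable when KQP owns the surrounding world, so the count optimization no longer bails out at this step; outside KQP the behaviour is unchanged.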