Skip to content

Commit 988df97

Browse files
authored
Merge 0af5e81 into 9539af5
2 parents 9539af5 + 0af5e81 commit 988df97

File tree

22 files changed

+236
-228
lines changed

22 files changed

+236
-228
lines changed

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
5353
},
5454
{
5555
ToString(NYql::EDatabaseType::YT),
56-
CreateExternalDataSource(TString{NYql::YtProviderName}, {"TOKEN"}, {}, hostnamePatternsRegEx)
56+
CreateExternalDataSource(TString{NYql::YtProviderName}, {"NONE", "TOKEN"}, {}, hostnamePatternsRegEx)
5757
}
5858
});
5959
}

ydb/core/kqp/common/kqp_yql.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
44
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
5+
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
6+
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
57

68
namespace NYql {
79

@@ -454,4 +456,12 @@ TString PrintKqpStageOnly(const TDqStageBase& stage, TExprContext& ctx) {
454456
return KqpExprToPrettyString(TExprBase(newStage), ctx);
455457
}
456458

459+
// Builds one transformer from the peephole stages contributed by the DQ
// integrations reachable through typesCtx.
//
// beforeDqTransforms is forwarded unchanged to every integration's
// ConfigurePeepholePipeline; the provider params map is always passed empty.
// typesCtx is dereferenced unconditionally, so it must not be null.
//
// NOTE(review): GetUniqueIntegrations returns a std::unordered_set of
// pointers, so the order in which integrations append their stages is
// unspecified — confirm the stages do not depend on each other's order.
TAutoPtr<IGraphTransformer> GetDqIntegrationPeepholeTransformer(bool beforeDqTransforms, TIntrusivePtr<TTypeAnnotationContext> typesCtx) {
460+
TTransformationPipeline dqIntegrationPeepholePipeline(typesCtx);
461+
for (auto* dqIntegration : GetUniqueIntegrations(*typesCtx)) {
462+
dqIntegration->ConfigurePeepholePipeline(beforeDqTransforms, {}, &dqIntegrationPeepholePipeline);
463+
}
464+
return dqIntegrationPeepholePipeline.Build();
465+
}
466+
457467
} // namespace NYql

ydb/core/kqp/common/kqp_yql.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,8 @@ TString KqpExprToPrettyString(const NNodes::TExprBase& expr, TExprContext& ctx);
124124

125125
TString PrintKqpStageOnly(const NNodes::TDqStageBase& stage, TExprContext& ctx);
126126

127+
class IGraphTransformer;
128+
struct TTypeAnnotationContext;
129+
TAutoPtr<IGraphTransformer> GetDqIntegrationPeepholeTransformer(bool beforeDqTransforms, TIntrusivePtr<TTypeAnnotationContext> typesCtx);
130+
127131
} // namespace NYql

ydb/core/kqp/common/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ PEERDIR(
3636
ydb/library/yql/dq/expr_nodes
3737
ydb/library/aclib
3838
ydb/library/yql/core/issue
39+
ydb/library/yql/core/services
3940
ydb/library/yql/dq/actors
4041
ydb/library/yql/dq/common
42+
ydb/library/yql/dq/integration
4143
ydb/library/yql/parser/pg_wrapper/interface
4244
ydb/public/lib/operation_id
4345
ydb/public/lib/operation_id/protos

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1830,6 +1830,7 @@ class TKqpHost : public IKqpHost {
18301830
.Add(TCollectParametersTransformer::Sync(SessionCtx->QueryPtr()), "CollectParameters")
18311831
.AddPostTypeAnnotation()
18321832
.AddOptimization(true, false)
1833+
.Add(GetDqIntegrationPeepholeTransformer(true, TypesCtx), "DqIntegrationPeephole")
18331834
.Add(TLogExprTransformer::Sync("Optimized expr"), "LogExpr")
18341835
.AddRun(&NullProgressWriter)
18351836
.Build();

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ class TKqpRunner : public IKqpRunner {
322322
.Add(Log("PhysicalPeephole"), "LogPhysicalPeephole")
323323
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
324324
.AddPostTypeAnnotation()
325+
.Add(GetDqIntegrationPeepholeTransformer(false, typesCtx), "DqIntegrationPeephole")
325326
.Add(
326327
CreateKqpTxsPeepholeTransformer(
327328
CreateTypeAnnotationTransformer(

ydb/core/kqp/provider/yql_kikimr_opt_build.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ struct TKiExploreTxResults {
9191
TVector<TExprBase> Sync;
9292
TVector<TKiQueryBlock> QueryBlocks;
9393
bool HasExecute;
94+
bool HasErrors;
9495

9596
THashSet<const TExprNode*> GetSyncSet() const {
9697
THashSet<const TExprNode*> syncSet;
@@ -280,10 +281,11 @@ struct TKiExploreTxResults {
280281
}
281282

282283
TKiExploreTxResults()
283-
: HasExecute(false) {}
284+
: HasExecute(false)
285+
, HasErrors(false) {}
284286
};
285287

286-
bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize = true) {
288+
bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize, bool* hasErrors = nullptr) {
287289
if (node.Ref().ChildrenSize() <= 1) {
288290
return false;
289291
}
@@ -294,12 +296,17 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
294296
auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory);
295297
if (dataSourceProviderIt != types.DataSourceMap.end()) {
296298
if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) {
297-
if (dqIntegration->CanRead(*node.Ptr(), ctx) &&
298-
(!estimateReadSize || dqIntegration->EstimateReadSize(
299+
if (!dqIntegration->CanRead(*node.Ptr(), ctx)) {
300+
if (!node.Ref().IsCallable(ConfigureName) && hasErrors) {
301+
*hasErrors = true;
302+
}
303+
return false;
304+
}
305+
if (!estimateReadSize || dqIntegration->EstimateReadSize(
299306
TDqSettings::TDefault::DataSizePerJob,
300307
TDqSettings::TDefault::MaxTasksPerStage,
301308
{node.Raw()},
302-
ctx))) {
309+
ctx)) {
303310
return true;
304311
}
305312
}
@@ -388,7 +395,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
388395
return result;
389396
}
390397

391-
if (IsDqRead(node, ctx, types)) {
398+
if (IsDqRead(node, ctx, types, true, &txRes.HasErrors)) {
392399
txRes.Ops.insert(node.Raw());
393400
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
394401
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
@@ -849,8 +856,8 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
849856

850857
TKiExploreTxResults txExplore;
851858
txExplore.ConcurrentResults = concurrentResults;
852-
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types)) {
853-
return node.Ptr();
859+
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types) || txExplore.HasErrors) {
860+
return txExplore.HasErrors ? nullptr : node.Ptr();
854861
}
855862

856863
if (txExplore.HasExecute) {

ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -231,18 +231,8 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
231231
return TRuntimeNode();
232232
});
233233

234-
std::unordered_set<TString> usedProviders;
235-
for (const auto& provider : typesCtx.DataSources) {
236-
if (auto* dqIntegration = provider->GetDqIntegration()) {
237-
dqIntegration->RegisterMkqlCompiler(*compiler);
238-
usedProviders.emplace(provider->GetName());
239-
}
240-
}
241-
242-
for (const auto& provider : typesCtx.DataSinks) {
243-
if (auto* dqIntegration = provider->GetDqIntegration(); dqIntegration && !usedProviders.contains(TString(provider->GetName()))) {
244-
dqIntegration->RegisterMkqlCompiler(*compiler);
245-
}
234+
for (auto* dqIntegration : GetUniqueIntegrations(typesCtx)) {
235+
dqIntegration->RegisterMkqlCompiler(*compiler);
246236
}
247237

248238
compiler->AddCallable(TKqpWideReadTable::CallableName(),
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,24 @@
11
#include "yql_dq_integration.h"
2+
3+
#include <ydb/library/yql/core/yql_type_annotation.h>
4+
5+
namespace NYql {
6+
7+
// Collects the distinct, non-null IDqIntegration pointers exposed by the data
// sources and data sinks registered in typesCtx.
//
// Uniqueness is by pointer value: if a source and a sink return the same
// integration pointer, it is stored once. Providers whose GetDqIntegration()
// returns null are skipped. The result is an unordered set, so callers must
// not rely on iteration order.
std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx) {
8+
// The constructor argument is a bucket-count hint sized for the worst case
// (every source and sink contributing a different integration).
std::unordered_set<IDqIntegration*> uniqueIntegrations(typesCtx.DataSources.size() + typesCtx.DataSinks.size());
9+
for (const auto& provider : typesCtx.DataSources) {
10+
if (auto* dqIntegration = provider->GetDqIntegration()) {
11+
uniqueIntegrations.emplace(dqIntegration);
12+
}
13+
}
14+
15+
for (const auto& provider : typesCtx.DataSinks) {
16+
if (auto* dqIntegration = provider->GetDqIntegration()) {
17+
// emplace is a no-op when this pointer was already added from a data source.
uniqueIntegrations.emplace(dqIntegration);
18+
}
19+
}
20+
21+
return uniqueIntegrations;
22+
}
23+
24+
} // namespace NYql

ydb/library/yql/dq/integration/yql_dq_integration.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,6 @@ class IDqIntegration {
7777
virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0;
7878
};
7979

80+
std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx);
81+
8082
} // namespace NYql

ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,7 @@ void RegisterDqsMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, cons
1616
return TRuntimeNode();
1717
});
1818

19-
std::unordered_set<IDqIntegration*> integrations(ctx.DataSources.size() + ctx.DataSinks.size());
20-
for (const auto& ds: ctx.DataSources) {
21-
if (const auto dq = ds->GetDqIntegration()) {
22-
integrations.emplace(dq);
23-
}
24-
}
25-
for (const auto& ds: ctx.DataSinks) {
26-
if (const auto dq = ds->GetDqIntegration()) {
27-
integrations.emplace(dq);
28-
}
29-
}
19+
auto integrations = GetUniqueIntegrations(ctx);
3020
std::for_each(integrations.cbegin(), integrations.cend(), std::bind(&IDqIntegration::RegisterMkqlCompiler, std::placeholders::_1, std::ref(compiler)));
3121
}
3222

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -266,18 +266,9 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator {
266266
TDqsPipelineConfigurator(const TDqStatePtr& state, const THashMap<TString, TString>& providerParams)
267267
: State_(state)
268268
, ProviderParams_(providerParams)
269-
{
270-
for (const auto& ds: State_->TypeCtx->DataSources) {
271-
if (const auto dq = ds->GetDqIntegration()) {
272-
UniqIntegrations_.emplace(dq);
273-
}
274-
}
275-
for (const auto& ds: State_->TypeCtx->DataSinks) {
276-
if (const auto dq = ds->GetDqIntegration()) {
277-
UniqIntegrations_.emplace(dq);
278-
}
279-
}
280-
}
269+
, UniqIntegrations_(GetUniqueIntegrations(*State_->TypeCtx))
270+
{}
271+
281272
private:
282273
void AfterCreate(TTransformationPipeline*) const final {}
283274

ydb/library/yql/providers/generic/connector/tests/utils/run/kqprun.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ def run(self, test_name: str, script: str, generic_settings: GenericSettings) ->
191191
result_path = artifacts.make_path(test_name=test_name, artifact_name='result.json')
192192

193193
# For debugging, add the --trace-opt option to args
194-
cmd = f'{self.kqprun_path} -s {scheme_path} -p {script_path} --app-config={app_conf_path} --result-file={result_path} --result-format=full'
194+
cmd = f'{self.kqprun_path} -s {scheme_path} -p {script_path} --app-config={app_conf_path} --result-file={result_path} --result-format=full-json'
195195

196196
output = None
197197
data_out = None

ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
118118
}
119119

120120
if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); canFallback && chunksCount > maxChunks) {
121-
throw TFallbackError() << "DQ cannot execute the query. Cause: table with too many chunks";
121+
throw TFallbackError() << DqFallbackErrorMessageWrap("table with too many chunks");
122122
}
123123

124124
if (hasErasure) {
@@ -162,7 +162,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
162162
.Config(State_->Configuration->Snapshot())
163163
.Paths(std::move(paths)));
164164
if (!res.Success()) {
165-
const auto message = TStringBuilder() << "DQ cannot execute the query. Cause: failed to partition table";
165+
const auto message = DqFallbackErrorMessageWrap("failed to partition table");
166166
YQL_CLOG(ERROR, ProviderDq) << message;
167167
auto issue = YqlIssue(TPosition(), TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, message);
168168
for (auto& subIssue: res.Issues()) {
@@ -193,7 +193,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
193193
TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes);
194194
ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob;
195195
if (canFallback && hasErasure && parts > maxTasks) {
196-
std::string_view message = "DQ cannot execute the query. Cause: too big table with erasure codec";
196+
auto message = DqFallbackErrorMessageWrap("too big table with erasure codec");
197197
YQL_CLOG(INFO, ProviderDq) << message;
198198
throw TFallbackError() << message;
199199
}
@@ -242,13 +242,24 @@ class TYtDqIntegration: public TDqIntegrationBase {
242242
return maxDataSizePerJob;
243243
}
244244

245-
void AddInfo(TExprContext& ctx, const TString& message, bool skipIssues) {
246-
if (!skipIssues) {
245+
void AddMessage(TExprContext& ctx, const TString& message, bool skipIssues, bool riseError) {
246+
if (skipIssues && !riseError) {
247+
return;
248+
}
249+
250+
TIssue issue(DqFallbackErrorMessageWrap(message));
251+
if (riseError) {
252+
YQL_CLOG(ERROR, ProviderDq) << message;
253+
issue.Severity = TSeverityIds::S_ERROR;
254+
} else {
247255
YQL_CLOG(INFO, ProviderDq) << message;
248-
TIssue info("DQ cannot execute the query. Cause: " + message);
249-
info.Severity = TSeverityIds::S_INFO;
250-
ctx.IssueManager.RaiseIssue(info);
256+
issue.Severity = TSeverityIds::S_INFO;
251257
}
258+
ctx.IssueManager.RaiseIssue(issue);
259+
}
260+
261+
void AddInfo(TExprContext& ctx, const TString& message, bool skipIssues) {
262+
AddMessage(ctx, message, skipIssues, false);
252263
}
253264

254265
bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override {
@@ -293,7 +304,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
293304
} else if (auto maybeRead = TMaybeNode<TYtReadTable>(&node)) {
294305
auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
295306
if (!State_->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) {
296-
AddInfo(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues);
307+
AddMessage(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues, State_->PassiveExecution);
297308
return false;
298309
}
299310
const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
@@ -309,45 +320,45 @@ class TYtDqIntegration: public TDqIntegrationBase {
309320
}
310321
}
311322
}
312-
AddInfo(ctx, info, skipIssues);
323+
AddMessage(ctx, info, skipIssues, State_->PassiveExecution);
313324
return false;
314325
}
315326
auto sampleSetting = GetSetting(section.Settings().Ref(), EYtSettingType::Sample);
316327
if (sampleSetting && sampleSetting->Child(1)->Child(0)->Content() == "system") {
317-
AddInfo(ctx, "system sampling", skipIssues);
328+
AddMessage(ctx, "system sampling", skipIssues, State_->PassiveExecution);
318329
return false;
319330
}
320331
for (auto path: section.Paths()) {
321332
if (!path.Table().Maybe<TYtTable>()) {
322-
AddInfo(ctx, "non-table path", skipIssues);
333+
AddMessage(ctx, "non-table path", skipIssues, State_->PassiveExecution);
323334
return false;
324335
} else {
325336
auto pathInfo = TYtPathInfo(path);
326337
auto tableInfo = pathInfo.Table;
327338
auto epoch = TEpochInfo::Parse(path.Table().Maybe<TYtTable>().CommitEpoch().Ref());
328339
if (!tableInfo->Stat) {
329-
AddInfo(ctx, "table without statistics", skipIssues);
340+
AddMessage(ctx, "table without statistics", skipIssues, State_->PassiveExecution);
330341
return false;
331342
} else if (!tableInfo->RowSpec) {
332-
AddInfo(ctx, "table without row spec", skipIssues);
343+
AddMessage(ctx, "table without row spec", skipIssues, State_->PassiveExecution);
333344
return false;
334345
} else if (!tableInfo->Meta) {
335-
AddInfo(ctx, "table without meta", skipIssues);
346+
AddMessage(ctx, "table without meta", skipIssues, State_->PassiveExecution);
336347
return false;
337348
} else if (tableInfo->IsAnonymous) {
338-
AddInfo(ctx, "anonymous table", skipIssues);
349+
AddMessage(ctx, "anonymous table", skipIssues, State_->PassiveExecution);
339350
return false;
340351
} else if ((!epoch.Empty() && *epoch.Get() > 0)) {
341-
AddInfo(ctx, "table with non-empty epoch", skipIssues);
352+
AddMessage(ctx, "table with non-empty epoch", skipIssues, State_->PassiveExecution);
342353
return false;
343354
} else if (NYql::HasSetting(tableInfo->Settings.Ref(), EYtSettingType::WithQB)) {
344-
AddInfo(ctx, "table with QB2 premapper", skipIssues);
355+
AddMessage(ctx, "table with QB2 premapper", skipIssues, State_->PassiveExecution);
345356
return false;
346357
} else if (pathInfo.Ranges && !canUseYtPartitioningApi) {
347-
AddInfo(ctx, "table with ranges", skipIssues);
358+
AddMessage(ctx, "table with ranges", skipIssues, State_->PassiveExecution);
348359
return false;
349360
} else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) {
350-
AddInfo(ctx, "dynamic table", skipIssues);
361+
AddMessage(ctx, "dynamic table", skipIssues, State_->PassiveExecution);
351362
return false;
352363
}
353364

@@ -356,7 +367,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
356367
}
357368
}
358369
if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); chunksCount > maxChunks) {
359-
AddInfo(ctx, "table with too many chunks", skipIssues);
370+
AddMessage(ctx, "table with too many chunks", skipIssues, State_->PassiveExecution);
360371
return false;
361372
}
362373
return true;
@@ -505,7 +516,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
505516
}
506517

507518
void AddErrorWrap(TExprContext& ctx, const NYql::TPositionHandle& where, const TString& cause) {
508-
ctx.AddError(YqlIssue(ctx.GetPosition(where), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "DQ cannot execute the query. Cause: " << cause));
519+
ctx.AddError(YqlIssue(ctx.GetPosition(where), TIssuesIds::DQ_OPTIMIZE_ERROR, DqFallbackErrorMessageWrap(cause)));
509520
}
510521

511522
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
@@ -710,6 +721,10 @@ class TYtDqIntegration: public TDqIntegrationBase {
710721
pipeline->Add(CreateYtPeepholeTransformer(TYtState::TPtr(State_), providerParams), "YtPeepHole", TIssuesIds::DEFAULT_ERROR);
711722
}
712723

724+
// Returns `message` prefixed with the fixed text
// "DQ cannot execute the query. Cause: ", so every call site produces the
// same wording instead of repeating the literal.
static TString DqFallbackErrorMessageWrap(const TString& message) {
725+
return "DQ cannot execute the query. Cause: " + message;
726+
}
727+
713728
private:
714729
TYtState* State_;
715730
};

ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3487,7 +3487,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
34873487
return node;
34883488
}
34893489

3490-
auto count = CleanupWorld(countBase.Count(), ctx);
3490+
auto count = State_->PassiveExecution ? countBase.Count() : CleanupWorld(countBase.Count(), ctx);
34913491
if (!count) {
34923492
return {};
34933493
}

0 commit comments

Comments
 (0)