Skip to content

Commit 42acc61

Browse files
authored
Merge b67f7fa into 591f564
2 parents 591f564 + b67f7fa commit 42acc61

File tree

7 files changed

+110
-28
lines changed

7 files changed

+110
-28
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+17-11
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,18 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
372372
PassAway();
373373
}
374374

375+
void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType) {
376+
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
377+
preparingQuery.release(), AppData()->FunctionRegistry);
378+
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
379+
KqpCompileResult->PreparedQuery = preparedQueryHolder;
380+
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());
381+
382+
if (AstResult) {
383+
KqpCompileResult->Ast = AstResult->Ast;
384+
}
385+
}
386+
375387
void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
376388
Y_ENSURE(!ev->Get()->QueryId);
377389

@@ -403,17 +415,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
403415

404416
if (status == Ydb::StatusIds::SUCCESS) {
405417
YQL_ENSURE(kqpResult.PreparingQuery);
406-
{
407-
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
408-
kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry);
409-
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
410-
KqpCompileResult->PreparedQuery = preparedQueryHolder;
411-
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());
412-
413-
if (AstResult) {
414-
KqpCompileResult->Ast = AstResult->Ast;
415-
}
416-
}
418+
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
417419

418420
auto now = TInstant::Now();
419421
auto duration = now - StartTime;
@@ -423,6 +425,10 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
423425
<< ", self: " << ctx.SelfID
424426
<< ", duration: " << duration);
425427
} else {
428+
if (kqpResult.PreparingQuery) {
429+
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
430+
}
431+
426432
LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed"
427433
<< ", self: " << ctx.SelfID
428434
<< ", status: " << Ydb::StatusIds_StatusCode_Name(status)

ydb/core/kqp/host/kqp_host.cpp

+36-3
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ class TAsyncValidateYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResul
182182
, SqlVersion(sqlVersion) {}
183183

184184
void FillResult(TResult& validateResult) const override {
185+
if (!validateResult.Success()) {
186+
return;
187+
}
188+
185189
YQL_ENSURE(SessionCtx->Query().PrepareOnly);
186190
validateResult.PreparedQuery.reset(SessionCtx->Query().PreparingQuery.release());
187191
validateResult.SqlVersion = SqlVersion;
@@ -211,6 +215,10 @@ class TAsyncExplainYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
211215
, UseDqExplain(useDqExplain) {}
212216

213217
void FillResult(TResult& queryResult) const override {
218+
if (!queryResult.Success()) {
219+
return;
220+
}
221+
214222
if (UseDqExplain) {
215223
TVector<const TString> plans;
216224
for (auto id : SessionCtx->Query().ExecutionOrder) {
@@ -253,6 +261,10 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
253261
, SqlVersion(sqlVersion) {}
254262

255263
void FillResult(TResult& queryResult) const override {
264+
if (!queryResult.Success()) {
265+
return;
266+
}
267+
256268
for (auto& resultStr : ResultProviderConfig.CommittedResults) {
257269
queryResult.Results.emplace_back(
258270
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
@@ -300,6 +312,10 @@ class TAsyncExecuteKqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
300312
, ExecuteCtx(executeCtx) {}
301313

302314
void FillResult(TResult& queryResult) const override {
315+
if (!queryResult.Success()) {
316+
return;
317+
}
318+
303319
YQL_ENSURE(ExecuteCtx.QueryResults.size() == 1);
304320
queryResult = std::move(ExecuteCtx.QueryResults[0]);
305321
queryResult.QueryPlan = queryResult.PreparingQuery->GetPhysicalQuery().GetQueryPlan();
@@ -320,13 +336,24 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
320336
using TResult = IKqpHost::TQueryResult;
321337

322338
TAsyncPrepareYqlResult(TExprNode* queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer,
323-
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion)
339+
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion,
340+
TIntrusivePtr<TKqlTransformContext> transformCtx = nullptr)
324341
: TKqpAsyncResultBase(queryRoot, exprCtx, transformer)
325342
, QueryCtx(queryCtx)
343+
, ExprCtx(exprCtx)
344+
, TransformCtx(transformCtx)
326345
, QueryText(query.Text)
327346
, SqlVersion(sqlVersion) {}
328347

329348
void FillResult(TResult& prepareResult) const override {
349+
if (!prepareResult.Success() && TransformCtx) {
350+
if (auto exprRoot = TransformCtx->ExplainTransformerInput ? TransformCtx->ExplainTransformerInput : GetExprRoot()) {
351+
prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery);
352+
prepareResult.PreparingQuery->MutablePhysicalQuery()->SetQueryAst(KqpExprToPrettyString(*GetExprRoot(), ExprCtx));
353+
}
354+
return;
355+
}
356+
330357
YQL_ENSURE(QueryCtx->PrepareOnly);
331358
YQL_ENSURE(QueryCtx->PreparingQuery);
332359

@@ -344,6 +371,8 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
344371

345372
private:
346373
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
374+
NYql::TExprContext& ExprCtx;
375+
TIntrusivePtr<TKqlTransformContext> TransformCtx;
347376
TString QueryText;
348377
TMaybe<TSqlVersion> SqlVersion;
349378
};
@@ -933,6 +962,7 @@ class TKqpHost : public IKqpHost {
933962
, IsInternalCall(isInternalCall)
934963
, FederatedQuerySetup(federatedQuerySetup)
935964
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
965+
, Config(config)
936966
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
937967
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
938968
, FakeWorld(ExprCtx->NewWorld(TPosition()))
@@ -1327,7 +1357,7 @@ class TKqpHost : public IKqpHost {
13271357
}
13281358

13291359
return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
1330-
query.Text, sqlVersion);
1360+
query.Text, sqlVersion, TransformCtx);
13311361
}
13321362

13331363
IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx,
@@ -1502,7 +1532,8 @@ class TKqpHost : public IKqpHost {
15021532
}
15031533

15041534
void Init(EKikimrQueryType queryType) {
1505-
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
1535+
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
1536+
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);
15061537

15071538
ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
15081539
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();
@@ -1635,6 +1666,7 @@ class TKqpHost : public IKqpHost {
16351666
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
16361667

16371668
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
1669+
TKikimrConfiguration::TPtr Config;
16381670

16391671
TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistryHolder;
16401672
const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
@@ -1648,6 +1680,7 @@ class TKqpHost : public IKqpHost {
16481680
TExprNode::TPtr FakeWorld;
16491681

16501682
TIntrusivePtr<TExecuteContext> ExecuteCtx;
1683+
TIntrusivePtr<TKqlTransformContext> TransformCtx;
16511684
TIntrusivePtr<IKqpRunner> KqpRunner;
16521685
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};
16531686

ydb/core/kqp/host/kqp_host_impl.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult<TResult> {
3434
YQL_ENSURE(HasResult());
3535

3636
if (Status.GetValue() == NYql::IGraphTransformer::TStatus::Error) {
37-
return NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
37+
TResult result = NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
38+
FillResult(result);
39+
return result;
3840
}
3941

4042
YQL_ENSURE(Status.GetValue() == NYql::IGraphTransformer::TStatus::Ok);
@@ -244,7 +246,7 @@ class IKqpRunner : public TThrRefBase {
244246

245247
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
246248
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
247-
const NMiniKQL::IFunctionRegistry& funcRegistry);
249+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);
248250

249251
TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
250252
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,

ydb/core/kqp/host/kqp_runner.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,14 @@ class TKqpRunner : public IKqpRunner {
137137
public:
138138
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
139139
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
140-
const NMiniKQL::IFunctionRegistry& funcRegistry)
140+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
141141
: Gateway(gateway)
142142
, Cluster(cluster)
143143
, TypesCtx(*typesCtx)
144144
, SessionCtx(sessionCtx)
145145
, FunctionRegistry(funcRegistry)
146146
, Config(sessionCtx->ConfigPtr())
147-
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
147+
, TransformCtx(transformCtx)
148148
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
149149
sessionCtx->TablesPtr()))
150150
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
@@ -377,9 +377,9 @@ class TKqpRunner : public IKqpRunner {
377377

378378
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
379379
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
380-
const NMiniKQL::IFunctionRegistry& funcRegistry)
380+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
381381
{
382-
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, funcRegistry);
382+
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
383383
}
384384

385385
} // namespace NKqp

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+11-4
Original file line numberDiff line numberDiff line change
@@ -1720,10 +1720,17 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17201720

17211721
const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
17221722
FillColumnsMeta(phyQuery, response);
1723-
} else if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
1724-
// The compile timeout cause cancelation execution of request.
1725-
// So in case of cancel after we can reply with canceled status
1726-
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
1723+
} else {
1724+
if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
1725+
// The compile timeout cause cancelation execution of request.
1726+
// So in case of cancel after we can reply with canceled status
1727+
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
1728+
}
1729+
1730+
auto& preparedQuery = compileResult->PreparedQuery;
1731+
if (preparedQuery && QueryState->ReportStats() && QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
1732+
response.SetQueryAst(preparedQuery->GetPhysicalQuery().GetQueryAst());
1733+
}
17271734
}
17281735
}
17291736

ydb/tests/fq/s3/test_bindings.py

+33
Original file line numberDiff line numberDiff line change
@@ -586,3 +586,36 @@ def test_count_for_pg_binding(self, kikimr, s3, client, pg_syntax):
586586
else:
587587
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
588588
assert result_set.rows[0].items[0].uint64_value == 1
589+
590+
@yq_all
591+
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
592+
def test_ast_in_failed_query_compilation(self, kikimr, s3, client):
593+
resource = boto3.resource(
594+
"s3",
595+
endpoint_url=s3.s3_url,
596+
aws_access_key_id="key",
597+
aws_secret_access_key="secret_key"
598+
)
599+
600+
bucket = resource.Bucket("bindbucket")
601+
bucket.create(ACL='public-read')
602+
bucket.objects.all().delete()
603+
604+
connection_id = client.create_storage_connection("bb", "bindbucket").result.connection_id
605+
606+
data_column = ydb.Column(name="data", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))
607+
client.create_object_storage_binding(name="s3binding",
608+
path="/",
609+
format="raw",
610+
connection_id=connection_id,
611+
columns=[data_column])
612+
613+
sql = R'''
614+
SELECT some_unknown_column FROM bindings.`s3binding`;
615+
'''
616+
617+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
618+
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
619+
620+
ast = client.describe_query(query_id).result.query.ast.data
621+
assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast"

ydb/tests/fq/yds/test_select_1.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# -*- coding: utf-8 -*-
33

44
import logging
5+
import sys
56

67
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all
78

@@ -120,11 +121,11 @@ def test_compile_error(self, client, yq_version):
120121
assert "Failed to parse query" in describe_string, describe_string
121122

122123
@yq_all
123-
def test_ast_in_failed_query(self, client):
124-
sql = "SELECT unwrap(1 / 0)"
124+
def test_ast_in_failed_query_runtime(self, client):
125+
sql = "SELECT unwrap(42 / 0) AS error_column"
125126

126127
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
127128
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
128129

129-
ast = str(client.describe_query(query_id).result.query.ast)
130-
assert ast != "", "Query ast not found"
130+
ast = client.describe_query(query_id).result.query.ast.data
131+
assert "(\'\"error_column\" (Unwrap (/ (Int32 \'\"42\")" in ast, "Invalid query ast"

0 commit comments

Comments
 (0)