@@ -52,14 +52,14 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
52
52
TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
53
53
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
54
54
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
55
- ECompileActorAction compileAction, TMaybe<TQueryAst> astResult )
55
+ bool canDevideIntoStatements, ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst )
56
56
: Owner(owner)
57
57
, ModuleResolverState(moduleResolverState)
58
58
, Counters(counters)
59
59
, FederatedQuerySetup(federatedQuerySetup)
60
60
, Uid(uid)
61
61
, QueryId(queryId)
62
- , QueryRef(QueryId.Text, QueryId.QueryParameterTypes, astResult )
62
+ , QueryRef(QueryId.Text, QueryId.QueryParameterTypes, queryAst )
63
63
, UserToken(userToken)
64
64
, DbCounters(dbCounters)
65
65
, Config(MakeIntrusive<TKikimrConfiguration>())
@@ -70,8 +70,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
70
70
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), " CompileActor" )
71
71
, TempTablesState(std::move(tempTablesState))
72
72
, CollectFullDiagnostics(collectFullDiagnostics)
73
+ , CanDevideIntoStatements(canDevideIntoStatements)
73
74
, CompileAction(compileAction)
74
- , AstResult (std::move(astResult ))
75
+ , QueryAst (std::move(queryAst ))
75
76
{
76
77
Config->Init (kqpSettings->DefaultSettings .GetDefaultSettings (), QueryId.Cluster , kqpSettings->Settings , false );
77
78
@@ -127,26 +128,22 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
127
128
}
128
129
129
130
private:
130
- void SetQueryAst (const TActorContext &ctx) {
131
+
132
+ TVector<TQueryAst> GetAstStatements (const TActorContext &ctx) {
131
133
TString cluster = QueryId.Cluster ;
132
134
TString kqpTablePathPrefix = Config->_KqpTablePathPrefix .Get ().GetRef ();
133
135
ui16 kqpYqlSyntaxVersion = Config->_KqpYqlSyntaxVersion .Get ().GetRef ();
134
136
NSQLTranslation::EBindingsMode bindingsMode = Config->BindingsMode ;
135
137
bool isEnableExternalDataSources = AppData (ctx)->FeatureFlags .GetEnableExternalDataSources ();
136
138
bool isEnablePgConstsToParams = Config->EnablePgConstsToParams ;
139
+ bool perStatement = Config->EnableQueriesPerStatement && CanDevideIntoStatements;
137
140
138
- auto astResult = ParseQuery (ConvertType (QueryId.Settings .QueryType ), QueryId.Settings .Syntax , QueryId.Text , QueryId.QueryParameterTypes , QueryId.IsSql (), cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams);
139
- YQL_ENSURE (astResult.Ast );
140
- if (astResult.Ast ->IsOk ()) {
141
- AstResult = std::move (astResult);
142
- }
141
+ return SqlToAstStatements (ConvertType (QueryId.Settings .QueryType ), QueryId.Settings .Syntax , QueryId.Text , QueryId.QueryParameterTypes , cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams, QueryId.IsSql (), perStatement);
143
142
}
144
143
145
144
void StartParsing (const TActorContext &ctx) {
146
- SetQueryAst (ctx);
147
-
148
145
Become (&TKqpCompileActor::CompileState);
149
- ReplyParseResult (ctx);
146
+ ReplyParseResult (ctx, GetAstStatements (ctx) );
150
147
}
151
148
152
149
void StartCompilation (const TActorContext &ctx) {
@@ -352,15 +349,37 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
352
349
<< " , at state:" << state);
353
350
}
354
351
355
- void ReplyParseResult (const TActorContext &ctx) {
352
+ void ReplyParseResult (const TActorContext &ctx, TVector<TQueryAst> astStatements ) {
356
353
Y_UNUSED (ctx);
354
+
355
+ if (astStatements.empty ()) {
356
+ NYql::TIssue issue (NYql::TPosition (), " Parsing result of query is empty" );
357
+ ReplyError (Ydb::StatusIds::GENERIC_ERROR, {issue});
358
+ }
359
+
360
+ for (size_t statementId = 0 ; statementId < astStatements.size (); ++statementId) {
361
+ if (!astStatements[statementId].Ast || !astStatements[statementId].Ast ->IsOk () || !astStatements[statementId].Ast ->Root ) {
362
+ ALOG_ERROR (NKikimrServices::KQP_COMPILE_ACTOR, " Get parsing result with error"
363
+ << " , self: " << SelfId ()
364
+ << " , owner: " << Owner
365
+ << " , statement id: " << statementId);
366
+
367
+ NYql::TIssue issue (NYql::TPosition (), " Internal error while parsing query." );
368
+ for (const auto & i : astStatements[statementId].Ast ->Issues ) {
369
+ issue.AddSubIssue (MakeIntrusive<NYql::TIssue>(i));
370
+ }
371
+
372
+ ReplyError (Ydb::StatusIds::GENERIC_ERROR, {issue});
373
+ return ;
374
+ }
375
+ }
376
+
357
377
ALOG_DEBUG (NKikimrServices::KQP_COMPILE_ACTOR, " Send parsing result"
358
378
<< " , self: " << SelfId ()
359
379
<< " , owner: " << Owner
360
- << (AstResult && AstResult-> Ast -> IsOk () ? " , parsing is successful " : " , parsing is not successful " ));
380
+ << " , statements size : " << astStatements. size ( ));
361
381
362
- auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move (AstResult));
363
- AstResult = Nothing ();
382
+ auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, astStatements);
364
383
Send (Owner, responseEv.Release ());
365
384
366
385
Counters->ReportCompileFinish (DbCounters);
@@ -410,8 +429,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
410
429
KqpCompileResult->PreparedQuery = preparedQueryHolder;
411
430
KqpCompileResult->AllowCache = CanCacheQuery (KqpCompileResult->PreparedQuery ->GetPhysicalQuery ());
412
431
413
- if (AstResult ) {
414
- KqpCompileResult->Ast = AstResult ->Ast ;
432
+ if (QueryAst ) {
433
+ KqpCompileResult->Ast = QueryAst ->Ast ;
415
434
}
416
435
}
417
436
@@ -476,8 +495,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
476
495
TKqpTempTablesState::TConstPtr TempTablesState;
477
496
bool CollectFullDiagnostics;
478
497
498
+ bool CanDevideIntoStatements;
479
499
ECompileActorAction CompileAction;
480
- TMaybe<TQueryAst> AstResult ;
500
+ TMaybe<TQueryAst> QueryAst ;
481
501
};
482
502
483
503
void ApplyServiceConfig (TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) {
@@ -506,6 +526,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
506
526
kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode ();
507
527
kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams ();
508
528
kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit ();
529
+ kqpConfig.EnableQueriesPerStatement = serviceConfig.GetEnableQueriesPerStatement ();
509
530
510
531
if (const auto limit = serviceConfig.GetResourceManager ().GetMkqlHeavyProgramMemoryLimit ()) {
511
532
kqpConfig._KqpYqlCombinerMemoryLimit = std::max (1_GB, limit - (limit >> 2U ));
@@ -521,14 +542,15 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
521
542
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
522
543
TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
523
544
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
524
- ECompileActorAction compileAction, TMaybe<TQueryAst> astResult, bool collectFullDiagnostics)
545
+ ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
546
+ bool canDevideIntoStatements)
525
547
{
526
548
return new TKqpCompileActor (owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig,
527
549
moduleResolverState, counters,
528
550
uid, query, userToken, dbCounters,
529
551
federatedQuerySetup, userRequestContext,
530
552
std::move (traceId), std::move (tempTablesState), collectFullDiagnostics,
531
- compileAction, std::move (astResult ));
553
+ canDevideIntoStatements, compileAction, std::move (queryAst ));
532
554
}
533
555
534
556
} // namespace NKqp
0 commit comments