Skip to content

KIKIMR-20543: ddl+dml in query service #1444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,24 @@ namespace NKikimr::NKqp::NPrivateEvents {

struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, TInstant deadline,
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, TInstant deadline,
TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false)
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe<TQueryAst> queryAst = Nothing())
: UserToken(userToken)
, Uid(uid)
, Query(std::move(query))
, KeepInCache(keepInCache)
, IsQueryActionPrepare(isQueryActionPrepare)
, PerStatementResult(perStatementResult)
, Deadline(deadline)
, DbCounters(dbCounters)
, UserRequestContext(userRequestContext)
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
, CollectDiagnostics(collectDiagnostics)
, QueryAst(queryAst)
{
Y_ENSURE(Uid.Defined() != Query.Defined());
}
Expand All @@ -39,6 +41,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
TMaybe<TKqpQueryId> Query;
bool KeepInCache = false;
bool IsQueryActionPrepare = false;
bool PerStatementResult = false;
// it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
TInstant Deadline;
TKqpDbCountersPtr DbCounters;
Expand All @@ -51,6 +54,8 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
std::shared_ptr<std::atomic<bool>> IntrestedInResult;

bool CollectDiagnostics = false;

TMaybe<TQueryAst> QueryAst;
};

struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
Expand Down Expand Up @@ -103,12 +108,14 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev
};

struct TEvParseResponse: public TEventLocal<TEvParseResponse, TKqpEvents::EvParseResponse> {
TEvParseResponse(const TKqpQueryId& query, TMaybe<TQueryAst> astResult)
: AstResult(std::move(astResult))
, Query(query) {}
TEvParseResponse(const TKqpQueryId& query, TVector<TQueryAst> astStatements, NLWTrace::TOrbit orbit = {})
: AstStatements(std::move(astStatements))
, Query(query)
, Orbit(std::move(orbit)) {}

TMaybe<TQueryAst> AstResult;
TVector<TQueryAst> AstStatements;
TKqpQueryId Query;
NLWTrace::TOrbit Orbit;
};

struct TEvCompileInvalidateRequest: public TEventLocal<TEvCompileInvalidateRequest,
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/common/kqp_lwtrace_probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ struct TQueryAction {
PROBE(KqpCompileServiceReplyFromCache, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
PROBE(KqpCompileServiceReplyStatements, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
PROBE(KqpCompileServiceReplyError, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
Expand Down
67 changes: 45 additions & 22 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult)
bool perStatementResult, ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst)
: Owner(owner)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
, FederatedQuerySetup(federatedQuerySetup)
, Uid(uid)
, QueryId(queryId)
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes, astResult)
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes, queryAst)
, UserToken(userToken)
, DbCounters(dbCounters)
, Config(MakeIntrusive<TKikimrConfiguration>())
Expand All @@ -70,8 +70,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
, TempTablesState(std::move(tempTablesState))
, CollectFullDiagnostics(collectFullDiagnostics)
, PerStatementResult(perStatementResult)
, CompileAction(compileAction)
, AstResult(std::move(astResult))
, QueryAst(std::move(queryAst))
{
Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), QueryId.Cluster, kqpSettings->Settings, false);

Expand Down Expand Up @@ -127,26 +128,22 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
}

private:
void SetQueryAst(const TActorContext &ctx) {
TString cluster = QueryId.Cluster;

TVector<TQueryAst> GetAstStatements(const TActorContext &ctx) {
TString cluster = QueryId.Cluster;
TString kqpTablePathPrefix = Config->_KqpTablePathPrefix.Get().GetRef();
ui16 kqpYqlSyntaxVersion = Config->_KqpYqlSyntaxVersion.Get().GetRef();
NSQLTranslation::EBindingsMode bindingsMode = Config->BindingsMode;
bool isEnableExternalDataSources = AppData(ctx)->FeatureFlags.GetEnableExternalDataSources();
bool isEnablePgConstsToParams = Config->EnablePgConstsToParams;
bool perStatementExecution = Config->EnablePerStatementQueryExecution && PerStatementResult;

auto astResult = ParseQuery(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, QueryId.IsSql(), cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams);
YQL_ENSURE(astResult.Ast);
if (astResult.Ast->IsOk()) {
AstResult = std::move(astResult);
}
return ParseStatements(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams, QueryId.IsSql(), perStatementExecution);
}

void StartParsing(const TActorContext &ctx) {
SetQueryAst(ctx);

Become(&TKqpCompileActor::CompileState);
ReplyParseResult(ctx);
ReplyParseResult(ctx, GetAstStatements(ctx));
}

void StartCompilation(const TActorContext &ctx) {
Expand Down Expand Up @@ -352,15 +349,38 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", at state:" << state);
}

void ReplyParseResult(const TActorContext &ctx) {
void ReplyParseResult(const TActorContext &ctx, TVector<TQueryAst>&& astStatements) {
Y_UNUSED(ctx);

if (astStatements.empty()) {
NYql::TIssue issue(NYql::TPosition(), "Parsing result of query is empty");
ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue});
return;
}

for (size_t statementId = 0; statementId < astStatements.size(); ++statementId) {
if (!astStatements[statementId].Ast || !astStatements[statementId].Ast->IsOk() || !astStatements[statementId].Ast->Root) {
ALOG_ERROR(NKikimrServices::KQP_COMPILE_ACTOR, "Get parsing result with error"
<< ", self: " << SelfId()
<< ", owner: " << Owner
<< ", statement id: " << statementId);

NYql::TIssue issue(NYql::TPosition(), "Error while parsing query.");
for (const auto& i : astStatements[statementId].Ast->Issues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
}

ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue});
return;
}
}

ALOG_DEBUG(NKikimrServices::KQP_COMPILE_ACTOR, "Send parsing result"
<< ", self: " << SelfId()
<< ", owner: " << Owner
<< (AstResult && AstResult->Ast->IsOk() ? ", parsing is successful" : ", parsing is not successful"));
<< ", statements size: " << astStatements.size());

auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move(AstResult));
AstResult = Nothing();
auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move(astStatements));
Send(Owner, responseEv.Release());

Counters->ReportCompileFinish(DbCounters);
Expand All @@ -379,8 +399,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
if (QueryAst) {
KqpCompileResult->Ast = QueryAst->Ast;
}
}

Expand Down Expand Up @@ -482,8 +502,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpTempTablesState::TConstPtr TempTablesState;
bool CollectFullDiagnostics;

const bool PerStatementResult;
ECompileActorAction CompileAction;
TMaybe<TQueryAst> AstResult;
TMaybe<TQueryAst> QueryAst;
};

void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) {
Expand Down Expand Up @@ -512,6 +533,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode();
kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams() && serviceConfig.GetEnableAstCache();
kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit();
kqpConfig.EnablePerStatementQueryExecution = serviceConfig.GetEnablePerStatementQueryExecution();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand All @@ -527,14 +549,15 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult, bool collectFullDiagnostics)
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
bool perStatementResult)
{
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig,
moduleResolverState, counters,
uid, query, userToken, dbCounters,
federatedQuerySetup, userRequestContext,
std::move(traceId), std::move(tempTablesState), collectFullDiagnostics,
compileAction, std::move(astResult));
perStatementResult, compileAction, std::move(queryAst));
}

} // namespace NKqp
Expand Down
Loading