Skip to content

Allow single CTAS without perstatement #8241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
}

void StartSplitting(const TActorContext &ctx) {
YQL_ENSURE(PerStatementResult);

const auto prepareSettings = PrepareCompilationSettings(ctx);
auto result = KqpHost->SplitQuery(QueryRef, prepareSettings);

Expand Down Expand Up @@ -280,7 +278,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
prepareSettings.IsInternalCall = QueryId.Settings.IsInternalCall;
prepareSettings.PerStatementResult = PerStatementResult;

switch (QueryId.Settings.Syntax) {
case Ydb::Query::Syntax::SYNTAX_YQL_V1:
Expand Down
38 changes: 19 additions & 19 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1259,8 +1259,13 @@ class TKqpHost : public IKqpHost {
YQL_CLOG(INFO, ProviderKqp) << "Compiled query:\n" << KqpExprToPrettyString(*queryExpr, ctx);

if (Config->EnableCreateTableAs) {
result.QueryExprs = RewriteExpression(queryExpr, ctx, *TypesCtx, SessionCtx, Cluster);
auto [rewriteResults, rewriteIssues] = RewriteExpression(queryExpr, ctx, *TypesCtx, SessionCtx, Cluster);
ctx.IssueManager.AddIssues(rewriteIssues);
if (!rewriteIssues.Empty()) {
return result;
}

result.QueryExprs = rewriteResults;
for (const auto& resultPart : result.QueryExprs) {
YQL_CLOG(INFO, ProviderKqp) << "Splitted Compiled query part:\n" << KqpExprToPrettyString(*resultPart, ctx);
}
Expand All @@ -1280,7 +1285,7 @@ class TKqpHost : public IKqpHost {
settingsBuilder
.SetSqlAutoCommit(false)
.SetUsePgParser(settings.UsePgParser);
auto compileResult = CompileYqlQuery(query, /* isSql */ true, *ExprCtx, sqlVersion, settingsBuilder, settings.PerStatementResult);
auto compileResult = CompileYqlQuery(query, /* isSql */ true, *ExprCtx, sqlVersion, settingsBuilder);

return TSplitResult{
.Ctx = std::move(ExprCtxStorage),
Expand All @@ -1290,7 +1295,7 @@ class TKqpHost : public IKqpHost {
}

TCompileExprResult CompileYqlQuery(const TKqpQueryRef& query, bool isSql, TExprContext& ctx, TMaybe<TSqlVersion>& sqlVersion,
TKqpTranslationSettingsBuilder& settingsBuilder, bool perStatementResult) const
TKqpTranslationSettingsBuilder& settingsBuilder) const
{
auto compileResult = CompileQuery(query, isSql, ctx, sqlVersion, settingsBuilder);
if (!compileResult.QueryExprs) {
Expand All @@ -1302,12 +1307,7 @@ class TKqpHost : public IKqpHost {
}

// Currently used only for create table as
if (!perStatementResult && compileResult.QueryExprs.size() > 1) {
ctx.AddError(YqlIssue(TPosition(), TIssuesIds::KIKIMR_BAD_REQUEST,
"Query can be executed only in per-statement mode (NoTx)"));
compileResult.QueryExprs = {};
return compileResult;
} else if (compileResult.QueryExprs.size() > 1) {
if (compileResult.QueryExprs.size() > 1) {
return compileResult;
}

Expand Down Expand Up @@ -1379,7 +1379,7 @@ class TKqpHost : public IKqpHost {
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, query.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(false)
.SetUsePgParser(settings.UsePgParser);
auto compileResult = CompileYqlQuery(query, isSql, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(query, isSql, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -1439,7 +1439,7 @@ class TKqpHost : public IKqpHost {
TMaybe<TSqlVersion> sqlVersion;
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, query.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(false);
auto compileResult = CompileYqlQuery(query, /* isSql */ true, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(query, /* isSql */ true, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -1467,7 +1467,7 @@ class TKqpHost : public IKqpHost {
TMaybe<TSqlVersion> sqlVersion;
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, queryAst.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(false);
auto compileResult = CompileYqlQuery(queryAst, false, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(queryAst, false, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -1513,7 +1513,7 @@ class TKqpHost : public IKqpHost {
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, query.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(false)
.SetUsePgParser(settings.UsePgParser);
auto compileResult = CompileYqlQuery(query, /* isSql */ true, ctx, sqlVersion, settingsBuilder, settings.PerStatementResult);
auto compileResult = CompileYqlQuery(query, /* isSql */ true, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -1550,7 +1550,7 @@ class TKqpHost : public IKqpHost {
TMaybe<TSqlVersion> sqlVersion = 1;
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, query.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(false);
auto compileResult = CompileYqlQuery(query, true, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(query, true, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand All @@ -1571,7 +1571,7 @@ class TKqpHost : public IKqpHost {
TMaybe<TSqlVersion> sqlVersion;
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, queryAst.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(false);
auto compileResult = CompileYqlQuery(queryAst, false, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(queryAst, false, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand All @@ -1598,7 +1598,7 @@ class TKqpHost : public IKqpHost {
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, script.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(true)
.SetUsePgParser(settings.UsePgParser);
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -1627,7 +1627,7 @@ class TKqpHost : public IKqpHost {
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, script.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(true)
.SetUsePgParser(settings.UsePgParser);
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand All @@ -1651,7 +1651,7 @@ class TKqpHost : public IKqpHost {
TMaybe<TSqlVersion> sqlVersion;
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, script.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(true);
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -1679,7 +1679,7 @@ class TKqpHost : public IKqpHost {
TMaybe<TSqlVersion> sqlVersion;
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, script.Text, SessionCtx->Config().BindingsMode, GUCSettings);
settingsBuilder.SetSqlAutoCommit(true);
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder, false);
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder);
if (compileResult.QueryExprs.empty()) {
return nullptr;
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class IKqpHost : public TThrRefBase {
struct TPrepareSettings: public TExecSettings {
TMaybe<bool> IsInternalCall;
TMaybe<bool> ConcurrentResults;
bool PerStatementResult;

TString ToString() const {
return TStringBuilder() << "TPrepareSettings{"
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/kqp/host/kqp_statement_rewrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,20 +292,25 @@ namespace {
}
}

TVector<NYql::TExprNode::TPtr> RewriteExpression(
std::pair<TVector<NYql::TExprNode::TPtr>, NYql::TIssues> RewriteExpression(
const NYql::TExprNode::TPtr& root,
NYql::TExprContext& exprCtx,
NYql::TTypeAnnotationContext& typeCtx,
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
const TString& cluster) {
NYql::TIssues issues;
// CREATE TABLE AS statement can be used only with perstatement execution.
// Thus we assume that there is only one such statement.
ui64 actionsCount = 0;
TVector<NYql::TExprNode::TPtr> result;
VisitExpr(root, [&](const NYql::TExprNode::TPtr& node) {
if (NYql::NNodes::TCoWrite::Match(node.Get())) {
++actionsCount;
const auto rewriteResult = RewriteCreateTableAs(node, exprCtx, typeCtx, sessionCtx, cluster);
if (rewriteResult) {
YQL_ENSURE(result.empty());
if (!result.empty()) {
issues.AddIssue("Several CTAS statement can't be used without per-statement mode.");
}
result.push_back(rewriteResult->CreateTable);
result.push_back(rewriteResult->ReplaceInto);
if (rewriteResult->MoveTable) {
Expand All @@ -316,10 +321,14 @@ TVector<NYql::TExprNode::TPtr> RewriteExpression(
return true;
});

if (!result.empty() && actionsCount > 1) {
issues.AddIssue("CTAS statement can't be used with other statements without per-statement mode.");
}

if (result.empty()) {
result.push_back(root);
}
return result;
return {result, issues};
}

}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_statement_rewrite.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace NKikimr {
namespace NKqp {

TVector<NYql::TExprNode::TPtr> RewriteExpression(
std::pair<TVector<NYql::TExprNode::TPtr>, NYql::TIssues> RewriteExpression(
const NYql::TExprNode::TPtr& root,
NYql::TExprContext& ctx,
NYql::TTypeAnnotationContext& typeCtx,
Expand Down
16 changes: 12 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,10 +581,18 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status);

if (QueryState->CompileResult->NeedToSplit) {
YQL_ENSURE(!QueryState->HasTxControl() && QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE);
auto ev = QueryState->BuildSplitRequest(CompilationCookie, GUCSettings);
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId,
QueryState->KqpSessionSpan.GetTraceId());
if (!QueryState->HasTxControl()) {
YQL_ENSURE(QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE);
auto ev = QueryState->BuildSplitRequest(CompilationCookie, GUCSettings);
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId,
QueryState->KqpSessionSpan.GetTraceId());
} else {
NYql::TIssues issues;
ReplyQueryError(
::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST,
"CTAS statement can be executed only in NoTx mode.",
MessageFromIssues(issues));
}
} else {
ReplyQueryCompileError();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!prepareResult.IsSuccess());
UNIT_ASSERT_C(
prepareResult.GetIssues().ToString().Contains("Query can be executed only in per-statement mode (NoTx)"),
prepareResult.GetIssues().ToString().Contains("CTAS statement can be executed only in NoTx mode."),
prepareResult.GetIssues().ToString());
}

Expand Down
81 changes: 81 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2620,6 +2620,87 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(CTASWithoutPerStatement) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
appConfig.MutableTableServiceConfig()->SetEnableAstCache(false);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(false);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting})
.SetWithSampleTables(false)
.SetEnableTempTables(true);

TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetQueryClient();

{
auto result = db.ExecuteQuery(R"(
CREATE TABLE Table1 (
PRIMARY KEY (Key)
) AS SELECT 1u AS Key, "1" AS Value1, "1" AS Value2;
CREATE TABLE Table2 (
PRIMARY KEY (Key)
) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();

UNIT_ASSERT(!result.IsSuccess());
UNIT_ASSERT_C(
result.GetIssues().ToString().Contains("Several CTAS statement can't be used without per-statement mode."),
result.GetIssues().ToString());
}

{
auto result = db.ExecuteQuery(R"(
CREATE TABLE Table2 (
PRIMARY KEY (Key)
) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
SELECT * FROM Table1 ORDER BY Key;
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();

UNIT_ASSERT(!result.IsSuccess());
UNIT_ASSERT_C(
result.GetIssues().ToString().Contains("CTAS statement can't be used with other statements without per-statement mode."),
result.GetIssues().ToString());
}

{
auto result = db.ExecuteQuery(R"(
SELECT * FROM Table1 ORDER BY Key;
CREATE TABLE Table2 (
PRIMARY KEY (Key)
) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();

UNIT_ASSERT(!result.IsSuccess());
UNIT_ASSERT_C(
result.GetIssues().ToString().Contains("CTAS statement can't be used with other statements without per-statement mode."),
result.GetIssues().ToString());
}

{
auto result = db.ExecuteQuery(R"(
CREATE TABLE Table1 (
PRIMARY KEY (Key)
) AS SELECT 1u AS Key, "1" AS Value1, "1" AS Value2;
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = db.ExecuteQuery(R"(
SELECT * FROM Table1 ORDER BY Key;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(SeveralCTAS) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
Expand Down
Loading