Skip to content

Blocks for first stages #1404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,25 @@ bool NeedReportStats(const Ydb::Query::ExecuteQueryRequest& req) {
}
}

bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) {
switch (req.exec_mode()) {
case Ydb::Query::EXEC_MODE_EXPLAIN:
return true;

case Ydb::Query::EXEC_MODE_EXECUTE:
switch (req.stats_mode()) {
case Ydb::Query::StatsMode::STATS_MODE_FULL:
case Ydb::Query::StatsMode::STATS_MODE_PROFILE:
return true;
default:
return false;
}

default:
return false;
}
}

class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand Down Expand Up @@ -382,6 +401,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
if (NeedReportStats(*Request_->GetProtoRequest())) {
hasTrailingMessage = true;
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
}

if (hasTrailingMessage) {
Expand Down
11 changes: 10 additions & 1 deletion ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class TKqpRunner : public IKqpRunner {
: Gateway(gateway)
, Cluster(cluster)
, TypesCtx(*typesCtx)
, SessionCtx(sessionCtx)
, FunctionRegistry(funcRegistry)
, Config(sessionCtx->ConfigPtr())
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
Expand Down Expand Up @@ -192,14 +194,19 @@ class TKqpRunner : public IKqpRunner {
YQL_ENSURE(IsIn({EKikimrQueryType::Query, EKikimrQueryType::Script}, TransformCtx->QueryCtx->Type));
YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query));

TypesCtx.BlockEngineMode = NYql::EBlockEngineMode::Auto;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to explicitly disable dq.UseWideChannels and dq.UseWideBlockChannels, since BlockEngineMode=auto will enable WideChannels and WideBlocksChannels unless they are disabled explicitly

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already set scalar channels only mode here

EChannelMode mode = EChannelMode::CHANNEL_SCALAR;

(similar to

NDq::EChannelMode mode = GetConfiguredChannelMode(State_, typesCtx);
)


return PrepareQueryInternal(cluster, TKiDataQueryBlocks(query), ctx, settings);
}

private:

TIntrusivePtr<TAsyncQueryResult> PrepareQueryInternal(const TString& cluster,
const TKiDataQueryBlocks& dataQueryBlocks, TExprContext& ctx,
const IKikimrQueryExecutor::TExecuteSettings& settings)
{
CreateGraphTransformer(&TypesCtx, SessionCtx, FunctionRegistry);

YQL_ENSURE(cluster == Cluster);
YQL_ENSURE(!settings.CommitTx);
YQL_ENSURE(!settings.RollbackTx);
Expand Down Expand Up @@ -311,7 +318,7 @@ class TKqpRunner : public IKqpRunner {
TAutoPtr<IGraphTransformer> compilePhysicalQuery(new TCompilePhysicalQueryTransformer(Cluster,
*TransformCtx,
*OptimizeCtx,
TypesCtx,
*typesCtx,
funcRegistry,
Config));

Expand Down Expand Up @@ -349,6 +356,8 @@ class TKqpRunner : public IKqpRunner {
TIntrusivePtr<IKqpGateway> Gateway;
TString Cluster;
TTypeAnnotationContext& TypesCtx;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
const NMiniKQL::IFunctionRegistry& FunctionRegistry;
TKikimrConfiguration::TPtr Config;

TIntrusivePtr<TKqlTransformContext> TransformCtx;
Expand Down
11 changes: 10 additions & 1 deletion ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,16 @@ static void FillPlan(const NYdb::NScripting::TYqlResultPart& streamPart, TCollec
}
}

static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& /*streamPart*/, TCollectedStreamResult& /*res*/) {}
static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& streamPart, TCollectedStreamResult& res) {
if (streamPart.GetStats() ) {
res.QueryStats = NYdb::TProtoAccessor::GetProto(*streamPart.GetStats());

auto plan = res.QueryStats->query_plan();
if (!plan.empty()) {
res.PlanJson = plan;
}
}
}

template<typename TIterator>
TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
Expand Down
153 changes: 123 additions & 30 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,12 +690,27 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
};
}

void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes,
template <typename TClient>
auto StreamExplainQuery(const TString& query, TClient& client) {
if constexpr (std::is_same_v<NYdb::NTable::TTableClient, TClient>) {
TStreamExecScanQuerySettings scanSettings;
scanSettings.Explain(true);
return client.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
} else {
NYdb::NQuery::TExecuteQuerySettings scanSettings;
scanSettings.ExecMode(NYdb::NQuery::EExecMode::Explain);
return client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), scanSettings).GetValueSync();
}
}

template <typename TClient>
void CheckPlanForAggregatePushdown(
const TString& query,
TClient& client,
const std::vector<std::string>& expectedPlanNodes,
const std::string& readNodeType)
{
TStreamExecScanQuerySettings scanSettings;
scanSettings.Explain(true);
auto res = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
auto res = StreamExplainQuery(query, client);
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());

auto planRes = CollectStreamResult(res);
Expand All @@ -704,7 +719,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Cerr << planRes.PlanJson.GetOrElse("NO_PLAN") << Endl;
Cerr << "AST:" << Endl;
Cerr << ast << Endl;
for (auto planNode : planNodes) {
for (auto planNode : expectedPlanNodes) {
UNIT_ASSERT_C(ast.find(planNode) != std::string::npos,
TStringBuilder() << planNode << " was not found. Query: " << query);
}
Expand Down Expand Up @@ -2437,10 +2452,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ExpectedReply = value;
return *this;
}

TAggregationTestCase& AddExpectedPlanOptions(const std::string& value) {
ExpectedPlanOptions.emplace_back(value);
return *this;
}

const std::vector<std::string>& GetExpectedPlanOptions() const {
return ExpectedPlanOptions;
}
Expand Down Expand Up @@ -2548,7 +2565,28 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TestAggregationsInternal(cases);
}

void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases) {
template <typename TClient>
auto StreamExecuteQuery(const TAggregationTestCase& testCase, TClient& client) {
if constexpr (std::is_same_v<NYdb::NTable::TTableClient, TClient>) {
return client.StreamExecuteScanQuery(testCase.GetFixedQuery()).GetValueSync();
} else {
return client.StreamExecuteQuery(
testCase.GetFixedQuery(),
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
}
}

template <typename TClient>
void RunTestCaseWithClient(const TAggregationTestCase& testCase, TClient& client) {
auto it = StreamExecuteQuery(testCase, client);
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
if (!testCase.GetExpectedReply().empty()) {
CompareYson(result, testCase.GetExpectedReply());
}
}

void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases, const bool genericQuery) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetForceColumnTablesCompositeMarks(true);
Expand All @@ -2564,17 +2602,20 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestDataForClickBench(kikimr, "/Root/benchTable", 0, 1000000 + i * 1000000, iterationPackSize);
}

for (auto&& i : cases) {
const TString queryFixed = i.GetFixedQuery();
{
auto it = tableClient.StreamExecuteScanQuery(queryFixed).GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
if (!i.GetExpectedReply().empty()) {
CompareYson(result, i.GetExpectedReply());
}
if (!genericQuery) {
auto tableClient = kikimr.GetTableClient();
for (auto&& i : cases) {
const TString queryFixed = i.GetFixedQuery();
RunTestCaseWithClient(i, tableClient);
CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
} else {
auto queryClient = kikimr.GetQueryClient();
for (auto&& i : cases) {
const TString queryFixed = i.GetFixedQuery();
RunTestCaseWithClient(i, queryClient);
CheckPlanForAggregatePushdown(queryFixed, queryClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
}

Expand Down Expand Up @@ -2637,12 +2678,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}

void TestClickBench(const std::vector<TAggregationTestCase>& cases) {
TestClickBenchBase(cases);
TestClickBenchInternal(cases);
void TestClickBench(const std::vector<TAggregationTestCase>& cases, const bool genericQuery = false) {
TestClickBenchBase(cases, genericQuery);
if (!genericQuery) {
TestClickBenchInternal(cases);
}
}

void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases) {
void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases, const bool genericQuery = false) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetForceColumnTablesCompositeMarks(true);
Expand All @@ -2656,17 +2699,20 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls");
}

for (auto&& i : cases) {
const TString queryFixed = i.GetFixedQuery();
{
auto it = tableClient.StreamExecuteScanQuery(queryFixed).GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
if (!i.GetExpectedReply().empty()) {
CompareYson(result, i.GetExpectedReply());
}
if (!genericQuery) {
auto tableClient = kikimr.GetTableClient();
for (auto&& i : cases) {
RunTestCaseWithClient(i, tableClient);
CheckPlanForAggregatePushdown(i.GetFixedQuery(), tableClient,
i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
} else {
auto queryClient = kikimr.GetQueryClient();
for (auto&& i : cases) {
RunTestCaseWithClient(i, queryClient);
CheckPlanForAggregatePushdown(i.GetFixedQuery(), queryClient,
i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
}

Expand Down Expand Up @@ -5883,6 +5929,53 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(output, R"([])");
}
}

Y_UNIT_TEST(BlockGenericWithDistinct) {
TAggregationTestCase testCase;
testCase.SetQuery(R"(
SELECT
COUNT(DISTINCT id)
FROM `/Root/tableWithNulls`
WHERE level = 5 AND Cast(id AS String) = "5";
)")
.AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
.AddExpectedPlanOptions("WideFromBlocks")
.SetExpectedReply("[[1u]]");
TestTableWithNulls({ testCase }, /* generic */ true);
}

Y_UNIT_TEST(BlockGenericSimpleAggregation) {
TAggregationTestCase testCase;
testCase.SetQuery(R"(
SELECT
level, COUNT(*), SUM(id)
FROM `/Root/tableWithNulls`
WHERE level = 5
GROUP BY level
ORDER BY level;
)")
.AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
.AddExpectedPlanOptions("WideFromBlocks")
.SetExpectedReply(R"([[[5];1u;5]])");

TestTableWithNulls({ testCase }, /* generic */ true);
}

Y_UNIT_TEST(BlockGenericSelectAll) {
TAggregationTestCase testCase;
testCase.SetQuery(R"(
SELECT
id, resource_id, level
FROM `/Root/tableWithNulls`
WHERE level != 5 OR level IS NULL
ORDER BY id, resource_id, level;
)")
.AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
.AddExpectedPlanOptions("WideFromBlocks")
.SetExpectedReply(R"([[1;#;[1]];[2;#;[2]];[3;#;[3]];[4;#;[4]];[6;["6"];#];[7;["7"];#];[8;["8"];#];[9;["9"];#];[10;["10"];#]])");

TestTableWithNulls({ testCase }, /* generic */ true);
}
}

} // namespace NKqp
Expand Down