diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index 24f7108c3dc..99499eecd32 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -132,6 +132,7 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request) case tipb::ExecType::TypeProjection: case tipb::ExecType::TypeSelection: case tipb::ExecType::TypeLimit: + case tipb::ExecType::TypeTopN: // Only support mock table_scan/exchange_sender/exchange_receiver in test mode now. case tipb::ExecType::TypeTableScan: case tipb::ExecType::TypeExchangeSender: diff --git a/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp b/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp index b92b68cc975..eddcc5af425 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp @@ -17,10 +17,13 @@ #include #include #include +#include #include #include #include #include +#include +#include namespace DB { @@ -65,6 +68,19 @@ void PhysicalTopN::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & c orderStreams(pipeline, max_streams, order_descr, limit, false, context, log); } +void PhysicalTopN::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) +{ + if (!before_sort_actions->getActions().empty()) + { + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique(group_builder.exec_status, before_sort_actions, log->identifier())); + }); + } + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique(group_builder.exec_status, order_descr, limit, context.getSettingsRef().max_block_size, log->identifier())); + }); +} + void PhysicalTopN::finalize(const Names & parent_require) { Names required_output = parent_require; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalTopN.h b/dbms/src/Flash/Planner/Plans/PhysicalTopN.h index 1de947b50f9..af1482cea7d 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalTopN.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalTopN.h @@ -49,6 +49,8 @@ class PhysicalTopN : public PhysicalUnary const Block & getSampleBlock() const override; + void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t concurrency) override; + private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; diff --git a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out index 82700ed7b47..4dc0bf5cef1 100644 --- a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out +++ b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out @@ -11,21 +11,7 @@ pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Limit|limit_2 -> Limi ~test_suite_name: StrangeQuery ~result_index: 2 ~result: -Union: - Expression x 10: - SharedQuery: - MergeSorting, limit = 8 - Union: - PartialSorting x 10: limit = 8 - SharedQuery: - MergeSorting, limit = 9 - Union: - PartialSorting x 10: limit = 9 - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - MockTableScan +pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> TopN|topn_2 -> TopN|topn_3 -> Projection|NonTiDBOperator @ ~test_suite_name: SingleQueryBlock ~result_index: 0 @@ -103,21 +89,12 @@ Union: ~test_suite_name: ParallelQuery ~result_index: 6 ~result: -Expression: - MergeSorting, limit = 10 - PartialSorting: limit = 10 - MockTableScan +pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 7 ~result: -Union: - Expression x 5: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 5: limit = 10 - MockTableScan +pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 8 @@ -224,23 +201,12 @@ MockExchangeSender ~test_suite_name: ParallelQuery ~result_index: 18 ~result: -Union: - MockExchangeSender x 10 - Expression: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - MockTableScan +pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 @ ~test_suite_name: ParallelQuery ~result_index: 19 ~result: -MockExchangeSender - Expression: - MergeSorting, limit = 10 - PartialSorting: limit = 10 - MockTableScan +pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 @ ~test_suite_name: ParallelQuery ~result_index: 20 @@ -280,15 +246,7 @@ pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|pro ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 1 ~result: -Union: - Expression x 10: - Expression: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - MockTableScan +pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> Projection|NonTiDBOperator @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 2 @@ -393,13 +351,7 @@ Union: ~test_suite_name: FineGrainedShuffle ~result_index: 1 ~result: -Union: - Expression x 10: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - MockExchangeReceiver +pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffle ~result_index: 2 @@ -417,13 +369,7 @@ Union: ~test_suite_name: FineGrainedShuffle ~result_index: 3 ~result: -Union: - Expression x 10: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - MockExchangeReceiver +pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffleJoin ~result_index: 0 diff --git a/dbms/src/Flash/tests/gtest_topn_executor.cpp b/dbms/src/Flash/tests/gtest_topn_executor.cpp index 88b8b40f6fc..b77e62b6aad 100644 --- a/dbms/src/Flash/tests/gtest_topn_executor.cpp +++ b/dbms/src/Flash/tests/gtest_topn_executor.cpp @@ -40,6 +40,15 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest toNullableVec(col_name[1], col_gender), toNullableVec(col_name[2], col_country), toNullableVec(col_name[3], col_salary)}); + context.addMockTable({db_name, empty_name}, + {{col_name[0], TiDB::TP::TypeLong}, + {col_name[1], TiDB::TP::TypeString}, + {col_name[2], TiDB::TP::TypeString}, + {col_name[3], TiDB::TP::TypeLong}}, + {toNullableVec(col_name[0], {}), + toNullableVec(col_name[1], {}), + toNullableVec(col_name[2], {}), + toNullableVec(col_name[3], {})}); /// table with 200 rows { @@ -79,7 +88,7 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest return context.scan(db_name, table_name).topN(col_name, is_desc, limit_num).build(context); } - std::shared_ptr buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockColumnNameVec out_proj_ast = {}) + std::shared_ptr buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockAstVec out_proj_ast = {}) { if (func_proj_ast.empty()) return context.scan(db_name, table_name).topN(order_by_items, limit).build(context); @@ -100,6 +109,9 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest ColumnWithNullableString col_gender{"female", "female", "male", "female", "male", "male"}; ColumnWithNullableString col_country{"korea", "usa", "usa", "china", "china", "china"}; ColumnWithNullableInt32 col_salary{1300, 0, {}, 900, {}, -300}; + + // empty table + const String empty_name{"empty_table"}; }; TEST_F(TopNExecutorTestRunner, TopN) @@ -113,43 +125,18 @@ try size_t col_data_num = col0.size(); for (size_t i = 1; i <= 1; ++i) { - bool is_desc; - is_desc = static_cast(i); /// Set descent or ascent - if (is_desc) - sort(col0.begin(), col0.end(), std::greater()); /// Sort col0 for the following comparison - else - sort(col0.begin(), col0.end()); - + bool is_desc = static_cast(i); /// Set descent or ascent for (size_t limit_num = 0; limit_num <= col_data_num + 5; ++limit_num) { request = buildDAGRequest(table_single_name, single_col_name, is_desc, limit_num); - - expect_cols.clear(); - if (limit_num == 0 || limit_num > col_data_num) - expect_cols.push_back({toNullableVec(single_col_name, ColumnWithNullableString(col0.begin(), col0.end()))}); - else - expect_cols.push_back({toNullableVec(single_col_name, ColumnWithNullableString(col0.begin(), col0.begin() + limit_num))}); - - executeAndAssertColumnsEqual(request, expect_cols.back()); + SortInfos sort_infos{{0, is_desc}}; + executeAndAssertSortedBlocks(request, sort_infos); } } } { /// Test multi-columns - expect_cols = {{toNullableVec(col_name[0], ColumnWithNullableInt32{36, 34, 32, 27, {}, {}}), - toNullableVec(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "male", "female"}), - toNullableVec(col_name[2], ColumnWithNullableString{"china", "china", "usa", "usa", "china", "korea"}), - toNullableVec(col_name[3], ColumnWithNullableInt32{900, -300, {}, 0, {}, 1300})}, - {toNullableVec(col_name[0], ColumnWithNullableInt32{32, {}, 34, 27, 36, {}}), - toNullableVec(col_name[1], ColumnWithNullableString{"male", "male", "male", "female", "female", "female"}), - toNullableVec(col_name[2], ColumnWithNullableString{"usa", "china", "china", "usa", "china", "korea"}), - toNullableVec(col_name[3], ColumnWithNullableInt32{{}, {}, -300, 0, 900, 1300})}, - {toNullableVec(col_name[0], ColumnWithNullableInt32{34, {}, 32, 36, {}, 27}), - toNullableVec(col_name[1], ColumnWithNullableString{"male", "male", "male", "female", "female", "female"}), - toNullableVec(col_name[2], ColumnWithNullableString{"china", "china", "usa", "china", "korea", "usa"}), - toNullableVec(col_name[3], ColumnWithNullableInt32{-300, {}, {}, 900, 1300, 0})}}; - std::vector order_by_items{ /// select * from clerk order by age DESC, gender DESC; {MockOrderByItem(col_name[0], true), MockOrderByItem(col_name[1], true)}, @@ -158,12 +145,15 @@ try /// select * from clerk order by gender DESC, country ASC, salary DESC; {MockOrderByItem(col_name[1], true), MockOrderByItem(col_name[2], false), MockOrderByItem(col_name[3], true)}}; - size_t test_num = expect_cols.size(); + std::vector infos{ + {{0, true}, {1, true}}, + {{1, true}, {3, false}}, + {{1, true}, {2, false}, {3, true}}}; - for (size_t i = 0; i < test_num; ++i) + for (size_t i = 0; i < order_by_items.size(); ++i) { request = buildDAGRequest(table_name, order_by_items[i], 100); - executeAndAssertColumnsEqual(request, expect_cols[i]); + executeAndAssertSortedBlocks(request, infos[i]); } } } @@ -173,8 +163,11 @@ TEST_F(TopNExecutorTestRunner, TopNFunction) try { std::shared_ptr request; - std::vector expect_cols; - MockColumnNameVec output_projection{col_name[0], col_name[1], col_name[2], col_name[3]}; + std::vector output_projections{ + {col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), And(col("age"), col("salary"))}, + {col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), eq(col("age"), col("salary"))}, + {col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), gt(col("age"), col("salary"))}}; + MockAstVec func_projection; // Do function operation for topn MockOrderByItemVec order_by_items; ASTPtr col0_ast = col(col_name[0]); @@ -183,57 +176,41 @@ try ASTPtr col3_ast = col(col_name[3]); ASTPtr func_ast; + /// "and" function { - /// "and" function - expect_cols = {{toNullableVec(col_name[0], ColumnWithNullableInt32{{}, {}, 32, 27, 36, 34}), - toNullableVec(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "female", "male"}), - toNullableVec(col_name[2], ColumnWithNullableString{"korea", "china", "usa", "usa", "china", "china"}), - toNullableVec(col_name[3], ColumnWithNullableInt32{1300, {}, {}, 0, 900, -300})}}; - - { - /// select * from clerk order by age and salary ASC limit 100; - order_by_items = {MockOrderByItem("and(age, salary)", false)}; - func_ast = And(col(col_name[0]), col(col_name[3])); - func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast}; - - request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection); - executeAndAssertColumnsEqual(request, expect_cols.back()); - } + /// select * from clerk order by age and salary ASC limit 100; + order_by_items = {MockOrderByItem("and(age, salary)", false)}; + func_ast = And(col(col_name[0]), col(col_name[3])); + func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast}; + request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[0]); + SortInfos sort_infos{{4, false}}; + executeAndAssertSortedBlocks(request, sort_infos); } - { - /// "equal" function - expect_cols = {{toNullableVec(col_name[0], ColumnWithNullableInt32{27, 36, 34, 32, {}, {}}), - toNullableVec(col_name[1], ColumnWithNullableString{"female", "female", "male", "male", "female", "male"}), - toNullableVec(col_name[2], ColumnWithNullableString{"usa", "china", "china", "usa", "korea", "china"}), - toNullableVec(col_name[3], ColumnWithNullableInt32{0, 900, -300, {}, 1300, {}})}}; - - { - /// select age, salary from clerk order by age = salary DESC limit 100; - order_by_items = {MockOrderByItem("equals(age, salary)", true)}; - func_ast = eq(col(col_name[0]), col(col_name[3])); - func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast}; - request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection); - executeAndAssertColumnsEqual(request, expect_cols.back()); - } + /// "equal" function + { + /// select age, salary from clerk order by age = salary DESC limit 100; + order_by_items = {MockOrderByItem("equals(age, salary)", true)}; + func_ast = eq(col(col_name[0]), col(col_name[3])); + func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast}; + + request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[1]); + SortInfos sort_infos{{4, true}}; + executeAndAssertSortedBlocks(request, sort_infos); } { /// "greater" function - expect_cols = {{toNullableVec(col_name[0], ColumnWithNullableInt32{{}, 32, {}, 36, 27, 34}), - toNullableVec(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "female", "male"}), - toNullableVec(col_name[2], ColumnWithNullableString{"korea", "usa", "china", "china", "usa", "china"}), - toNullableVec(col_name[3], ColumnWithNullableInt32{1300, {}, {}, 900, 0, -300})}}; - { /// select age, gender, country, salary from clerk order by age > salary ASC limit 100; order_by_items = {MockOrderByItem("greater(age, salary)", false)}; func_ast = gt(col(col_name[0]), col(col_name[3])); func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast}; - request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection); - executeAndAssertColumnsEqual(request, expect_cols.back()); + request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[2]); + SortInfos sort_infos{{4, false}}; + executeAndAssertSortedBlocks(request, sort_infos); } } @@ -251,15 +228,30 @@ try for (auto limit_num : limits) { auto request = context - .scan("test_db", table) + .scan(db_name, table) .topN("key", false, limit_num) .build(context); - auto expect = executeStreams(request, 1); - executeAndAssertColumnsEqual(request, expect); + SortInfos sort_infos{{0, false}}; + executeAndAssertSortedBlocks(request, sort_infos); } } } CATCH +TEST_F(TopNExecutorTestRunner, Empty) +try +{ + for (size_t i = 0; i < col_name.size(); ++i) + { + auto request = context + .scan(db_name, empty_name) + .topN(col_name[i], false, 100) + .build(context); + SortInfos sort_infos{{i, false}}; + executeAndAssertSortedBlocks(request, sort_infos); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Operators/TopNTransformOp.cpp b/dbms/src/Operators/TopNTransformOp.cpp new file mode 100644 index 00000000000..627feb73537 --- /dev/null +++ b/dbms/src/Operators/TopNTransformOp.cpp @@ -0,0 +1,58 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +OperatorStatus TopNTransformOp::transformImpl(Block & block) +{ + if (unlikely(!block)) + { + if (!blocks.empty()) + { + impl = std::make_unique( + blocks, + order_desc, + req_id, + max_block_size, + limit); + block = impl->read(); + } + return OperatorStatus::HAS_OUTPUT; + } + RUNTIME_CHECK_MSG(!impl, "Impl must be nullptr here."); + + sortBlock(block, order_desc, limit); + blocks.emplace_back(std::move(block)); + return OperatorStatus::NEED_INPUT; +} + +OperatorStatus TopNTransformOp::tryOutputImpl(Block & block) +{ + if (impl) + { + block = impl->read(); + return OperatorStatus::HAS_OUTPUT; + } + return OperatorStatus::NEED_INPUT; +} + +void TopNTransformOp::transformHeaderImpl(Block &) +{ +} + +} // namespace DB diff --git a/dbms/src/Operators/TopNTransformOp.h b/dbms/src/Operators/TopNTransformOp.h new file mode 100644 index 00000000000..e0ae9857c2f --- /dev/null +++ b/dbms/src/Operators/TopNTransformOp.h @@ -0,0 +1,61 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +class TopNTransformOp : public TransformOp +{ +public: + TopNTransformOp( + PipelineExecutorStatus & exec_status_, + const SortDescription & order_desc_, + size_t limit_, + size_t max_block_size_, + const String & req_id_) + : TransformOp(exec_status_) + , log(Logger::get(req_id_)) + , order_desc(order_desc_) + , limit(limit_) + , max_block_size(max_block_size_) + , req_id(req_id_) + {} + + String getName() const override + { + return "TopNTransformOp"; + } + + OperatorStatus transformImpl(Block & block) override; + OperatorStatus tryOutputImpl(Block & block) override; + + void transformHeaderImpl(Block & header_) override; + + +private: + const LoggerPtr log; + SortDescription order_desc; + size_t limit; + size_t max_block_size; + String req_id; + Blocks blocks; + std::unique_ptr impl; +}; +} // namespace DB diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 650e5d7b543..2810bb033dc 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -132,6 +132,35 @@ void ExecutorTest::executeInterpreterWithDeltaMerge(const String & expected_stri ASSERT_EQ(Poco::trim(expected_string), Poco::trim(query_executor->toString())); } + +namespace +{ +String testInfoMsg(const std::shared_ptr & request, bool enable_planner, bool enable_pipeline, size_t concurrency, size_t block_size) +{ + const auto & test_info = testing::UnitTest::GetInstance()->current_test_info(); + assert(test_info); + return fmt::format( + "test info:\n" + " file: {}\n" + " line: {}\n" + " test_case_name: {}\n" + " test_func_name: {}\n" + " enable_planner: {}\n" + " enable_pipeline: {}\n" + " concurrency: {}\n" + " block_size: {}\n" + " dag_request: \n{}", + test_info->file(), + test_info->line(), + test_info->test_case_name(), + test_info->name(), + enable_planner, + enable_pipeline, + concurrency, + block_size, + ExecutorSerializer().serialize(request.get())); +} +} // namespace void ExecutorTest::executeExecutor( const std::shared_ptr & request, std::function<::testing::AssertionResult(const ColumnsWithTypeAndName &)> assert_func) @@ -145,41 +174,61 @@ void ExecutorTest::executeExecutor( { context.context.setSetting("max_block_size", Field(static_cast(block_size))); auto res = executeStreams(request, concurrency); - auto test_info_msg = [&]() { - const auto & test_info = testing::UnitTest::GetInstance()->current_test_info(); - assert(test_info); - return fmt::format( - "test info:\n" - " file: {}\n" - " line: {}\n" - " test_case_name: {}\n" - " test_func_name: {}\n" - " enable_planner: {}\n" - " concurrency: {}\n" - " block_size: {}\n" - " dag_request: \n{}" - " result_block: \n{}", - test_info->file(), - test_info->line(), - test_info->test_case_name(), - test_info->name(), - enable_planner, - concurrency, - block_size, - ExecutorSerializer().serialize(request.get()), - getColumnsContent(res)); - }; - ASSERT_TRUE(assert_func(res)) << test_info_msg(); + ASSERT_TRUE(assert_func(res)) << testInfoMsg(request, enable_planner, enable_pipeline, concurrency, block_size); } } WRAP_FOR_TEST_END } +void ExecutorTest::checkBlockSorted( + const std::shared_ptr & request, + const SortInfos & sort_infos, + std::function<::testing::AssertionResult(const ColumnsWithTypeAndName &, const ColumnsWithTypeAndName &)> assert_func) +{ + WRAP_FOR_TEST_BEGIN + std::vector concurrencies{2, 5, 10}; + for (auto concurrency : concurrencies) + { + auto expected_res = executeStreams(request, concurrency); + std::vector block_sizes{1, 2, DEFAULT_BLOCK_SIZE}; + for (auto block_size : block_sizes) + { + context.context.setSetting("max_block_size", Field(static_cast(block_size))); + auto return_blocks = getExecuteStreamsReturnBlocks(request, concurrency); + if (!return_blocks.empty()) + { + SortDescription sort_desc; + for (auto sort_info : sort_infos) + sort_desc.emplace_back(return_blocks.back().getColumnsWithTypeAndName()[sort_info.column_index].name, sort_info.desc ? -1 : 1, -1); + + for (auto & block : return_blocks) + ASSERT_TRUE(isAlreadySorted(block, sort_desc)) << testInfoMsg(request, enable_planner, enable_pipeline, concurrency, block_size); + + auto res = vstackBlocks(std::move(return_blocks)).getColumnsWithTypeAndName(); + ASSERT_TRUE(assert_func(expected_res, res)) << testInfoMsg(request, enable_planner, enable_pipeline, concurrency, block_size); + } + }; + WRAP_FOR_TEST_END + } +} + void ExecutorTest::executeAndAssertColumnsEqual(const std::shared_ptr & request, const ColumnsWithTypeAndName & expect_columns) { executeExecutor(request, [&](const ColumnsWithTypeAndName & res) { return columnsEqual(expect_columns, res, /*_restrict=*/false) << "\n expect_block: \n" - << getColumnsContent(expect_columns); + << getColumnsContent(expect_columns) + << "\n actual_block: \n" + << getColumnsContent(res); + }); +} + +void ExecutorTest::executeAndAssertSortedBlocks(const std::shared_ptr & request, const SortInfos & sort_infos) +{ + checkBlockSorted(request, sort_infos, [&](const ColumnsWithTypeAndName & expect_columns, const ColumnsWithTypeAndName & res) { + return columnsEqual(expect_columns, res, /*_restrict=*/false) << "\n expect_block: \n" + << getColumnsContent(expect_columns) + << "\n actual_block: \n" + << getColumnsContent(res); }); } diff --git a/dbms/src/TestUtils/ExecutorTestUtils.h b/dbms/src/TestUtils/ExecutorTestUtils.h index 65f7bec3b03..13363768a3b 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.h +++ b/dbms/src/TestUtils/ExecutorTestUtils.h @@ -78,6 +78,17 @@ class ExecutorTest : public ::testing::Test ColumnsWithTypeAndName executeRawQuery(const String & query, size_t concurrency = 1); void executeAndAssertColumnsEqual(const std::shared_ptr & request, const ColumnsWithTypeAndName & expect_columns); + + // To check the output column with index = column_index sorted. + struct SortInfo + { + size_t column_index; + bool desc; + }; + using SortInfos = std::vector; + + // check whether the column in each output block sorted. + void executeAndAssertSortedBlocks(const std::shared_ptr & request, const SortInfos & sort_infos); void executeAndAssertRowsEqual(const std::shared_ptr & request, size_t expect_rows); enum SourceType @@ -119,6 +130,11 @@ class ExecutorTest : public ::testing::Test const std::shared_ptr & request, std::function<::testing::AssertionResult(const ColumnsWithTypeAndName &)> assert_func); + void checkBlockSorted( + const std::shared_ptr & request, + const SortInfos & sort_infos, + std::function<::testing::AssertionResult(const ColumnsWithTypeAndName &, const ColumnsWithTypeAndName &)> assert_func); + protected: MockDAGRequestContext context; std::unique_ptr dag_context_ptr;