Skip to content

Commit

Permalink
Pipeline: support topn (#6805)
Browse files Browse the repository at this point in the history
ref #6518
  • Loading branch information
ywqzzy authored Feb 15, 2023
1 parent 7042c7b commit 64ae0c3
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 162 deletions.
1 change: 1 addition & 0 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request)
case tipb::ExecType::TypeProjection:
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeLimit:
case tipb::ExecType::TypeTopN:
// Only support mock table_scan/exchange_sender/exchange_receiver in test mode now.
case tipb::ExecType::TypeTableScan:
case tipb::ExecType::TypeExchangeSender:
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/FinalizeHelper.h>
#include <Flash/Planner/PhysicalPlanHelper.h>
#include <Flash/Planner/Plans/PhysicalTopN.h>
#include <Interpreters/Context.h>
#include <Operators/ExpressionTransformOp.h>
#include <Operators/TopNTransformOp.h>

namespace DB
{
Expand Down Expand Up @@ -65,6 +68,19 @@ void PhysicalTopN::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & c
orderStreams(pipeline, max_streams, order_descr, limit, false, context, log);
}

/// Appends the operators of this TopN plan node to the pipeline exec group.
///
/// For each builder handed to the callback by `group_builder.transform`:
///   1. an `ExpressionTransformOp` running `before_sort_actions` is appended,
///      but only when those actions are non-empty;
///   2. a `TopNTransformOp` is appended, configured with `order_descr`, `limit`
///      and the `max_block_size` setting read from `context`.
///
/// The `concurrency` argument is not used by this node.
void PhysicalTopN::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/)
{
    const bool has_before_sort_actions = !before_sort_actions->getActions().empty();
    if (has_before_sort_actions)
    {
        // The sort keys may be computed expressions, so evaluate them ahead of the TopN step.
        group_builder.transform([&](auto & exec_builder) {
            auto expression_op = std::make_unique<ExpressionTransformOp>(
                group_builder.exec_status,
                before_sort_actions,
                log->identifier());
            exec_builder.appendTransformOp(std::move(expression_op));
        });
    }

    group_builder.transform([&](auto & exec_builder) {
        auto topn_op = std::make_unique<TopNTransformOp>(
            group_builder.exec_status,
            order_descr,
            limit,
            context.getSettingsRef().max_block_size,
            log->identifier());
        exec_builder.appendTransformOp(std::move(topn_op));
    });
}

void PhysicalTopN::finalize(const Names & parent_require)
{
Names required_output = parent_require;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalTopN.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class PhysicalTopN : public PhysicalUnary

const Block & getSampleBlock() const override;

void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t concurrency) override;

private:
void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override;

Expand Down
70 changes: 8 additions & 62 deletions dbms/src/Flash/tests/gtest_pipeline_interpreter.out
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,7 @@ pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Limit|limit_2 -> Limi
~test_suite_name: StrangeQuery
~result_index: 2
~result:
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 8
Union: <for partial order>
PartialSorting x 10: limit = 8
SharedQuery: <restore concurrency>
MergeSorting, limit = 9
Union: <for partial order>
PartialSorting x 10: limit = 9
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> TopN|topn_2 -> TopN|topn_3 -> Projection|NonTiDBOperator
@
~test_suite_name: SingleQueryBlock
~result_index: 0
Expand Down Expand Up @@ -103,21 +89,12 @@ Union: <for test>
~test_suite_name: ParallelQuery
~result_index: 6
~result:
Expression: <final projection>
MergeSorting, limit = 10
PartialSorting: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
@
~test_suite_name: ParallelQuery
~result_index: 7
~result:
Union: <for test>
Expression x 5: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 5: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
@
~test_suite_name: ParallelQuery
~result_index: 8
Expand Down Expand Up @@ -224,23 +201,12 @@ MockExchangeSender
~test_suite_name: ParallelQuery
~result_index: 18
~result:
Union: <for test>
MockExchangeSender x 10
Expression: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
@
~test_suite_name: ParallelQuery
~result_index: 19
~result:
MockExchangeSender
Expression: <final projection>
MergeSorting, limit = 10
PartialSorting: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
@
~test_suite_name: ParallelQuery
~result_index: 20
Expand Down Expand Up @@ -280,15 +246,7 @@ pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|pro
~test_suite_name: MultipleQueryBlockWithSource
~result_index: 1
~result:
Union: <for test>
Expression x 10: <final projection>
Expression: <projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
Expression: <projection>
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> Projection|NonTiDBOperator
@
~test_suite_name: MultipleQueryBlockWithSource
~result_index: 2
Expand Down Expand Up @@ -393,13 +351,7 @@ Union: <for test>
~test_suite_name: FineGrainedShuffle
~result_index: 1
~result:
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
MockExchangeReceiver
pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
@
~test_suite_name: FineGrainedShuffle
~result_index: 2
Expand All @@ -417,13 +369,7 @@ Union: <for test>
~test_suite_name: FineGrainedShuffle
~result_index: 3
~result:
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
MockExchangeReceiver
pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
@
~test_suite_name: FineGrainedShuffleJoin
~result_index: 0
Expand Down
140 changes: 66 additions & 74 deletions dbms/src/Flash/tests/gtest_topn_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest
toNullableVec<String>(col_name[1], col_gender),
toNullableVec<String>(col_name[2], col_country),
toNullableVec<Int32>(col_name[3], col_salary)});
context.addMockTable({db_name, empty_name},
{{col_name[0], TiDB::TP::TypeLong},
{col_name[1], TiDB::TP::TypeString},
{col_name[2], TiDB::TP::TypeString},
{col_name[3], TiDB::TP::TypeLong}},
{toNullableVec<Int32>(col_name[0], {}),
toNullableVec<String>(col_name[1], {}),
toNullableVec<String>(col_name[2], {}),
toNullableVec<Int32>(col_name[3], {})});

/// table with 200 rows
{
Expand Down Expand Up @@ -79,7 +88,7 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest
return context.scan(db_name, table_name).topN(col_name, is_desc, limit_num).build(context);
}

std::shared_ptr<tipb::DAGRequest> buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockColumnNameVec out_proj_ast = {})
std::shared_ptr<tipb::DAGRequest> buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockAstVec out_proj_ast = {})
{
if (func_proj_ast.empty())
return context.scan(db_name, table_name).topN(order_by_items, limit).build(context);
Expand All @@ -100,6 +109,9 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest
ColumnWithNullableString col_gender{"female", "female", "male", "female", "male", "male"};
ColumnWithNullableString col_country{"korea", "usa", "usa", "china", "china", "china"};
ColumnWithNullableInt32 col_salary{1300, 0, {}, 900, {}, -300};

// empty table
const String empty_name{"empty_table"};
};

TEST_F(TopNExecutorTestRunner, TopN)
Expand All @@ -113,43 +125,18 @@ try
size_t col_data_num = col0.size();
for (size_t i = 1; i <= 1; ++i)
{
bool is_desc;
is_desc = static_cast<bool>(i); /// Set descent or ascent
if (is_desc)
sort(col0.begin(), col0.end(), std::greater<ColStringNullableType>()); /// Sort col0 for the following comparison
else
sort(col0.begin(), col0.end());

bool is_desc = static_cast<bool>(i); /// Set descent or ascent
for (size_t limit_num = 0; limit_num <= col_data_num + 5; ++limit_num)
{
request = buildDAGRequest(table_single_name, single_col_name, is_desc, limit_num);

expect_cols.clear();
if (limit_num == 0 || limit_num > col_data_num)
expect_cols.push_back({toNullableVec<String>(single_col_name, ColumnWithNullableString(col0.begin(), col0.end()))});
else
expect_cols.push_back({toNullableVec<String>(single_col_name, ColumnWithNullableString(col0.begin(), col0.begin() + limit_num))});

executeAndAssertColumnsEqual(request, expect_cols.back());
SortInfos sort_infos{{0, is_desc}};
executeAndAssertSortedBlocks(request, sort_infos);
}
}
}

{
/// Test multi-columns
expect_cols = {{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{36, 34, 32, 27, {}, {}}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "male", "female"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"china", "china", "usa", "usa", "china", "korea"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{900, -300, {}, 0, {}, 1300})},
{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{32, {}, 34, 27, 36, {}}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"male", "male", "male", "female", "female", "female"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"usa", "china", "china", "usa", "china", "korea"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{{}, {}, -300, 0, 900, 1300})},
{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{34, {}, 32, 36, {}, 27}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"male", "male", "male", "female", "female", "female"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"china", "china", "usa", "china", "korea", "usa"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{-300, {}, {}, 900, 1300, 0})}};

std::vector<MockOrderByItemVec> order_by_items{
/// select * from clerk order by age DESC, gender DESC;
{MockOrderByItem(col_name[0], true), MockOrderByItem(col_name[1], true)},
Expand All @@ -158,12 +145,15 @@ try
/// select * from clerk order by gender DESC, country ASC, salary DESC;
{MockOrderByItem(col_name[1], true), MockOrderByItem(col_name[2], false), MockOrderByItem(col_name[3], true)}};

size_t test_num = expect_cols.size();
std::vector<SortInfos> infos{
{{0, true}, {1, true}},
{{1, true}, {3, false}},
{{1, true}, {2, false}, {3, true}}};

for (size_t i = 0; i < test_num; ++i)
for (size_t i = 0; i < order_by_items.size(); ++i)
{
request = buildDAGRequest(table_name, order_by_items[i], 100);
executeAndAssertColumnsEqual(request, expect_cols[i]);
executeAndAssertSortedBlocks(request, infos[i]);
}
}
}
Expand All @@ -173,8 +163,11 @@ TEST_F(TopNExecutorTestRunner, TopNFunction)
try
{
std::shared_ptr<tipb::DAGRequest> request;
std::vector<ColumnsWithTypeAndName> expect_cols;
MockColumnNameVec output_projection{col_name[0], col_name[1], col_name[2], col_name[3]};
std::vector<MockAstVec> output_projections{
{col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), And(col("age"), col("salary"))},
{col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), eq(col("age"), col("salary"))},
{col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), gt(col("age"), col("salary"))}};

MockAstVec func_projection; // Do function operation for topn
MockOrderByItemVec order_by_items;
ASTPtr col0_ast = col(col_name[0]);
Expand All @@ -183,57 +176,41 @@ try
ASTPtr col3_ast = col(col_name[3]);
ASTPtr func_ast;

/// "and" function
{
/// "and" function
expect_cols = {{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{{}, {}, 32, 27, 36, 34}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "female", "male"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"korea", "china", "usa", "usa", "china", "china"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{1300, {}, {}, 0, 900, -300})}};

{
/// select * from clerk order by age and salary ASC limit 100;
order_by_items = {MockOrderByItem("and(age, salary)", false)};
func_ast = And(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};

request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
executeAndAssertColumnsEqual(request, expect_cols.back());
}
/// select * from clerk order by age and salary ASC limit 100;
order_by_items = {MockOrderByItem("and(age, salary)", false)};
func_ast = And(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};
request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[0]);
SortInfos sort_infos{{4, false}};
executeAndAssertSortedBlocks(request, sort_infos);
}

{
/// "equal" function
expect_cols = {{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{27, 36, 34, 32, {}, {}}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"female", "female", "male", "male", "female", "male"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"usa", "china", "china", "usa", "korea", "china"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{0, 900, -300, {}, 1300, {}})}};

{
/// select age, salary from clerk order by age = salary DESC limit 100;
order_by_items = {MockOrderByItem("equals(age, salary)", true)};
func_ast = eq(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};

request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
executeAndAssertColumnsEqual(request, expect_cols.back());
}
/// "equal" function
{
/// select age, salary from clerk order by age = salary DESC limit 100;
order_by_items = {MockOrderByItem("equals(age, salary)", true)};
func_ast = eq(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};

request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[1]);
SortInfos sort_infos{{4, true}};
executeAndAssertSortedBlocks(request, sort_infos);
}

{
/// "greater" function
expect_cols = {{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{{}, 32, {}, 36, 27, 34}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "female", "male"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"korea", "usa", "china", "china", "usa", "china"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{1300, {}, {}, 900, 0, -300})}};

{
/// select age, gender, country, salary from clerk order by age > salary ASC limit 100;
order_by_items = {MockOrderByItem("greater(age, salary)", false)};
func_ast = gt(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};

request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
executeAndAssertColumnsEqual(request, expect_cols.back());
request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[2]);
SortInfos sort_infos{{4, false}};
executeAndAssertSortedBlocks(request, sort_infos);
}
}

Expand All @@ -251,15 +228,30 @@ try
for (auto limit_num : limits)
{
auto request = context
.scan("test_db", table)
.scan(db_name, table)
.topN("key", false, limit_num)
.build(context);
auto expect = executeStreams(request, 1);
executeAndAssertColumnsEqual(request, expect);
SortInfos sort_infos{{0, false}};
executeAndAssertSortedBlocks(request, sort_infos);
}
}
}
CATCH

/// Runs an ascending TopN (limit 100) over the empty mock table, once per column
/// in `col_name`, and checks each result with `executeAndAssertSortedBlocks`
/// using that column's index as the sort key.
TEST_F(TopNExecutorTestRunner, Empty)
try
{
    const bool is_desc = false;
    const size_t column_count = col_name.size();
    for (size_t column_index = 0; column_index != column_count; ++column_index)
    {
        // Sort by the current column; the scanned table has no rows.
        auto scan_builder = context.scan(db_name, empty_name);
        auto request = scan_builder.topN(col_name[column_index], is_desc, 100).build(context);

        SortInfos sort_infos{{column_index, is_desc}};
        executeAndAssertSortedBlocks(request, sort_infos);
    }
}
CATCH

} // namespace tests
} // namespace DB
Loading

0 comments on commit 64ae0c3

Please sign in to comment.