From b7132a0baac746b31e9ffb85a0e89f31bcdbdc5d Mon Sep 17 00:00:00 2001 From: yanweiqi <592838129@qq.com> Date: Fri, 3 Mar 2023 15:59:09 +0800 Subject: [PATCH] Pipeline: Support pipeline agg (#6855) ref pingcap/tiflash#6518 --- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 8 +- dbms/src/Flash/Pipeline/Pipeline.cpp | 1 + dbms/src/Flash/Planner/PlanType.h | 6 +- .../Planner/Plans/PhysicalAggregation.cpp | 48 ++++- .../Plans/PhysicalAggregationBuild.cpp | 60 ++++++ .../Planner/Plans/PhysicalAggregationBuild.h | 63 ++++++ .../Plans/PhysicalAggregationConvergent.cpp | 56 +++++ .../Plans/PhysicalAggregationConvergent.h | 48 +++++ .../Planner/Plans/PipelineBreakerHelper.h | 36 ++++ .../tests/gtest_aggregation_executor.cpp | 5 + .../tests/gtest_pipeline_interpreter.out | 192 ++++-------------- .../Interpreters/InterpreterSelectQuery.cpp | 8 +- dbms/src/Operators/AggregateContext.cpp | 117 +++++++++++ dbms/src/Operators/AggregateContext.h | 83 ++++++++ .../Operators/AggregateConvergentSourceOp.cpp | 30 +++ .../Operators/AggregateConvergentSourceOp.h | 52 +++++ dbms/src/Operators/AggregateSinkOp.cpp | 37 ++++ dbms/src/Operators/AggregateSinkOp.h | 51 +++++ dbms/src/Operators/NullSourceOp.h | 45 ++++ 19 files changed, 775 insertions(+), 171 deletions(-) create mode 100644 dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp create mode 100644 dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h create mode 100644 dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp create mode 100644 dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h create mode 100644 dbms/src/Flash/Planner/Plans/PipelineBreakerHelper.h create mode 100644 dbms/src/Operators/AggregateContext.cpp create mode 100644 dbms/src/Operators/AggregateContext.h create mode 100644 dbms/src/Operators/AggregateConvergentSourceOp.cpp create mode 100644 dbms/src/Operators/AggregateConvergentSourceOp.h create mode 100644 dbms/src/Operators/AggregateSinkOp.cpp create mode 100644 dbms/src/Operators/AggregateSinkOp.h create mode 100644 dbms/src/Operators/NullSourceOp.h diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index f3c73634740..2169bfec027 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -402,7 +402,13 @@ void DAGQueryBlockInterpreter::executeAggregation( const Settings & settings = context.getSettingsRef(); AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header); - SpillConfig spill_config(context.getTemporaryPath(), fmt::format("{}_aggregation", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); + SpillConfig spill_config( + context.getTemporaryPath(), + fmt::format("{}_aggregation", log->identifier()), + settings.max_cached_data_bytes_in_spiller, + settings.max_spilled_rows_per_file, + settings.max_spilled_bytes_per_file, + context.getFileProvider()); auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index fa789746c8b..0cf53a27968 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -140,6 +140,7 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request) case tipb::ExecType::TypeSelection: case tipb::ExecType::TypeLimit: case tipb::ExecType::TypeTopN: + case tipb::ExecType::TypeAggregation: case tipb::ExecType::TypeTableScan: case tipb::ExecType::TypeExchangeSender: case tipb::ExecType::TypeExchangeReceiver: diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h index c3c31ce3c81..36017561ff4 100644 --- a/dbms/src/Flash/Planner/PlanType.h +++ b/dbms/src/Flash/Planner/PlanType.h @@ -36,8 +36,10 @@ struct PlanType TableScan = 11, MockTableScan = 12, Join = 13, - GetResult = 14, - Expand = 15, + AggregationBuild = 14, + AggregationConvergent = 15, + Expand = 16, + GetResult = 17 }; PlanTypeEnum enum_value; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp index 9aacf49e967..0aa517c76a3 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp @@ -25,6 +25,8 @@ #include #include #include +#include +#include #include namespace DB @@ -94,7 +96,13 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont Block before_agg_header = pipeline.firstStream()->getHeader(); AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header); - SpillConfig spill_config(context.getTemporaryPath(), fmt::format("{}_aggregation", log->identifier()), context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, context.getFileProvider()); + SpillConfig spill_config( + context.getTemporaryPath(), + fmt::format("{}_aggregation", log->identifier()), + context.getSettingsRef().max_cached_data_bytes_in_spiller, + context.getSettingsRef().max_spilled_rows_per_file, + context.getSettingsRef().max_spilled_bytes_per_file, + context.getFileProvider()); auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, @@ -155,16 +163,34 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont void PhysicalAggregation::buildPipeline(PipelineBuilder & builder) { - // Break the pipeline for pre-agg. - // FIXME: Should be newly created PhysicalPreAgg. - auto pre_agg_builder = builder.breakPipeline(shared_from_this()); - // Pre-agg pipeline. - child->buildPipeline(pre_agg_builder); - pre_agg_builder.build(); - // Final-agg pipeline. - // FIXME: Should be newly created PhysicalFinalAgg. - builder.addPlanNode(shared_from_this()); - throw Exception("Unsupport"); + auto aggregate_context = std::make_shared( + log->identifier()); + // TODO support fine grained shuffle. + assert(!fine_grained_shuffle.enable()); + auto agg_build = std::make_shared( + executor_id, + schema, + log->identifier(), + child, + before_agg_actions, + aggregation_keys, + aggregation_collators, + is_final_agg, + aggregate_descriptions, + aggregate_context); + // Break the pipeline for agg_build. + auto agg_build_builder = builder.breakPipeline(agg_build); + // agg_build pipeline. + child->buildPipeline(agg_build_builder); + agg_build_builder.build(); + // agg_convergent pipeline. + auto agg_convergent = std::make_shared( + executor_id, + schema, + log->identifier(), + aggregate_context, + expr_after_agg); + builder.addPlanNode(agg_convergent); } void PhysicalAggregation::finalize(const Names & parent_require) diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp new file mode 100644 index 00000000000..a2a09f761c4 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp @@ -0,0 +1,60 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB +{ +void PhysicalAggregationBuild::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) +{ + if (!before_agg_actions->getActions().empty()) + { + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), before_agg_actions)); + }); + } + + size_t build_index = 0; + group_builder.transform([&](auto & builder) { + builder.setSinkOp(std::make_unique(group_builder.exec_status, build_index++, aggregate_context, log->identifier())); + }); + + Block before_agg_header = group_builder.getCurrentHeader(); + size_t concurrency = group_builder.concurrency; + AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header); + SpillConfig spill_config( + context.getTemporaryPath(), + fmt::format("{}_aggregation", log->identifier()), + context.getSettingsRef().max_cached_data_bytes_in_spiller, + context.getSettingsRef().max_spilled_rows_per_file, + context.getSettingsRef().max_spilled_bytes_per_file, + context.getFileProvider()); + + auto params = AggregationInterpreterHelper::buildParams( + context, + before_agg_header, + concurrency, + concurrency, + aggregation_keys, + aggregation_collators, + aggregate_descriptions, + is_final_agg, + spill_config); + + aggregate_context->initBuild(params, concurrency); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h new file mode 100644 index 00000000000..e512669bf8e --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h @@ -0,0 +1,63 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class PhysicalAggregationBuild : public PhysicalUnary +{ +public: + PhysicalAggregationBuild( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_, + const ExpressionActionsPtr & before_agg_actions_, + const Names & aggregation_keys_, + const TiDB::TiDBCollators & aggregation_collators_, + bool is_final_agg_, + const AggregateDescriptions & aggregate_descriptions_, + const AggregateContextPtr & aggregate_context_) + : PhysicalUnary(executor_id_, PlanType::AggregationBuild, schema_, req_id, child_) + , before_agg_actions(before_agg_actions_) + , aggregation_keys(aggregation_keys_) + , aggregation_collators(aggregation_collators_) + , is_final_agg(is_final_agg_) + , aggregate_descriptions(aggregate_descriptions_) + , aggregate_context(aggregate_context_) + {} + + void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) override; + +private: + DISABLE_USELESS_FUNCTION_FOR_BREAKER + +private: + ExpressionActionsPtr before_agg_actions; + Names aggregation_keys; + TiDB::TiDBCollators aggregation_collators; + bool is_final_agg; + AggregateDescriptions aggregate_descriptions; + AggregateContextPtr aggregate_context; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp new file mode 100644 index 00000000000..92221e90011 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp @@ -0,0 +1,56 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include +#include + +namespace DB +{ + +void PhysicalAggregationConvergent::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +{ + aggregate_context->initConvergent(); + + if (unlikely(aggregate_context->useNullSource())) + { + group_builder.init(1); + group_builder.transform([&](auto & builder) { + builder.setSourceOp(std::make_unique( + group_builder.exec_status, + aggregate_context->getHeader(), + log->identifier())); + }); + } + else + { + group_builder.init(aggregate_context->getConvergentConcurrency()); + size_t index = 0; + group_builder.transform([&](auto & builder) { + builder.setSourceOp(std::make_unique( + group_builder.exec_status, + aggregate_context, + index++, + log->identifier())); + }); + } + + if (!expr_after_agg->getActions().empty()) + { + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), expr_after_agg)); + }); + } +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h b/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h new file mode 100644 index 00000000000..7895986b6cb --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h @@ -0,0 +1,48 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ +class PhysicalAggregationConvergent : public PhysicalLeaf +{ +public: + PhysicalAggregationConvergent( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const AggregateContextPtr & aggregate_context_, + const ExpressionActionsPtr & expr_after_agg_) + : PhysicalLeaf(executor_id_, PlanType::AggregationConvergent, schema_, req_id) + , expr_after_agg(expr_after_agg_) + , aggregate_context(aggregate_context_) + {} + + void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t concurrency) override; + +private: + DISABLE_USELESS_FUNCTION_FOR_BREAKER + +private: + ExpressionActionsPtr expr_after_agg; + AggregateContextPtr aggregate_context; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PipelineBreakerHelper.h b/dbms/src/Flash/Planner/Plans/PipelineBreakerHelper.h new file mode 100644 index 00000000000..1d01a0b1bdf --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PipelineBreakerHelper.h @@ -0,0 +1,36 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace DB +{ +#define DISABLE_USELESS_FUNCTION_FOR_BREAKER \ + void buildPipeline(PipelineBuilder &) override \ + { \ + throw Exception("Unsupport"); \ + } \ + void finalize(const Names &) override \ + { \ + throw Exception("Unsupport"); \ + } \ + const Block & getSampleBlock() const override \ + { \ + throw Exception("Unsupport"); \ + } \ + void buildBlockInputStreamImpl(DAGPipeline &, Context &, size_t) override \ + { \ + throw Exception("Unsupport"); \ + } +} // namespace DB diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp index 10fffbaeee8..4f4d5c8027b 100644 --- a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -622,6 +622,11 @@ try .aggregation({Max(col("s1"))}, {col("s2")}) .build(context); executeAndAssertColumnsEqual(request, {}); + + request = context.scan("test_db", "empty_table") + .aggregation({Count(lit(Field(static_cast(1))))}, {}) + .build(context); + executeAndAssertColumnsEqual(request, {toVec({0})}); } CATCH diff --git a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out index a59672f7809..6db7632be0f 100644 --- a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out +++ b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out @@ -26,37 +26,14 @@ pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> TopN|topn_2 -> TopN|top ~test_suite_name: SingleQueryBlock ~result_index: 0 ~result: -Union: - Expression x 10: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Filter - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Filter - MockTableScan +pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> TopN|topn_4 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|aggregation_2 @ ~test_suite_name: SingleQueryBlock ~result_index: 1 ~result: -Union: - Expression x 10: - SharedQuery: - Limit, limit = 10 - Union: - Limit x 10, limit = 10 - Filter - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Filter - MockTableScan +pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> Limit|limit_4 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|aggregation_2 @ ~test_suite_name: ParallelQuery ~result_index: 0 @@ -81,20 +58,14 @@ pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|Non ~test_suite_name: ParallelQuery ~result_index: 4 ~result: -Expression: - Expression: - Aggregating - MockTableScan +pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|aggregation_1 @ ~test_suite_name: ParallelQuery ~result_index: 5 ~result: -Union: - Expression x 5: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 5, final: true - MockTableScan x 5 +pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|aggregation_1 @ ~test_suite_name: ParallelQuery ~result_index: 6 @@ -119,94 +90,52 @@ pipeline#0: MockTableScan|table_scan_0 -> Filter|selection_1 -> Projection|NonTi ~test_suite_name: ParallelQuery ~result_index: 10 ~result: -Union: - Expression x 10: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - SharedQuery x 10: - Limit, limit = 10 - Union: - Limit x 10, limit = 10 - MockTableScan +pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|project_2 -> AggregationBuild|aggregation_3 @ ~test_suite_name: ParallelQuery ~result_index: 11 ~result: -Expression: - Expression: - Aggregating - Limit, limit = 10 - MockTableScan +pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|project_2 -> AggregationBuild|aggregation_3 @ ~test_suite_name: ParallelQuery ~result_index: 12 ~result: -Union: - Expression x 10: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - SharedQuery x 10: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - MockTableScan +pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|project_2 -> AggregationBuild|aggregation_3 @ ~test_suite_name: ParallelQuery ~result_index: 13 ~result: -Expression: - Expression: - Aggregating - MergeSorting, limit = 10 - PartialSorting: limit = 10 - MockTableScan +pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|project_2 -> AggregationBuild|aggregation_3 @ ~test_suite_name: ParallelQuery ~result_index: 14 ~result: -Union: - Expression x 10: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - MockTableScan x 10 +pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator + |- pipeline#1: AggregationConvergent|aggregation_1 -> Projection|project_2 -> AggregationBuild|aggregation_3 + |- pipeline#2: MockTableScan|table_scan_0 -> AggregationBuild|aggregation_1 @ ~test_suite_name: ParallelQuery ~result_index: 15 ~result: -Expression: - Expression: - Aggregating - Expression: - Expression: - Aggregating - MockTableScan +pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator + |- pipeline#1: AggregationConvergent|aggregation_1 -> Projection|project_2 -> AggregationBuild|aggregation_3 + |- pipeline#2: MockTableScan|table_scan_0 -> AggregationBuild|aggregation_1 @ ~test_suite_name: ParallelQuery ~result_index: 16 ~result: -Union: - MockExchangeSender x 10 - Expression: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - MockTableScan x 10 +pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 + |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|aggregation_1 @ ~test_suite_name: ParallelQuery ~result_index: 17 ~result: -MockExchangeSender - Expression: - Expression: - Aggregating - MockTableScan +pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 + |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|aggregation_1 @ ~test_suite_name: ParallelQuery ~result_index: 18 @@ -261,40 +190,14 @@ pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 2 ~result: -Union: - Expression x 10: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - MockTableScan +pipeline#0: AggregationConvergent|aggregation_4 -> Projection|project_5 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> AggregationBuild|aggregation_4 @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 3 ~result: -Union: - Expression x 10: - SharedQuery: - Limit, limit = 10 - Union: - Limit x 10, limit = 10 - Expression: - Filter - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - MockTableScan +pipeline#0: AggregationConvergent|aggregation_4 -> Projection|project_5 -> Filter|selection_6 -> Projection|project_7 -> Limit|limit_8 -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> AggregationBuild|aggregation_4 @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 4 @@ -426,13 +329,8 @@ Union: ~test_suite_name: FineGrainedShuffleAgg ~result_index: 1 ~result: -Union: - Expression x 10: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - MockExchangeReceiver +pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator + |- pipeline#1: MockExchangeReceiver|exchange_receiver_0 -> AggregationBuild|aggregation_1 @ ~test_suite_name: Join ~result_index: 0 @@ -577,32 +475,14 @@ CreatingSets ~test_suite_name: ListBase ~result_index: 0 ~result: -Expression: - Limit, limit = 10 - Filter - Expression: - Aggregating - Expression: - Filter - MockTableScan +pipeline#0: AggregationConvergent|3_aggregation -> Filter|4_selection -> Limit|5_limit -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|1_table_scan -> Filter|2_selection -> AggregationBuild|3_aggregation @ ~test_suite_name: ListBase ~result_index: 1 ~result: -Union: - Expression x 20: - SharedQuery: - MergeSorting, limit = 10 - Union: - PartialSorting x 20: limit = 10 - Expression: - Filter - Expression: - SharedQuery: - ParallelAggregating, max_threads: 20, final: true - Expression x 20: - Filter - MockTableScan +pipeline#0: AggregationConvergent|3_aggregation -> Filter|4_selection -> TopN|5_top_n -> Projection|NonTiDBOperator + |- pipeline#1: MockTableScan|1_table_scan -> Filter|2_selection -> AggregationBuild|3_aggregation @ ~test_suite_name: ExpandPlan ~result_index: 0 diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 3260a6453ea..6d84f43fdfd 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -918,7 +918,13 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre */ bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0; - SpillConfig spill_config(context.getTemporaryPath(), "aggregation", settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); + SpillConfig spill_config( + context.getTemporaryPath(), + "aggregation", + settings.max_cached_data_bytes_in_spiller, + settings.max_spilled_rows_per_file, + settings.max_spilled_bytes_per_file, + context.getFileProvider()); Aggregator::Params params(header, keys, aggregates, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, false, spill_config, settings.max_block_size); /// If there are several sources, then we perform parallel aggregation diff --git a/dbms/src/Operators/AggregateContext.cpp b/dbms/src/Operators/AggregateContext.cpp new file mode 100644 index 00000000000..e56a1d5d8b3 --- /dev/null +++ b/dbms/src/Operators/AggregateContext.cpp @@ -0,0 +1,117 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB +{ +void AggregateContext::initBuild(const Aggregator::Params & params, size_t max_threads_) +{ + RUNTIME_CHECK(!inited_build && !inited_convergent); + max_threads = max_threads_; + empty_result_for_aggregation_by_empty_set = params.empty_result_for_aggregation_by_empty_set; + keys_size = params.keys_size; + many_data.reserve(max_threads); + threads_data.reserve(max_threads); + for (size_t i = 0; i < max_threads; ++i) + { + threads_data.emplace_back(params.keys_size, params.aggregates_size); + many_data.emplace_back(std::make_shared()); + } + + aggregator = std::make_unique(params, log->identifier()); + aggregator->initThresholdByAggregatedDataVariantsSize(many_data.size()); + inited_build = true; + LOG_TRACE(log, "Aggregate Context inited"); +} + +void AggregateContext::executeOnBlock(size_t task_index, const Block & block) +{ + RUNTIME_CHECK(inited_build && !inited_convergent); + aggregator->executeOnBlock(block, *many_data[task_index], threads_data[task_index].key_columns, threads_data[task_index].aggregate_columns); + threads_data[task_index].src_bytes += block.bytes(); + threads_data[task_index].src_rows += block.rows(); +} + +void AggregateContext::writeSuffix() +{ + size_t total_src_rows = 0; + size_t total_src_bytes = 0; + for (size_t i = 0; i < max_threads; ++i) + { + size_t rows = many_data[i]->size(); + LOG_TRACE( + log, + "Aggregated. {} to {} rows (from {:.3f} MiB))", + threads_data[i].src_rows, + rows, + (threads_data[i].src_bytes / 1048576.0)); + total_src_rows += threads_data[i].src_rows; + total_src_bytes += threads_data[i].src_bytes; + } + + LOG_TRACE( + log, + "Total aggregated {} rows (from {:.3f} MiB))", + total_src_rows, + (total_src_bytes / 1048576.0)); + + if (total_src_rows == 0 && keys_size == 0 && !empty_result_for_aggregation_by_empty_set) + aggregator->executeOnBlock( + this->getHeader(), + *many_data[0], + threads_data[0].key_columns, + threads_data[0].aggregate_columns); +} + +void AggregateContext::initConvergent() +{ + RUNTIME_CHECK(inited_build && !inited_convergent); + + merging_buckets = aggregator->mergeAndConvertToBlocks(many_data, true, max_threads); + inited_convergent = true; + RUNTIME_CHECK(!merging_buckets || merging_buckets->getConcurrency() > 0); +} + +size_t AggregateContext::getConvergentConcurrency() +{ + RUNTIME_CHECK(inited_convergent); + + return isTwoLevel() ? merging_buckets->getConcurrency() : 1; +} + +Block AggregateContext::getHeader() const +{ + RUNTIME_CHECK(inited_build); + return aggregator->getHeader(true); +} + +bool AggregateContext::isTwoLevel() +{ + RUNTIME_CHECK(inited_build); + return many_data[0]->isTwoLevel(); +} + +bool AggregateContext::useNullSource() +{ + RUNTIME_CHECK(inited_convergent); + return !merging_buckets; +} + +Block AggregateContext::readForConvergent(size_t index) +{ + RUNTIME_CHECK(inited_convergent); + return merging_buckets->getData(index); +} +} // namespace DB diff --git a/dbms/src/Operators/AggregateContext.h b/dbms/src/Operators/AggregateContext.h new file mode 100644 index 00000000000..32a83034f50 --- /dev/null +++ b/dbms/src/Operators/AggregateContext.h @@ -0,0 +1,83 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +struct ThreadData +{ + size_t src_rows = 0; + size_t src_bytes = 0; + + ColumnRawPtrs key_columns; + Aggregator::AggregateColumns aggregate_columns; + + ThreadData(size_t keys_size, size_t aggregates_size) + { + key_columns.resize(keys_size); + aggregate_columns.resize(aggregates_size); + } +}; + +class AggregateContext +{ +public: + explicit AggregateContext( + const String & req_id) + : log(Logger::get(req_id)) + { + } + + void initBuild(const Aggregator::Params & params, size_t max_threads_); + + void executeOnBlock(size_t task_index, const Block & block); + + void initConvergent(); + + void writeSuffix(); + + size_t getConvergentConcurrency(); + + Block readForConvergent(size_t index); + + Block getHeader() const; + + bool useNullSource(); + +private: + bool isTwoLevel(); + +private: + std::unique_ptr aggregator; + bool keys_size = false; + bool empty_result_for_aggregation_by_empty_set = false; + + std::atomic_bool inited_build = false; + std::atomic_bool inited_convergent = false; + + MergingBucketsPtr merging_buckets; + ManyAggregatedDataVariants many_data; + std::vector threads_data; + size_t max_threads{}; + + const LoggerPtr log; +}; + +using AggregateContextPtr = std::shared_ptr; +} // namespace DB diff --git a/dbms/src/Operators/AggregateConvergentSourceOp.cpp b/dbms/src/Operators/AggregateConvergentSourceOp.cpp new file mode 100644 index 00000000000..082b820067f --- /dev/null +++ b/dbms/src/Operators/AggregateConvergentSourceOp.cpp @@ -0,0 +1,30 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +namespace DB +{ +OperatorStatus AggregateConvergentSourceOp::readImpl(Block & block) +{ + block = agg_context->readForConvergent(index); + total_rows += block.rows(); + return OperatorStatus::HAS_OUTPUT; +} + +void AggregateConvergentSourceOp::operateSuffix() +{ + LOG_INFO(log, "finish read {} rows from aggregate context", total_rows); +} + +} // namespace DB diff --git a/dbms/src/Operators/AggregateConvergentSourceOp.h b/dbms/src/Operators/AggregateConvergentSourceOp.h new file mode 100644 index 00000000000..861bdb8ec5c --- /dev/null +++ b/dbms/src/Operators/AggregateConvergentSourceOp.h @@ -0,0 +1,52 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ +class AggregateConvergentSourceOp : public SourceOp +{ +public: + AggregateConvergentSourceOp( + PipelineExecutorStatus & exec_status_, + const AggregateContextPtr & agg_context_, + size_t index_, + const String & req_id) + : SourceOp(exec_status_, req_id) + , agg_context(agg_context_) + , index(index_) + { + setHeader(agg_context->getHeader()); + } + + String getName() const override + { + return "AggregateConvergentSourceOp"; + } + + void operateSuffix() override; + +protected: + OperatorStatus readImpl(Block & block) override; + +private: + AggregateContextPtr agg_context; + uint64_t total_rows{}; + const size_t index; +}; +} // namespace DB diff --git a/dbms/src/Operators/AggregateSinkOp.cpp b/dbms/src/Operators/AggregateSinkOp.cpp new file mode 100644 index 00000000000..6385a50f132 --- /dev/null +++ b/dbms/src/Operators/AggregateSinkOp.cpp @@ -0,0 +1,37 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +namespace DB +{ +OperatorStatus AggregateSinkOp::writeImpl(Block && block) +{ + if (unlikely(!block)) + { + return OperatorStatus::FINISHED; + } + agg_context->executeOnBlock(index, block); + total_rows += block.rows(); + block.clear(); + return OperatorStatus::NEED_INPUT; +} + +void AggregateSinkOp::operateSuffix() +{ + LOG_DEBUG(log, "finish write with {} rows", total_rows); + + agg_context->writeSuffix(); +} + +} // namespace DB diff --git a/dbms/src/Operators/AggregateSinkOp.h b/dbms/src/Operators/AggregateSinkOp.h new file mode 100644 index 00000000000..57791ea5173 --- /dev/null +++ b/dbms/src/Operators/AggregateSinkOp.h @@ -0,0 +1,51 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ +class AggregateSinkOp : public SinkOp +{ +public: + AggregateSinkOp( + PipelineExecutorStatus & exec_status_, + size_t index_, + AggregateContextPtr agg_context_, + const String & req_id) + : SinkOp(exec_status_, req_id) + , index(index_) + , agg_context(agg_context_) + { + } + + String getName() const override + { + return "AggregateSinkOp"; + } + + void operateSuffix() override; + +protected: + OperatorStatus writeImpl(Block && block) override; + +private: + size_t index{}; + uint64_t total_rows{}; + AggregateContextPtr agg_context; +}; +} // namespace DB diff --git a/dbms/src/Operators/NullSourceOp.h b/dbms/src/Operators/NullSourceOp.h new file mode 100644 index 00000000000..658fdec310d --- /dev/null +++ b/dbms/src/Operators/NullSourceOp.h @@ -0,0 +1,45 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +class NullSourceOp : public SourceOp +{ +public: + NullSourceOp( + PipelineExecutorStatus & exec_status_, + const Block & header_, + const String & req_id) + : SourceOp(exec_status_, req_id) + { + setHeader(header_); + } + + String getName() const override + { + return "NullSourceOp"; + } + +protected: + OperatorStatus readImpl(Block & block) override + { + block = {}; + return OperatorStatus::HAS_OUTPUT; + } +}; +} // namespace DB