Skip to content

Commit

Permalink
Pipeline: Support pipeline agg (#6855)
Browse files Browse the repository at this point in the history
ref #6518
  • Loading branch information
ywqzzy authored Mar 3, 2023
1 parent 6a8507c commit b7132a0
Show file tree
Hide file tree
Showing 19 changed files with 775 additions and 171 deletions.
8 changes: 7 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,13 @@ void DAGQueryBlockInterpreter::executeAggregation(
const Settings & settings = context.getSettingsRef();

AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
SpillConfig spill_config(context.getTemporaryPath(), fmt::format("{}_aggregation", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider());
SpillConfig spill_config(
context.getTemporaryPath(),
fmt::format("{}_aggregation", log->identifier()),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
context.getFileProvider());
auto params = AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request)
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeLimit:
case tipb::ExecType::TypeTopN:
case tipb::ExecType::TypeAggregation:
case tipb::ExecType::TypeTableScan:
case tipb::ExecType::TypeExchangeSender:
case tipb::ExecType::TypeExchangeReceiver:
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Planner/PlanType.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ struct PlanType
TableScan = 11,
MockTableScan = 12,
Join = 13,
GetResult = 14,
Expand = 15,
AggregationBuild = 14,
AggregationConvergent = 15,
Expand = 16,
GetResult = 17
};
PlanTypeEnum enum_value;

Expand Down
48 changes: 37 additions & 11 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <Flash/Planner/FinalizeHelper.h>
#include <Flash/Planner/PhysicalPlanHelper.h>
#include <Flash/Planner/Plans/PhysicalAggregation.h>
#include <Flash/Planner/Plans/PhysicalAggregationBuild.h>
#include <Flash/Planner/Plans/PhysicalAggregationConvergent.h>
#include <Interpreters/Context.h>

namespace DB
Expand Down Expand Up @@ -94,7 +96,13 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont

Block before_agg_header = pipeline.firstStream()->getHeader();
AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
SpillConfig spill_config(context.getTemporaryPath(), fmt::format("{}_aggregation", log->identifier()), context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, context.getFileProvider());
SpillConfig spill_config(
context.getTemporaryPath(),
fmt::format("{}_aggregation", log->identifier()),
context.getSettingsRef().max_cached_data_bytes_in_spiller,
context.getSettingsRef().max_spilled_rows_per_file,
context.getSettingsRef().max_spilled_bytes_per_file,
context.getFileProvider());
auto params = AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
Expand Down Expand Up @@ -155,16 +163,34 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont

void PhysicalAggregation::buildPipeline(PipelineBuilder & builder)
{
// Break the pipeline for pre-agg.
// FIXME: Should be newly created PhysicalPreAgg.
auto pre_agg_builder = builder.breakPipeline(shared_from_this());
// Pre-agg pipeline.
child->buildPipeline(pre_agg_builder);
pre_agg_builder.build();
// Final-agg pipeline.
// FIXME: Should be newly created PhysicalFinalAgg.
builder.addPlanNode(shared_from_this());
throw Exception("Unsupport");
auto aggregate_context = std::make_shared<AggregateContext>(
log->identifier());
// TODO support fine grained shuffle.
assert(!fine_grained_shuffle.enable());
auto agg_build = std::make_shared<PhysicalAggregationBuild>(
executor_id,
schema,
log->identifier(),
child,
before_agg_actions,
aggregation_keys,
aggregation_collators,
is_final_agg,
aggregate_descriptions,
aggregate_context);
// Break the pipeline for agg_build.
auto agg_build_builder = builder.breakPipeline(agg_build);
// agg_build pipeline.
child->buildPipeline(agg_build_builder);
agg_build_builder.build();
// agg_convergent pipeline.
auto agg_convergent = std::make_shared<PhysicalAggregationConvergent>(
executor_id,
schema,
log->identifier(),
aggregate_context,
expr_after_agg);
builder.addPlanNode(agg_convergent);
}

void PhysicalAggregation::finalize(const Names & parent_require)
Expand Down
60 changes: 60 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Planner/Plans/PhysicalAggregationBuild.h>
#include <Operators/AggregateSinkOp.h>
#include <Operators/ExpressionTransformOp.h>

namespace DB
{
void PhysicalAggregationBuild::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/)
{
if (!before_agg_actions->getActions().empty())
{
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(group_builder.exec_status, log->identifier(), before_agg_actions));
});
}

size_t build_index = 0;
group_builder.transform([&](auto & builder) {
builder.setSinkOp(std::make_unique<AggregateSinkOp>(group_builder.exec_status, build_index++, aggregate_context, log->identifier()));
});

Block before_agg_header = group_builder.getCurrentHeader();
size_t concurrency = group_builder.concurrency;
AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
SpillConfig spill_config(
context.getTemporaryPath(),
fmt::format("{}_aggregation", log->identifier()),
context.getSettingsRef().max_cached_data_bytes_in_spiller,
context.getSettingsRef().max_spilled_rows_per_file,
context.getSettingsRef().max_spilled_bytes_per_file,
context.getFileProvider());

auto params = AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
concurrency,
concurrency,
aggregation_keys,
aggregation_collators,
aggregate_descriptions,
is_final_agg,
spill_config);

aggregate_context->initBuild(params, concurrency);
}
} // namespace DB
63 changes: 63 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/Plans/PhysicalUnary.h>
#include <Flash/Planner/Plans/PipelineBreakerHelper.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Operators/AggregateContext.h>

namespace DB
{
class PhysicalAggregationBuild : public PhysicalUnary
{
public:
PhysicalAggregationBuild(
const String & executor_id_,
const NamesAndTypes & schema_,
const String & req_id,
const PhysicalPlanNodePtr & child_,
const ExpressionActionsPtr & before_agg_actions_,
const Names & aggregation_keys_,
const TiDB::TiDBCollators & aggregation_collators_,
bool is_final_agg_,
const AggregateDescriptions & aggregate_descriptions_,
const AggregateContextPtr & aggregate_context_)
: PhysicalUnary(executor_id_, PlanType::AggregationBuild, schema_, req_id, child_)
, before_agg_actions(before_agg_actions_)
, aggregation_keys(aggregation_keys_)
, aggregation_collators(aggregation_collators_)
, is_final_agg(is_final_agg_)
, aggregate_descriptions(aggregate_descriptions_)
, aggregate_context(aggregate_context_)
{}

void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) override;

private:
DISABLE_USELESS_FUNCTION_FOR_BREAKER

private:
ExpressionActionsPtr before_agg_actions;
Names aggregation_keys;
TiDB::TiDBCollators aggregation_collators;
bool is_final_agg;
AggregateDescriptions aggregate_descriptions;
AggregateContextPtr aggregate_context;
};
} // namespace DB
56 changes: 56 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Flash/Planner/Plans/PhysicalAggregationConvergent.h>
#include <Operators/AggregateConvergentSourceOp.h>
#include <Operators/ExpressionTransformOp.h>
#include <Operators/NullSourceOp.h>

namespace DB
{

void PhysicalAggregationConvergent::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/)
{
aggregate_context->initConvergent();

if (unlikely(aggregate_context->useNullSource()))
{
group_builder.init(1);
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<NullSourceOp>(
group_builder.exec_status,
aggregate_context->getHeader(),
log->identifier()));
});
}
else
{
group_builder.init(aggregate_context->getConvergentConcurrency());
size_t index = 0;
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<AggregateConvergentSourceOp>(
group_builder.exec_status,
aggregate_context,
index++,
log->identifier()));
});
}

if (!expr_after_agg->getActions().empty())
{
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(group_builder.exec_status, log->identifier(), expr_after_agg));
});
}
}
} // namespace DB
48 changes: 48 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/Plans/PhysicalLeaf.h>
#include <Flash/Planner/Plans/PipelineBreakerHelper.h>
#include <Interpreters/ExpressionActions.h>
#include <Operators/AggregateContext.h>

namespace DB
{
class PhysicalAggregationConvergent : public PhysicalLeaf
{
public:
PhysicalAggregationConvergent(
const String & executor_id_,
const NamesAndTypes & schema_,
const String & req_id,
const AggregateContextPtr & aggregate_context_,
const ExpressionActionsPtr & expr_after_agg_)
: PhysicalLeaf(executor_id_, PlanType::AggregationConvergent, schema_, req_id)
, expr_after_agg(expr_after_agg_)
, aggregate_context(aggregate_context_)
{}

void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t concurrency) override;

private:
DISABLE_USELESS_FUNCTION_FOR_BREAKER

private:
ExpressionActionsPtr expr_after_agg;
AggregateContextPtr aggregate_context;
};
} // namespace DB
36 changes: 36 additions & 0 deletions dbms/src/Flash/Planner/Plans/PipelineBreakerHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

namespace DB
{
#define DISABLE_USELESS_FUNCTION_FOR_BREAKER \
void buildPipeline(PipelineBuilder &) override \
{ \
throw Exception("Unsupport"); \
} \
void finalize(const Names &) override \
{ \
throw Exception("Unsupport"); \
} \
const Block & getSampleBlock() const override \
{ \
throw Exception("Unsupport"); \
} \
void buildBlockInputStreamImpl(DAGPipeline &, Context &, size_t) override \
{ \
throw Exception("Unsupport"); \
}
} // namespace DB
5 changes: 5 additions & 0 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,11 @@ try
.aggregation({Max(col("s1"))}, {col("s2")})
.build(context);
executeAndAssertColumnsEqual(request, {});

request = context.scan("test_db", "empty_table")
.aggregation({Count(lit(Field(static_cast<UInt64>(1))))}, {})
.build(context);
executeAndAssertColumnsEqual(request, {toVec<UInt64>({0})});
}
CATCH

Expand Down
Loading

0 comments on commit b7132a0

Please sign in to comment.