Pipeline: support spill for fine grained aggregation #7220

Merged into master on Apr 20, 2023 (36 commits; the diff below shows changes from 31 commits).

Commits
41b44d3  pre modify (SeaRise, Mar 27, 2023)
96c4b01  update (SeaRise, Mar 27, 2023)
4d39f1d  fix ut (SeaRise, Mar 27, 2023)
7f73b05  update (SeaRise, Mar 27, 2023)
bc17c52  fix comment (SeaRise, Mar 28, 2023)
0398d35  add more comment (SeaRise, Mar 30, 2023)
67a7e46  use executeIO (SeaRise, Mar 30, 2023)
ea6b4b3  fix tidy (SeaRise, Mar 31, 2023)
7ae7753  Merge branch 'master' into support_io_cpu_thread_pool (SeaRise, Apr 2, 2023)
bdc0e6f  Update dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp (SeaRise, Apr 3, 2023)
00ec7a0  pre refine (SeaRise, Apr 4, 2023)
c29f0a6  refine (SeaRise, Apr 4, 2023)
07f7f5d  u (SeaRise, Apr 4, 2023)
62972fa  update (SeaRise, Apr 4, 2023)
8105719  add more comments (SeaRise, Apr 4, 2023)
4b1aeb0  add ut and fix (SeaRise, Apr 4, 2023)
b676835  u (SeaRise, Apr 4, 2023)
4985799  Merge branch 'master' into pipeline_support_spill_agg (SeaRise, Apr 5, 2023)
787467e  u (SeaRise, Apr 5, 2023)
90f4842  u (SeaRise, Apr 5, 2023)
9b6ea2e  Merge branch 'master' into pipeline_support_spill_agg (SeaRise, Apr 6, 2023)
c3aff6d  Merge branch 'master' into pipeline_support_spill_agg (SeaRise, Apr 6, 2023)
518aee8  merge master (SeaRise, Apr 6, 2023)
49aab93  add comments (SeaRise, Apr 6, 2023)
2b9eddf  Merge branch 'master' into pipeline_support_spill_agg (SeaRise, Apr 7, 2023)
2c111d5  fix typo (SeaRise, Apr 7, 2023)
f032df2  Merge branch 'master' into pipeline_support_spill_agg (SeaRise, Apr 10, 2023)
fc13949  minor refine (SeaRise, Apr 10, 2023)
849892a  Merge branch 'master' into pipeline_support_spill_agg (SeaRise, Apr 14, 2023)
4f8da9c  refine comments (SeaRise, Apr 18, 2023)
3753242  address comments (SeaRise, Apr 18, 2023)
36be5a0  add more comments (SeaRise, Apr 19, 2023)
9f042ef  check not empty (SeaRise, Apr 19, 2023)
68bacca  port from #7318 (SeaRise, Apr 20, 2023)
3d49d23  Merge branch 'master' into pipeline_support_spill_agg (Lloyd-Pottiger, Apr 20, 2023)
7b0b566  fix clang tidy (SeaRise, Apr 20, 2023)
dbms/src/DataStreams/AggregatingBlockInputStream.cpp (2 changes: 1 addition, 1 deletion)

@@ -63,7 +63,7 @@ Block AggregatingBlockInputStream::readImpl()
 if (!isCancelled())
 {
     /// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
-    if (!data_variants->empty())
+    if (data_variants->tryMarkNeedSpill())
         aggregator.spill(*data_variants);
 }
 aggregator.finishSpill();
dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp (23 changes: 12 additions, 11 deletions)

@@ -134,11 +134,14 @@ Block ParallelAggregatingBlockInputStream::readImpl()

 void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
 {
+    auto & data = *parent.many_data[thread_num];
     parent.aggregator.executeOnBlock(
         block,
-        *parent.many_data[thread_num],
+        data,
         parent.threads_data[thread_num].key_columns,
         parent.threads_data[thread_num].aggregate_columns);
+    if (data.need_spill)
+        parent.aggregator.spill(data);

     parent.threads_data[thread_num].src_rows += block.rows();
     parent.threads_data[thread_num].src_bytes += block.bytes();
@@ -150,11 +153,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinishThread(size_t thread_num)
 {
     /// Flush data in the RAM to disk. So it's easier to unite them later.
     auto & data = *parent.many_data[thread_num];
-
-    if (data.isConvertibleToTwoLevel())
-        data.convertToTwoLevel();
-
-    if (!data.empty())
+    if (data.tryMarkNeedSpill())
         parent.aggregator.spill(data);
 }
 }
@@ -167,10 +166,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
     /// because at the time of `onFinishThread` call, no data has been flushed to disk, and then some were.
     for (auto & data : parent.many_data)
     {
-        if (data->isConvertibleToTwoLevel())
-            data->convertToTwoLevel();
-
-        if (!data->empty())
+        if (data->tryMarkNeedSpill())
             parent.aggregator.spill(*data);
     }
 }
@@ -245,11 +241,16 @@ void ParallelAggregatingBlockInputStream::execute()
     /// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
     /// To do this, we pass a block with zero rows to aggregate.
     if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
+    {
+        auto & data = *many_data[0];
         aggregator.executeOnBlock(
             children.at(0)->getHeader(),
-            *many_data[0],
+            data,
             threads_data[0].key_columns,
             threads_data[0].aggregate_columns);
+        if (data.need_spill)
+            aggregator.spill(data);
+    }
 }

 void ParallelAggregatingBlockInputStream::appendInfo(FmtBuffer & buffer) const
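Note: taken together, the hunks above show the two caller patterns this PR standardizes around the new need_spill flag. A summary sketch based only on the diff (not code copied from any one place in the PR):

    // Per-block path: executeOnBlock() no longer spills inline, it only
    // marks the variants; the caller is expected to spill right afterwards.
    aggregator.executeOnBlock(block, data, key_columns, aggregate_columns);
    if (data.need_spill)
        aggregator.spill(data); // spill() resets need_spill (see Aggregator.cpp below)

    // Finish path: tryMarkNeedSpill() replaces the old open-coded
    // convertToTwoLevel()/empty() checks and marks in one step.
    if (data.tryMarkNeedSpill())
        aggregator.spill(data);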
dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp (32 changes: 9 additions, 23 deletions)

@@ -29,29 +29,15 @@ void PhysicalAggregationConvergent::buildPipelineExecGroup(
     assert(!fine_grained_shuffle.enable());

     aggregate_context->initConvergent();
-
-    if (unlikely(aggregate_context->useNullSource()))
-    {
-        group_builder.init(1);
-        group_builder.transform([&](auto & builder) {
-            builder.setSourceOp(std::make_unique<NullSourceOp>(
-                exec_status,
-                aggregate_context->getHeader(),
-                log->identifier()));
-        });
-    }
Review comment on lines -33 to -42

Contributor: Why do we need to delete this code?

Author (Contributor): The logic of AggContext::useNullSource is wrapped into AggContext::getConvergentConcurrency and AggContext::readForConvergent, just to simplify the code.
-    else
-    {
-        group_builder.init(aggregate_context->getConvergentConcurrency());
-        size_t index = 0;
-        group_builder.transform([&](auto & builder) {
-            builder.setSourceOp(std::make_unique<AggregateConvergentSourceOp>(
-                exec_status,
-                aggregate_context,
-                index++,
-                log->identifier()));
-        });
-    }
+    group_builder.init(aggregate_context->getConvergentConcurrency());
+    size_t index = 0;
+    group_builder.transform([&](auto & builder) {
+        builder.setSourceOp(std::make_unique<AggregateConvergentSourceOp>(
+            exec_status,
+            aggregate_context,
+            index++,
+            log->identifier()));
+    });

     executeExpression(exec_status, group_builder, expr_after_agg, log);
 }
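Note: to make the reply above concrete, one plausible shape for folding the null-source case into the two accessors it names is sketched below. Only useNullSource, getConvergentConcurrency, and readForConvergent come from the thread; max_threads and readFromMergingBuckets are illustrative assumptions, not the actual TiFlash members.

    size_t AggregateContext::getConvergentConcurrency()
    {
        // A null source produces no rows, so one convergent source is enough
        // (the deleted branch likewise used group_builder.init(1)).
        return useNullSource() ? 1 : max_threads; // max_threads: assumed field
    }

    Block AggregateContext::readForConvergent(size_t index)
    {
        if (useNullSource())
            return {}; // plays the role of the removed NullSourceOp
        return readFromMergingBuckets(index); // assumed internal helper
    }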
dbms/src/Flash/tests/gtest_spill_aggregation.cpp (83 changes: 64 additions, 19 deletions)

@@ -58,36 +58,34 @@
     context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
     /// disable spill
     context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
-    auto ref_columns = executeStreams(request, original_max_streams, true);
+    auto ref_columns = executeStreams(request, original_max_streams);
     /// enable spill
     context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(total_data_size / 200)));
     context.context->setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
     context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
     /// don't use `executeAndAssertColumnsEqual` since it takes too long to run
     /// test single thread aggregation
-    /// need to enable memory tracker since currently, the memory usage in aggregator is
-    /// calculated by memory tracker, if memory tracker is not enabled, spill will never be triggered.
-    ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1, true));
+    ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1));
     /// test parallel aggregation
-    ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams, true));
+    ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
     /// enable spill and use small max_cached_data_bytes_in_spiller
     context.context->setSetting("max_cached_data_bytes_in_spiller", Field(static_cast<UInt64>(total_data_size / 200)));
     /// test single thread aggregation
-    ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1, true));
+    ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1));
     /// test parallel aggregation
-    ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams, true));
+    ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
     /// test spill with small max_block_size
     /// the avg rows in one bucket is ~10240/256 = 400, so set the small_max_block_size to 300
     /// is enough to test the output split
     size_t small_max_block_size = 300;
     context.context->setSetting("max_block_size", Field(static_cast<UInt64>(small_max_block_size)));
-    auto blocks = getExecuteStreamsReturnBlocks(request, 1, true);
+    auto blocks = getExecuteStreamsReturnBlocks(request, 1);
     for (auto & block : blocks)
     {
         ASSERT_EQ(block.rows() <= small_max_block_size, true);
     }
     ASSERT_COLUMNS_EQ_UR(ref_columns, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName());
-    blocks = getExecuteStreamsReturnBlocks(request, original_max_streams, true);
+    blocks = getExecuteStreamsReturnBlocks(request, original_max_streams);
     for (auto & block : blocks)
     {
         ASSERT_EQ(block.rows() <= small_max_block_size, true);
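Note: a quick key to the settings exercised throughout these tests, summarized from how they are used here:

    // max_bytes_before_external_group_by = 0                -> spill disabled (reference run)
    // max_bytes_before_external_group_by = total_size / 200 -> low threshold so spill triggers early
    // group_by_two_level_threshold(_bytes) = 1              -> convert to two-level immediately,
    //                                                          since only two-level data can spill
    // max_cached_data_bytes_in_spiller (small)              -> exercise the spiller's flush path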
@@ -166,10 +164,7 @@
     context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));
     context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
     context.context->setSetting("max_block_size", Field(static_cast<UInt64>(unique_rows * 2)));
-    /// here has to enable memory tracker otherwise the processList in the context is the last query's processList
-    /// and may cause segment fault, maybe a bug but should not happens in TiDB because all the tasks from tidb
-    /// enable memory tracker
-    auto reference = executeStreams(request, 1, true);
+    auto reference = executeStreams(request, 1);
     if (current_collator->isCI())
     {
         /// for ci collation, need to sort and compare the result manually
@@ -198,7 +193,7 @@
     context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
     context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(max_bytes_before_external_agg)));
     context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
-    auto blocks = getExecuteStreamsReturnBlocks(request, concurrency, true);
+    auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
     for (auto & block : blocks)
     {
         block.checkNumberOfRows();
@@ -302,10 +297,7 @@
     context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));
     context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
     context.context->setSetting("max_block_size", Field(static_cast<UInt64>(unique_rows * 2)));
-    /// here has to enable memory tracker otherwise the processList in the context is the last query's processList
-    /// and may cause segment fault, maybe a bug but should not happens in TiDB because all the tasks from tidb
-    /// enable memory tracker
-    auto reference = executeStreams(request, 1, true);
+    auto reference = executeStreams(request, 1);
     if (current_collator->isCI())
     {
         /// for ci collation, need to sort and compare the result manually
@@ -334,7 +326,7 @@
     context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
     context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(max_bytes_before_external_agg)));
     context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
-    auto blocks = getExecuteStreamsReturnBlocks(request, concurrency, true);
+    auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
     for (auto & block : blocks)
     {
         block.checkNumberOfRows();
@@ -358,5 +350,58 @@
 }
 }
 CATCH
+
+TEST_F(SpillAggregationTestRunner, FineGrainedShuffle)
+try
+{
+    DB::MockColumnInfoVec column_infos{{"a", TiDB::TP::TypeLongLong}, {"b", TiDB::TP::TypeLongLong}, {"c", TiDB::TP::TypeLongLong}, {"d", TiDB::TP::TypeLongLong}, {"e", TiDB::TP::TypeLongLong}};
+    DB::MockColumnInfoVec partition_column_infos{{"a", TiDB::TP::TypeLongLong}, {"b", TiDB::TP::TypeLongLong}};
+    ColumnsWithTypeAndName column_datas;
+    size_t table_rows = 5120;
+    size_t duplicated_rows = 2560;
+    UInt64 max_block_size = 100;
+    size_t total_data_size = 0;
+    for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(column_infos))
+    {
+        ColumnGeneratorOpts opts{table_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
+        column_datas.push_back(ColumnGenerator::instance().generate(opts));
+        total_data_size += column_datas.back().column->byteSize();
+    }
+    for (auto & column_data : column_datas)
+        column_data.column->assumeMutable()->insertRangeFrom(*column_data.column, 0, duplicated_rows);
+    context.addExchangeReceiver("exchange_receiver_1_concurrency", column_infos, column_datas, 1, partition_column_infos);
+    context.addExchangeReceiver("exchange_receiver_3_concurrency", column_infos, column_datas, 3, partition_column_infos);
+    context.addExchangeReceiver("exchange_receiver_5_concurrency", column_infos, column_datas, 5, partition_column_infos);
+    context.addExchangeReceiver("exchange_receiver_10_concurrency", column_infos, column_datas, 10, partition_column_infos);
+    std::vector<size_t> exchange_receiver_concurrency = {1, 3, 5, 10};
+
+    auto gen_request = [&](size_t exchange_concurrency) {
+        return context
+            .receive(fmt::format("exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency)
+            .aggregation({Min(col("c")), Max(col("d")), Count(col("e"))}, {col("a"), col("b")}, exchange_concurrency)
+            .build(context);
+    };
+    context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
+
+    /// disable spill
+    context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
+    auto baseline = executeStreams(gen_request(1), 1);
+
+    /// enable spill
+    context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(total_data_size / 200)));
+    context.context->setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
+    context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
+    for (size_t exchange_concurrency : exchange_receiver_concurrency)
+    {
+        /// don't use `executeAndAssertColumnsEqual` since it takes too long to run
+        auto request = gen_request(exchange_concurrency);
+        enablePipeline(false);
+        ASSERT_COLUMNS_EQ_UR(baseline, executeStreams(request, exchange_concurrency));
+        enablePipeline(true);
+        ASSERT_COLUMNS_EQ_UR(baseline, executeStreams(request, exchange_concurrency));
+    }
+}
+CATCH

 } // namespace tests
 } // namespace DB
dbms/src/Interpreters/Aggregator.cpp (32 changes: 27 additions, 5 deletions)

@@ -65,6 +65,22 @@ AggregatedDataVariants::~AggregatedDataVariants()
     destroyAggregationMethodImpl();
 }

+bool AggregatedDataVariants::tryMarkNeedSpill()
+{
+    assert(!need_spill);
+    if (empty())
+        return false;
+    if (!isTwoLevel())
+    {
+        /// Data can only be flushed to disk if a two-level aggregation is supported.
+        if (!isConvertibleToTwoLevel())
+            return false;
+        convertToTwoLevel();
+    }
+    need_spill = true;
+    return true;
+}
+
 void AggregatedDataVariants::destroyAggregationMethodImpl()
 {
     if (!aggregation_method_impl)
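Note: tryMarkNeedSpill() centralizes the checks that the input streams previously open-coded (see the deletions in ParallelAggregatingBlockInputStream.cpp above). Its outcomes, restated from the implementation:

    // empty()                                    -> returns false: nothing to spill
    // already two-level                          -> sets need_spill, returns true
    // single-level and convertible to two-level  -> convertToTwoLevel(), sets need_spill, returns true
    // single-level and not convertible           -> returns false: this variant type cannot spill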
@@ -719,6 +735,8 @@ bool Aggregator::executeOnBlock(
     ColumnRawPtrs & key_columns,
     AggregateColumns & aggregate_columns)
 {
+    assert(!result.need_spill);
+
     if (is_cancelled())
         return true;

@@ -811,15 +829,11 @@
         result.convertToTwoLevel();

     /** Flush data to disk if too much RAM is consumed.
-     * Data can only be flushed to disk if a two-level aggregation is supported.
      */
     if (max_bytes_before_external_group_by && result_size > 0
-        && (result.isTwoLevel() || result.isConvertibleToTwoLevel())
         && result_size_bytes > max_bytes_before_external_group_by)
     {
-        if (!result.isTwoLevel())
-            result.convertToTwoLevel();
-        spill(result);
+        result.tryMarkNeedSpill();
     }

     return true;
@@ -847,6 +861,7 @@

 void Aggregator::spill(AggregatedDataVariants & data_variants)
 {
+    assert(data_variants.need_spill);
     bool init_value = false;
     if (spill_triggered.compare_exchange_strong(init_value, true, std::memory_order_relaxed))
     {
@@ -872,6 +887,7 @@ void Aggregator::spill(AggregatedDataVariants & data_variants)

     /// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones.
     data_variants.init(data_variants.type);
+    data_variants.need_spill = false;
     data_variants.aggregates_pools = Arenas(1, std::make_shared<Arena>());
     data_variants.aggregates_pool = data_variants.aggregates_pools.back().get();
     data_variants.without_key = nullptr;
@@ -978,12 +994,18 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)

         if (!executeOnBlock(block, result, key_columns, aggregate_columns))
             break;
+        if (result.need_spill)
+            spill(result);
     }

     /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
     /// To do this, we pass a block with zero rows to aggregate.
     if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
+    {
         executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns);
+        if (result.need_spill)
+            spill(result);
+    }

     double elapsed_seconds = watch.elapsedSeconds();
     size_t rows = result.size();
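Note: the asserts added in this file pin down a small lifecycle for need_spill; restated from the diff:

    // executeOnBlock(result, ...)  asserts !result.need_spill, so a marked
    //     AggregatedDataVariants must be spilled before it is fed more data;
    // tryMarkNeedSpill(), or the memory-limit check inside executeOnBlock,
    //     sets need_spill = true;
    // spill(data_variants)         asserts need_spill, flushes to disk,
    //     reinitializes the variants, and resets need_spill = false.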
dbms/src/Interpreters/Aggregator.h (4 changes: 4 additions, 0 deletions)

@@ -672,6 +672,10 @@ struct AggregatedDataVariants : private boost::noncopyable

     Type type{Type::EMPTY};

+    bool need_spill = false;
+
+    bool tryMarkNeedSpill();
+
     void destroyAggregationMethodImpl();

     AggregatedDataVariants()
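Note: the diff above covers only the Aggregator side; the pipeline side of this PR (see the "use executeIO" commit) moves the actual spill onto an IO thread. The sketch below is speculative: the operator name, method names, and status values are assumptions for illustration, not code from this diff.

    // Hypothetical pipeline sink cooperating with the need_spill protocol.
    OperatorStatus AggregateBuildSinkOp::writeImpl(Block && block)
    {
        agg_context->executeOnBlock(index, std::move(block));
        if (agg_context->needSpill(index))
            return OperatorStatus::IO; // yield; the scheduler calls executeIO() next
        return OperatorStatus::NEED_INPUT;
    }

    OperatorStatus AggregateBuildSinkOp::executeIOImpl()
    {
        agg_context->spillData(index); // runs Aggregator::spill off the CPU thread
        return OperatorStatus::NEED_INPUT;
    }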