Pipeline: support spill for fine grained aggregation (#7220)
ref #6518
SeaRise authored Apr 20, 2023
1 parent 4ee1412 commit 3ef3154
Showing 16 changed files with 603 additions and 127 deletions.
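
The heart of the change, visible across the hunks below (a subset of the 16 changed files), is that Aggregator::executeOnBlock no longer spills to disk itself: when the memory budget is exceeded it only sets need_spill on the AggregatedDataVariants, and the caller decides when to call Aggregator::spill. At end of input, the new AggregatedDataVariants::tryMarkNeedSpill() replaces the hand-written empty-check plus convert-to-two-level sequence. A minimal sketch of the resulting caller-side flow, reconstructed from these hunks with the names they use (the read loop is paraphrased and the snippet is not meant to compile on its own):

    // Per-block phase: executeOnBlock() only marks; the caller performs the spill,
    // which writes the current hash table to disk, re-initializes it and clears need_spill.
    while (Block block = stream->read())
    {
        if (!aggregator.executeOnBlock(block, result, key_columns, aggregate_columns))
            break;
        if (result.need_spill)
            aggregator.spill(result);
    }

    // End-of-input phase: flush whatever is still in RAM. tryMarkNeedSpill() returns false
    // for empty data and converts the hash table to two-level internally when needed.
    if (result.tryMarkNeedSpill())
        aggregator.spill(result);
    aggregator.finishSpill();

Keeping the actual spill outside executeOnBlock presumably lets each caller, including the pipeline operators this commit targets (which are not among the hunks shown on this page), decide where in its own control flow the disk write happens.
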
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/AggregatingBlockInputStream.cpp
@@ -63,7 +63,7 @@ Block AggregatingBlockInputStream::readImpl()
if (!isCancelled())
{
/// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
if (!data_variants->empty())
if (data_variants->tryMarkNeedSpill())
aggregator.spill(*data_variants);
}
aggregator.finishSpill();
23 changes: 12 additions & 11 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
@@ -134,11 +134,14 @@ Block ParallelAggregatingBlockInputStream::readImpl()

void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
{
auto & data = *parent.many_data[thread_num];
parent.aggregator.executeOnBlock(
block,
*parent.many_data[thread_num],
data,
parent.threads_data[thread_num].key_columns,
parent.threads_data[thread_num].aggregate_columns);
if (data.need_spill)
parent.aggregator.spill(data);

parent.threads_data[thread_num].src_rows += block.rows();
parent.threads_data[thread_num].src_bytes += block.bytes();
@@ -150,11 +153,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinishThread(size_t thread_
{
/// Flush data in the RAM to disk. So it's easier to unite them later.
auto & data = *parent.many_data[thread_num];

if (data.isConvertibleToTwoLevel())
data.convertToTwoLevel();

if (!data.empty())
if (data.tryMarkNeedSpill())
parent.aggregator.spill(data);
}
}
@@ -167,10 +166,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
/// because at the time of `onFinishThread` call, no data has been flushed to disk, and then some were.
for (auto & data : parent.many_data)
{
if (data->isConvertibleToTwoLevel())
data->convertToTwoLevel();

if (!data->empty())
if (data->tryMarkNeedSpill())
parent.aggregator.spill(*data);
}
}
@@ -245,11 +241,16 @@ void ParallelAggregatingBlockInputStream::execute()
/// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
{
auto & data = *many_data[0];
aggregator.executeOnBlock(
children.at(0)->getHeader(),
*many_data[0],
data,
threads_data[0].key_columns,
threads_data[0].aggregate_columns);
if (data.need_spill)
aggregator.spill(data);
}
}

void ParallelAggregatingBlockInputStream::appendInfo(FmtBuffer & buffer) const
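
The onFinishThread/onFinish hunks above are the flip side of the same refactor: the convert-to-two-level step and the emptiness check that each call site previously spelled out are now folded into tryMarkNeedSpill(). A before/after sketch of the flush logic, using only identifiers that appear in this file's diff:

    // Before this commit (each call site repeated the conversion and the check):
    if (data.isConvertibleToTwoLevel())
        data.convertToTwoLevel();
    if (!data.empty())
        parent.aggregator.spill(data);

    // After this commit (the decision is centralized in AggregatedDataVariants):
    if (data.tryMarkNeedSpill())
        parent.aggregator.spill(data);

The per-block path changes in the same spirit: executeOnBlock() no longer spills, so onBlock() checks data.need_spill afterwards and calls parent.aggregator.spill(data) itself.
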
32 changes: 9 additions & 23 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp
@@ -29,29 +29,15 @@ void PhysicalAggregationConvergent::buildPipelineExecGroup(
assert(!fine_grained_shuffle.enable());

aggregate_context->initConvergent();

if (unlikely(aggregate_context->useNullSource()))
{
group_builder.init(1);
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<NullSourceOp>(
exec_status,
aggregate_context->getHeader(),
log->identifier()));
});
}
else
{
group_builder.init(aggregate_context->getConvergentConcurrency());
size_t index = 0;
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<AggregateConvergentSourceOp>(
exec_status,
aggregate_context,
index++,
log->identifier()));
});
}
group_builder.init(aggregate_context->getConvergentConcurrency());
size_t index = 0;
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<AggregateConvergentSourceOp>(
exec_status,
aggregate_context,
index++,
log->identifier()));
});

executeExpression(exec_status, group_builder, expr_after_agg, log);
}
84 changes: 65 additions & 19 deletions dbms/src/Flash/tests/gtest_spill_aggregation.cpp
@@ -58,36 +58,34 @@ try
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// disable spill
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
auto ref_columns = executeStreams(request, original_max_streams, true);
auto ref_columns = executeStreams(request, original_max_streams);
/// enable spill
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(total_data_size / 200)));
context.context->setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
/// don't use `executeAndAssertColumnsEqual` since it takes too long to run
/// test single thread aggregation
/// need to enable memory tracker since currently, the memory usage in aggregator is
/// calculated by memory tracker, if memory tracker is not enabled, spill will never be triggered.
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1, true));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1));
/// test parallel aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams, true));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
/// enable spill and use small max_cached_data_bytes_in_spiller
context.context->setSetting("max_cached_data_bytes_in_spiller", Field(static_cast<UInt64>(total_data_size / 200)));
/// test single thread aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1, true));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1));
/// test parallel aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams, true));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
/// test spill with small max_block_size
/// the avg rows in one bucket is ~10240/256 = 400, so set the small_max_block_size to 300
/// is enough to test the output spilt
size_t small_max_block_size = 300;
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(small_max_block_size)));
auto blocks = getExecuteStreamsReturnBlocks(request, 1, true);
auto blocks = getExecuteStreamsReturnBlocks(request, 1);
for (auto & block : blocks)
{
ASSERT_EQ(block.rows() <= small_max_block_size, true);
}
ASSERT_COLUMNS_EQ_UR(ref_columns, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName());
blocks = getExecuteStreamsReturnBlocks(request, original_max_streams, true);
blocks = getExecuteStreamsReturnBlocks(request, original_max_streams);
for (auto & block : blocks)
{
ASSERT_EQ(block.rows() <= small_max_block_size, true);
@@ -166,10 +164,7 @@ try
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(unique_rows * 2)));
/// here has to enable memory tracker otherwise the processList in the context is the last query's processList
/// and may cause segment fault, maybe a bug but should not happens in TiDB because all the tasks from tidb
/// enable memory tracker
auto reference = executeStreams(request, 1, true);
auto reference = executeStreams(request, 1);
if (current_collator->isCI())
{
/// for ci collation, need to sort and compare the result manually
@@ -198,7 +193,7 @@ try
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(max_bytes_before_external_agg)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency, true);
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
for (auto & block : blocks)
{
block.checkNumberOfRows();
@@ -302,10 +297,7 @@ try
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(unique_rows * 2)));
/// here has to enable memory tracker otherwise the processList in the context is the last query's processList
/// and may cause segment fault, maybe a bug but should not happens in TiDB because all the tasks from tidb
/// enable memory tracker
auto reference = executeStreams(request, 1, true);
auto reference = executeStreams(request, 1);
if (current_collator->isCI())
{
/// for ci collation, need to sort and compare the result manually
@@ -334,7 +326,7 @@ try
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(max_bytes_before_external_agg)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency, true);
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
for (auto & block : blocks)
{
block.checkNumberOfRows();
@@ -358,5 +350,59 @@
}
}
CATCH

TEST_F(SpillAggregationTestRunner, FineGrainedShuffle)
try
{
DB::MockColumnInfoVec column_infos{{"a", TiDB::TP::TypeLongLong}, {"b", TiDB::TP::TypeLongLong}, {"c", TiDB::TP::TypeLongLong}, {"d", TiDB::TP::TypeLongLong}, {"e", TiDB::TP::TypeLongLong}};
DB::MockColumnInfoVec partition_column_infos{{"a", TiDB::TP::TypeLongLong}, {"b", TiDB::TP::TypeLongLong}};
ColumnsWithTypeAndName column_datas;
size_t table_rows = 5120;
size_t duplicated_rows = 2560;
UInt64 max_block_size = 100;
size_t total_data_size = 0;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(column_infos))
{
ColumnGeneratorOpts opts{table_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
column_datas.push_back(ColumnGenerator::instance().generate(opts));
total_data_size += column_datas.back().column->byteSize();
}
for (auto & column_data : column_datas)
column_data.column->assumeMutable()->insertRangeFrom(*column_data.column, 0, duplicated_rows);
context.addExchangeReceiver("exchange_receiver_1_concurrency", column_infos, column_datas, 1, partition_column_infos);
context.addExchangeReceiver("exchange_receiver_3_concurrency", column_infos, column_datas, 3, partition_column_infos);
context.addExchangeReceiver("exchange_receiver_5_concurrency", column_infos, column_datas, 5, partition_column_infos);
context.addExchangeReceiver("exchange_receiver_10_concurrency", column_infos, column_datas, 10, partition_column_infos);
std::vector<size_t> exchange_receiver_concurrency = {1, 3, 5, 10};

auto gen_request = [&](size_t exchange_concurrency) {
return context
.receive(fmt::format("exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency)
.aggregation({Min(col("c")), Max(col("d")), Count(col("e"))}, {col("a"), col("b")}, exchange_concurrency)
.build(context);
};
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));

/// disable spill
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
enablePipeline(false);
auto baseline = executeStreams(gen_request(1), 1);

/// enable spill
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(total_data_size / 200)));
context.context->setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
for (size_t exchange_concurrency : exchange_receiver_concurrency)
{
/// don't use `executeAndAssertColumnsEqual` since it takes too long to run
auto request = gen_request(exchange_concurrency);
enablePipeline(false);
ASSERT_COLUMNS_EQ_UR(baseline, executeStreams(request, exchange_concurrency));
enablePipeline(true);
ASSERT_COLUMNS_EQ_UR(baseline, executeStreams(request, exchange_concurrency));
}
}
CATCH

} // namespace tests
} // namespace DB
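
A rough back-of-the-envelope for the thresholds the new FineGrainedShuffle test picks (a sketch under the assumption that byteSize() of a generated TypeLongLong column is roughly 8 bytes per row):

    // Approximate sizes behind the settings used in the test above.
    size_t table_rows = 5120;                                     // rows generated per column
    size_t approx_total_data_size = table_rows * 8 /*bytes*/ * 5; // 5 Int64 columns, ~200 KiB
    size_t spill_threshold = approx_total_data_size / 200;        // ~1 KiB

    // max_bytes_before_external_group_by of ~1 KiB plus group_by_two_level_threshold(_bytes) = 1
    // means every partition's hash table converts to two-level and exceeds the budget almost
    // immediately, so the spill-and-restore path is exercised even on this small data set.

Note that total_data_size is accumulated before the 2560 duplicated rows are appended, so the duplication only adds repeated keys for the aggregation; it does not enlarge the spill budget.
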
32 changes: 27 additions & 5 deletions dbms/src/Interpreters/Aggregator.cpp
@@ -65,6 +65,22 @@ AggregatedDataVariants::~AggregatedDataVariants()
destroyAggregationMethodImpl();
}

bool AggregatedDataVariants::tryMarkNeedSpill()
{
assert(!need_spill);
if (empty())
return false;
if (!isTwoLevel())
{
/// Data can only be flushed to disk if a two-level aggregation is supported.
if (!isConvertibleToTwoLevel())
return false;
convertToTwoLevel();
}
need_spill = true;
return true;
}

void AggregatedDataVariants::destroyAggregationMethodImpl()
{
if (!aggregation_method_impl)
@@ -719,6 +735,8 @@ bool Aggregator::executeOnBlock(
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns)
{
assert(!result.need_spill);

if (is_cancelled())
return true;

@@ -811,15 +829,11 @@ bool Aggregator::executeOnBlock(
result.convertToTwoLevel();

/** Flush data to disk if too much RAM is consumed.
* Data can only be flushed to disk if a two-level aggregation is supported.
*/
if (max_bytes_before_external_group_by && result_size > 0
&& (result.isTwoLevel() || result.isConvertibleToTwoLevel())
&& result_size_bytes > max_bytes_before_external_group_by)
{
if (!result.isTwoLevel())
result.convertToTwoLevel();
spill(result);
result.tryMarkNeedSpill();
}

return true;
@@ -847,6 +861,7 @@ void Aggregator::initThresholdByAggregatedDataVariantsSize(size_t aggregated_dat

void Aggregator::spill(AggregatedDataVariants & data_variants)
{
assert(data_variants.need_spill);
bool init_value = false;
if (spill_triggered.compare_exchange_strong(init_value, true, std::memory_order_relaxed))
{
@@ -872,6 +887,7 @@ void Aggregator::spill(AggregatedDataVariants & data_variants)

/// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones.
data_variants.init(data_variants.type);
data_variants.need_spill = false;
data_variants.aggregates_pools = Arenas(1, std::make_shared<Arena>());
data_variants.aggregates_pool = data_variants.aggregates_pools.back().get();
data_variants.without_key = nullptr;
@@ -978,12 +994,18 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria

if (!executeOnBlock(block, result, key_columns, aggregate_columns))
break;
if (result.need_spill)
spill(result);
}

/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
{
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns);
if (result.need_spill)
spill(result);
}

double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.size();
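
The Aggregator.cpp hunks above pin down a small contract around need_spill: executeOnBlock() asserts the flag is clear on entry, spill() asserts it is set and clears it again after re-initializing the variants, and tryMarkNeedSpill() refuses to mark empty data. The following is a self-contained toy model of that contract — plain C++, no TiFlash types, illustrative names only — that can be compiled and run to watch the mark/spill cycle:

    #include <cassert>
    #include <cstddef>
    #include <iostream>
    #include <utility>
    #include <vector>

    // Stand-in for AggregatedDataVariants: in-memory state plus the need_spill mark.
    struct ToyAggState
    {
        std::vector<int> in_memory;
        bool need_spill = false;

        bool empty() const { return in_memory.empty(); }

        // Mirrors tryMarkNeedSpill(): never mark empty data, never mark twice.
        bool tryMarkNeedSpill()
        {
            assert(!need_spill);
            if (empty())
                return false;
            need_spill = true;
            return true;
        }
    };

    // Stand-in for Aggregator: accumulates, marks when over budget, spills on request.
    struct ToyAggregator
    {
        size_t max_bytes_before_spill = 64;
        std::vector<std::vector<int>> spilled_parts;

        void executeOnBlock(ToyAggState & state, int value)
        {
            assert(!state.need_spill); // the caller must spill before feeding more data
            state.in_memory.push_back(value);
            if (state.in_memory.size() * sizeof(int) > max_bytes_before_spill)
                state.need_spill = true; // mark only; the caller decides when to spill
        }

        void spill(ToyAggState & state)
        {
            assert(state.need_spill);
            spilled_parts.push_back(std::move(state.in_memory));
            state.in_memory.clear();   // "re-initialize" the in-memory state
            state.need_spill = false;  // mirrors data_variants.need_spill = false
        }
    };

    int main()
    {
        ToyAggregator aggregator;
        ToyAggState state;
        for (int i = 0; i < 100; ++i)
        {
            aggregator.executeOnBlock(state, i);
            if (state.need_spill)
                aggregator.spill(state);
        }
        if (state.tryMarkNeedSpill()) // end-of-input flush, as in the streams above
            aggregator.spill(state);
        std::cout << "spilled parts: " << aggregator.spilled_parts.size() << "\n";
        return 0;
    }
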
4 changes: 4 additions & 0 deletions dbms/src/Interpreters/Aggregator.h
@@ -672,6 +672,10 @@ struct AggregatedDataVariants : private boost::noncopyable

Type type{Type::EMPTY};

bool need_spill = false;

bool tryMarkNeedSpill();

void destroyAggregationMethodImpl();

AggregatedDataVariants()