Skip to content


Support Stream Pre Agg (apache#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Jul 13, 2021
1 parent 885ab0c commit 1392807
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 2 deletions.
3 changes: 3 additions & 0 deletions be/src/vec/common/hash_table/hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,9 @@ class HashTable : private boost::noncopyable,

size_t getBufferSizeInCells() const { return grower.bufSize(); }

bool add_elem_size_overflow(size_t add_size) const {
return grower.overflow(add_size + m_size);
size_t getCollisions() const { return collisions; }
Expand Down
241 changes: 239 additions & 2 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,48 @@
#include "vec/utils/util.hpp"

namespace doris::vectorized {

/// The minimum reduction factor (input rows divided by output rows) to grow hash tables
/// in a streaming preaggregation, given that the hash tables are currently the given
/// size or above. The sizes roughly correspond to hash table sizes where the bucket
/// arrays will fit in a cache level. Intuitively, we don't want the working set of the
/// aggregation to expand to the next level of cache unless we're reducing the input
/// enough to outweigh the increased memory latency we'll incur for each hash table
/// lookup.
/// Note that the current reduction achieved is not always a good estimate of the
/// final reduction. It may be biased either way depending on the ordering of the
/// input. If the input order is random, we will underestimate the final reduction
/// factor because the probability of a row having the same key as a previous row
/// increases as more input is processed. If the input order is correlated with the
/// key, skew may bias the estimate. If high cardinality keys appear first, we
/// may overestimate and if low cardinality keys appear first, we underestimate.
/// To estimate the eventual reduction achieved, we estimate the final reduction
/// using the planner's estimated input cardinality and the assumption that input
/// is in a random order. This means that we assume that the reduction factor will
/// increase over time.
struct StreamingHtMinReductionEntry {
// Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in
// bytes is greater than this threshold.
int min_ht_mem;
// The minimum reduction factor to expand the hash tables.
double streaming_ht_min_reduction;

// TODO: experimentally tune these values and also programmatically get the cache size
// of the machine that we're running on.
static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
// Expand up to L2 cache always.
{0, 0.0},
// Expand into L3 cache if we look like we're getting some reduction.
{256 * 1024, 1.1},
// Expand into main memory if we're getting a significant reduction.
{2 * 1024 * 1024, 2.0},

static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =

AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
Expand All @@ -44,7 +86,18 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
_merge_timer(nullptr) {}
_merge_timer(nullptr) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
DCHECK(_conjunct_ctxs.empty()) << "Preaggs have no conjuncts";
DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
DCHECK(_limit == -1) << "Preaggs have no limits";
} else {
_is_streaming_preagg = false;

AggregationNode::~AggregationNode() = default;

Expand Down Expand Up @@ -159,6 +212,12 @@ Status AggregationNode::prepare(RuntimeState* state) {
this, std::placeholders::_1);

if (_is_streaming_preagg) {
_executor.pre_agg = std::bind<Status>(&AggregationNode::_pre_agg_with_serialized_key, this,
std::placeholders::_1, std::placeholders::_2);
_max_size_of_stream_pre_agg_buffer = state->batch_size();

if (_needs_finalize) {
_executor.get_result = std::bind<Status>(
&AggregationNode::_get_with_serialized_key_result, this, std::placeholders::_1,
Expand Down Expand Up @@ -186,8 +245,10 @@ Status AggregationNode::open(RuntimeState* state) {


bool eos = false;
// Streaming preaggregations do all processing in GetNext().
if (_is_streaming_preagg) return Status::OK();

bool eos = false;
while (!eos) {
Block block;
Expand All @@ -208,7 +269,26 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {

if (_is_streaming_preagg) {
bool child_eos = false;

Block in_block;
do {
RETURN_IF_ERROR(_children[0]->get_next(state, &in_block, &child_eos));
} while(in_block.rows() == 0 && !child_eos);

if (in_block.rows() != 0) {
RETURN_IF_ERROR(_executor.pre_agg(&in_block, block));
return Status::OK();
} else {
return _executor.get_result(state, block, eos);

RETURN_IF_ERROR(_executor.get_result(state, block, eos));
// dispose the having clause, should not be execute in prestreaming agg
if (_vconjunct_ctx_ptr) {
int result_column_id = -1;
int orig_columns = block->columns();
Expand Down Expand Up @@ -360,6 +440,163 @@ void AggregationNode::_close_without_key() {

bool AggregationNode::_should_expand_preagg_hash_tables() {
if (!_should_expand_hash_table) return false;

auto& hash_tbl = _agg_data.serialized->data;
auto [ht_mem, ht_rows] = std::pair{hash_tbl.getBufferSizeInBytes(), hash_tbl.size()};

// Need some rows in tables to have valid statistics.
if (ht_rows == 0) return true;

// Find the appropriate reduction factor in our table for the current hash table sizes.
int cache_level = 0;
while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {

// Compare the number of rows in the hash table with the number of input rows that
// were aggregated into it. Exclude passed through rows from this calculation since
// they were not in hash tables.
const int64_t input_rows = _children[0]->rows_returned();
const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
// TODO chenhao
// const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;

// TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
// inaccurate, which could lead to a divide by zero below.
if (aggregated_input_rows <= 0) return true;

// Extrapolate the current reduction factor (r) using the formula
// R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
// set, N is the number of input rows, excluding passed-through rows, and n is the
// number of rows inserted or merged into the hash tables. This is a very rough
// approximation but is good enough to be useful.
// TODO: consider collecting more statistics to better estimate reduction.
// double estimated_reduction = aggregated_input_rows >= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
double min_reduction = STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;

// COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
// return estimated_reduction > min_reduction;
_should_expand_hash_table = current_reduction > min_reduction;
return _should_expand_hash_table;

Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block *in_block,
doris::vectorized::Block *out_block) {
// now we only support serialized key
// TODO:
DCHECK(_agg_data.serialized != nullptr);

using Method = AggregationMethodSerialized<AggregatedDataWithStringKey>;
using AggState = Method::State;

auto& method = *_agg_data.serialized;

size_t key_size = _probe_expr_ctxs.size();
ColumnRawPtrs key_columns(key_size);

for (size_t i = 0; i < key_size; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id));
key_columns[i] = in_block->getByPosition(result_column_id).column.get();

int rows = in_block->rows();
PODArray<AggregateDataPtr> places(rows);
AggState state(key_columns, {}, nullptr);

// Stop expanding hash tables if we're not reducing the input sufficiently. As our
// hash tables expand out of each level of cache hierarchy, every hash table lookup
// will take longer. We also may not be able to expand hash tables because of memory
// pressure. In either case we should always use the remaining space in the hash table
// to avoid wasting memory.
if (auto& hash_tbl = _agg_data.serialized->data; hash_tbl.add_elem_size_overflow(rows)) {
// do not try to do agg, just init and serialize directly return the out_block
if (!_should_expand_preagg_hash_tables()) {
if (_streaming_pre_agg_buffer == nullptr) {
_streaming_pre_agg_buffer = _agg_arena_pool.alignedAlloc(
_total_size_of_aggregate_states * _max_size_of_stream_pre_agg_buffer, _align_aggregate_states);

auto aggregate_data = _streaming_pre_agg_buffer;
for (size_t i = 0; i < rows; ++i) {
places[i] = aggregate_data;
aggregate_data += _total_size_of_aggregate_states;
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],, &_agg_arena_pool);

auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc());

MutableColumns value_columns;
for (int i = key_size; i < column_withschema.size(); ++i) {
aggregate_data = _streaming_pre_agg_buffer;
for (size_t j = 0; j < rows; ++j) {
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->insert_result_info(aggregate_data + _offsets_of_aggregate_states[i],
aggregate_data += _total_size_of_aggregate_states;

*out_block = column_withschema;
MutableColumns columns(out_block->columns());
for (int i = 0; i < out_block->columns(); ++i) {
if (i < key_size) {
columns[i] = std::move(*key_columns[i]).mutate();
} else {
columns[i] = std::move(value_columns[i - key_size]);
return Status::OK();

/// For all rows.
for (size_t i = 0; i < rows; ++i) {
AggregateDataPtr aggregate_data = nullptr;

auto emplace_result = state.emplaceKey(, i, _agg_arena_pool);

/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted()) {
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.

aggregate_data = _agg_arena_pool.alignedAlloc(_total_size_of_aggregate_states,

} else
aggregate_data = emplace_result.getMapped();

places[i] = aggregate_data;
assert(places[i] != nullptr);

for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],, &_agg_arena_pool);

return Status::OK();

Status AggregationNode::_execute_with_serialized_key(Block* block) {
// now we only support serialized key
Expand Down
14 changes: 14 additions & 0 deletions be/src/vec/exec/vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,18 @@ class AggregationNode : public ::doris::ExecNode {
RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _merge_timer;

bool _is_streaming_preagg;
bool _should_expand_hash_table = true;
char* _streaming_pre_agg_buffer = nullptr;
size_t _max_size_of_stream_pre_agg_buffer = 0;

/// Expose the minimum reduction factor to continue growing the hash tables.
RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
/// Return true if we should keep expanding hash tables in the preagg. If false,
/// the preagg should pass through any rows it can't fit in its tables.
bool _should_expand_preagg_hash_tables();

Status _create_agg_status(AggregateDataPtr data);
Status _destory_agg_status(AggregateDataPtr data);

Expand All @@ -163,17 +174,20 @@ class AggregationNode : public ::doris::ExecNode {

Status _get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos);
Status _serialize_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos);
Status _pre_agg_with_serialized_key(Block* in_block, Block* out_block);
Status _execute_with_serialized_key(Block* block);
Status _merge_with_serialized_key(Block* block);
void _close_with_serialized_key();

using vectorized_execute = std::function<Status(Block* block)>;
using vectorized_pre_agg = std::function<Status(Block* in_block, Block* out_block)>;
using vectorized_get_result =
std::function<Status(RuntimeState* state, Block* block, bool* eos)>;
using vectorized_closer = std::function<void()>;

struct executor {
vectorized_execute execute;
vectorized_pre_agg pre_agg;
vectorized_get_result get_result;
vectorized_closer close;
Expand Down

0 comments on commit 1392807

Please sign in to comment.