From 1392807c0ae9aeadcd2177a08406a0c35a8f20a1 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Sun, 23 May 2021 21:57:55 -0500 Subject: [PATCH] Support Stream Pre Agg (#47) --- be/src/vec/common/hash_table/hash_table.h | 3 + be/src/vec/exec/vaggregation_node.cpp | 241 +++++++++++++++++++++- be/src/vec/exec/vaggregation_node.h | 14 ++ 3 files changed, 256 insertions(+), 2 deletions(-) diff --git a/be/src/vec/common/hash_table/hash_table.h b/be/src/vec/common/hash_table/hash_table.h index 30f7218fb5ea8b..a3f032e16ea566 100644 --- a/be/src/vec/common/hash_table/hash_table.h +++ b/be/src/vec/common/hash_table/hash_table.h @@ -939,6 +939,9 @@ class HashTable : private boost::noncopyable, size_t getBufferSizeInCells() const { return grower.bufSize(); } + bool add_elem_size_overflow(size_t add_size) const { + return grower.overflow(add_size + m_size); + } #ifdef DBMS_HASH_MAP_COUNT_COLLISIONS size_t getCollisions() const { return collisions; } #endif diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index a57a01c4af5026..ce5f8c4a053d2c 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -32,6 +32,48 @@ #include "vec/utils/util.hpp" namespace doris::vectorized { + + /// The minimum reduction factor (input rows divided by output rows) to grow hash tables +/// in a streaming preaggregation, given that the hash tables are currently the given +/// size or above. The sizes roughly correspond to hash table sizes where the bucket +/// arrays will fit in a cache level. Intuitively, we don't want the working set of the +/// aggregation to expand to the next level of cache unless we're reducing the input +/// enough to outweigh the increased memory latency we'll incur for each hash table +/// lookup. +/// +/// Note that the current reduction achieved is not always a good estimate of the +/// final reduction. It may be biased either way depending on the ordering of the +/// input. If the input order is random, we will underestimate the final reduction +/// factor because the probability of a row having the same key as a previous row +/// increases as more input is processed. If the input order is correlated with the +/// key, skew may bias the estimate. If high cardinality keys appear first, we +/// may overestimate and if low cardinality keys appear first, we underestimate. +/// To estimate the eventual reduction achieved, we estimate the final reduction +/// using the planner's estimated input cardinality and the assumption that input +/// is in a random order. This means that we assume that the reduction factor will +/// increase over time. +struct StreamingHtMinReductionEntry { + // Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in + // bytes is greater than this threshold. + int min_ht_mem; + // The minimum reduction factor to expand the hash tables. + double streaming_ht_min_reduction; +}; + +// TODO: experimentally tune these values and also programmatically get the cache size +// of the machine that we're running on. +static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { + // Expand up to L2 cache always. + {0, 0.0}, + // Expand into L3 cache if we look like we're getting some reduction. + {256 * 1024, 1.1}, + // Expand into main memory if we're getting a significant reduction. 
+        {2 * 1024 * 1024, 2.0},
+};
+
+static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
+        sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
+
 AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
                                  const DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
@@ -44,7 +86,18 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
           _agg_data(),
           _build_timer(nullptr),
           _exec_timer(nullptr),
-          _merge_timer(nullptr) {}
+          _merge_timer(nullptr) {
+    if (tnode.agg_node.__isset.use_streaming_preaggregation) {
+        _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
+        if (_is_streaming_preagg) {
+            DCHECK(_conjunct_ctxs.empty()) << "Preaggs have no conjuncts";
+            DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
+            DCHECK(_limit == -1) << "Preaggs have no limits";
+        }
+    } else {
+        _is_streaming_preagg = false;
+    }
+}
 
 AggregationNode::~AggregationNode() = default;
 
@@ -159,6 +212,12 @@ Status AggregationNode::prepare(RuntimeState* state) {
                                       this, std::placeholders::_1);
     }
 
+    if (_is_streaming_preagg) {
+        _executor.pre_agg = std::bind(&AggregationNode::_pre_agg_with_serialized_key, this,
+                                      std::placeholders::_1, std::placeholders::_2);
+        _max_size_of_stream_pre_agg_buffer = state->batch_size();
+    }
+
     if (_needs_finalize) {
         _executor.get_result = std::bind(
                 &AggregationNode::_get_with_serialized_key_result, this, std::placeholders::_1,
@@ -186,8 +245,10 @@ Status AggregationNode::open(RuntimeState* state) {
 
     RETURN_IF_ERROR(_children[0]->open(state));
 
-    bool eos = false;
+    // Streaming preaggregations do all processing in GetNext().
+    if (_is_streaming_preagg) return Status::OK();
 
+    bool eos = false;
     while (!eos) {
         Block block;
         RETURN_IF_CANCELLED(state);
@@ -208,7 +269,26 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
 Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     block->clear();
+
+    if (_is_streaming_preagg) {
+        bool child_eos = false;
+
+        Block in_block;
+        RETURN_IF_CANCELLED(state);
+        do {
+            RETURN_IF_ERROR(_children[0]->get_next(state, &in_block, &child_eos));
+        } while (in_block.rows() == 0 && !child_eos);
+
+        if (in_block.rows() != 0) {
+            RETURN_IF_ERROR(_executor.pre_agg(&in_block, block));
+            return Status::OK();
+        } else {
+            return _executor.get_result(state, block, eos);
+        }
+    }
+
     RETURN_IF_ERROR(_executor.get_result(state, block, eos));
+    // Dispose of the having clause here; it must not be executed in the streaming preagg.
     if (_vconjunct_ctx_ptr) {
         int result_column_id = -1;
         int orig_columns = block->columns();
@@ -360,6 +440,163 @@ void AggregationNode::_close_without_key() {
     _destory_agg_status(_agg_data.without_key);
 }
 
+bool AggregationNode::_should_expand_preagg_hash_tables() {
+    if (!_should_expand_hash_table) return false;
+
+    auto& hash_tbl = _agg_data.serialized->data;
+    auto [ht_mem, ht_rows] = std::pair{hash_tbl.getBufferSizeInBytes(), hash_tbl.size()};
+
+    // Need some rows in tables to have valid statistics.
+    if (ht_rows == 0) return true;
+
+    // Find the appropriate reduction factor in our table for the current hash table sizes.
+    int cache_level = 0;
+    while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
+           ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+        ++cache_level;
+    }
+
+    // Compare the number of rows in the hash table with the number of input rows that
+    // were aggregated into it. Exclude passed through rows from this calculation since
+    // they were not in hash tables.
+    const int64_t input_rows = _children[0]->rows_returned();
+    const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
+    // TODO chenhao
+    // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
+    double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
+
+    // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
+    // inaccurate, which could lead to a divide by zero below.
+    if (aggregated_input_rows <= 0) return true;
+
+    // Extrapolate the current reduction factor (r) using the formula
+    // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
+    // set, N is the number of input rows, excluding passed-through rows, and n is the
+    // number of rows inserted or merged into the hash tables. This is a very rough
+    // approximation but is good enough to be useful.
+    // TODO: consider collecting more statistics to better estimate reduction.
+    // double estimated_reduction = aggregated_input_rows >= expected_input_rows
+    //         ? current_reduction
+    //         : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
+    double min_reduction = STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+
+    // COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
+    // COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
+    // return estimated_reduction > min_reduction;
+    _should_expand_hash_table = current_reduction > min_reduction;
+    return _should_expand_hash_table;
+}
+
+Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* in_block,
+                                                     doris::vectorized::Block* out_block) {
+    DCHECK(!_probe_expr_ctxs.empty());
+    // now we only support serialized key
+    // TODO:
+    DCHECK(_agg_data.serialized != nullptr);
+
+    using Method = AggregationMethodSerialized;
+    using AggState = Method::State;
+
+    auto& method = *_agg_data.serialized;
+
+    size_t key_size = _probe_expr_ctxs.size();
+    ColumnRawPtrs key_columns(key_size);
+
+    for (size_t i = 0; i < key_size; ++i) {
+        int result_column_id = -1;
+        RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id));
+        key_columns[i] = in_block->getByPosition(result_column_id).column.get();
+    }
+
+    int rows = in_block->rows();
+    PODArray<AggregateDataPtr> places(rows);
+    AggState state(key_columns, {}, nullptr);
+
+    // Stop expanding hash tables if we're not reducing the input sufficiently. As our
+    // hash tables expand out of each level of cache hierarchy, every hash table lookup
+    // will take longer. We also may not be able to expand hash tables because of memory
+    // pressure. In either case we should always use the remaining space in the hash table
+    // to avoid wasting memory.
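+    //
+    // add_elem_size_overflow(rows) (added in hash_table.h above) is true when inserting
+    // 'rows' more keys would push the table past its current grow threshold; only in that
+    // case do we consult _should_expand_preagg_hash_tables() and possibly fall back to the
+    // pass-through path below.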
+    if (auto& hash_tbl = _agg_data.serialized->data; hash_tbl.add_elem_size_overflow(rows)) {
+        // Do not try to aggregate: just init the states, materialize the input rows directly and return them in out_block.
+        if (!_should_expand_preagg_hash_tables()) {
+            if (_streaming_pre_agg_buffer == nullptr) {
+                _streaming_pre_agg_buffer = _agg_arena_pool.alignedAlloc(
+                        _total_size_of_aggregate_states * _max_size_of_stream_pre_agg_buffer, _align_aggregate_states);
+            }
+
+            auto aggregate_data = _streaming_pre_agg_buffer;
+            for (size_t i = 0; i < rows; ++i) {
+                _create_agg_status(aggregate_data);
+                places[i] = aggregate_data;
+                aggregate_data += _total_size_of_aggregate_states;
+            }
+            for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+                _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],
+                                                            places.data(), &_agg_arena_pool);
+            }
+
+            out_block->clear();
+            auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc());
+
+            MutableColumns value_columns;
+            for (int i = key_size; i < column_withschema.size(); ++i) {
+                value_columns.emplace_back(column_withschema[i].type->createColumn());
+            }
+            aggregate_data = _streaming_pre_agg_buffer;
+            for (size_t j = 0; j < rows; ++j) {
+                for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                    _aggregate_evaluators[i]->insert_result_info(aggregate_data + _offsets_of_aggregate_states[i],
+                                                                 value_columns[i].get());
+                }
+                aggregate_data += _total_size_of_aggregate_states;
+            }
+
+            *out_block = column_withschema;
+            MutableColumns columns(out_block->columns());
+            for (int i = 0; i < out_block->columns(); ++i) {
+                if (i < key_size) {
+                    columns[i] = std::move(*key_columns[i]).mutate();
+                } else {
+                    columns[i] = std::move(value_columns[i - key_size]);
+                }
+            }
+            out_block->setColumns(std::move(columns));
+            return Status::OK();
+        }
+    }
+
+    /// For all rows.
+    for (size_t i = 0; i < rows; ++i) {
+        AggregateDataPtr aggregate_data = nullptr;
+
+        auto emplace_result = state.emplaceKey(method.data, i, _agg_arena_pool);
+
+        /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
+        if (emplace_result.isInserted()) {
+            /// exception-safety - if you cannot allocate memory or create states, then destructors will not be called.
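+            /// setMapped(nullptr) first: if alignedAlloc() or _create_agg_status() below
+            /// throws, the slot holds a well-defined null rather than an uninitialized
+            /// pointer, so cleanup will not run destructors on garbage.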
+            emplace_result.setMapped(nullptr);
+
+            aggregate_data = _agg_arena_pool.alignedAlloc(_total_size_of_aggregate_states,
+                                                          _align_aggregate_states);
+            _create_agg_status(aggregate_data);
+
+            emplace_result.setMapped(aggregate_data);
+        } else
+            aggregate_data = emplace_result.getMapped();
+
+        places[i] = aggregate_data;
+        assert(places[i] != nullptr);
+    }
+
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],
+                                                    places.data(), &_agg_arena_pool);
+    }
+
+    return Status::OK();
+}
+
 Status AggregationNode::_execute_with_serialized_key(Block* block) {
     DCHECK(!_probe_expr_ctxs.empty());
     // now we only support serialized key
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index f5d6f9f508ea63..09012921bf82e6 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -151,7 +151,18 @@ class AggregationNode : public ::doris::ExecNode {
     RuntimeProfile::Counter* _exec_timer;
     RuntimeProfile::Counter* _merge_timer;
 
+    bool _is_streaming_preagg;
+    bool _should_expand_hash_table = true;
+    char* _streaming_pre_agg_buffer = nullptr;
+    size_t _max_size_of_stream_pre_agg_buffer = 0;
+
+    /// Expose the minimum reduction factor to continue growing the hash tables.
+    RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
 private:
+    /// Return true if we should keep expanding hash tables in the preagg. If false,
+    /// the preagg should pass through any rows it can't fit in its tables.
+    bool _should_expand_preagg_hash_tables();
+
     Status _create_agg_status(AggregateDataPtr data);
     Status _destory_agg_status(AggregateDataPtr data);
 
@@ -163,17 +174,20 @@ class AggregationNode : public ::doris::ExecNode {
     Status _get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos);
     Status _serialize_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos);
 
+    Status _pre_agg_with_serialized_key(Block* in_block, Block* out_block);
     Status _execute_with_serialized_key(Block* block);
     Status _merge_with_serialized_key(Block* block);
     void _close_with_serialized_key();
 
     using vectorized_execute = std::function<Status(Block* block)>;
+    using vectorized_pre_agg = std::function<Status(Block* in_block, Block* out_block)>;
     using vectorized_get_result = std::function<Status(RuntimeState* state, Block* block, bool* eos)>;
     using vectorized_closer = std::function<void()>;
 
     struct executor {
         vectorized_execute execute;
+        vectorized_pre_agg pre_agg;
         vectorized_get_result get_result;
         vectorized_closer close;
     };
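
For illustration, a minimal standalone sketch of the expansion heuristic introduced above. It is not part of the patch: the names (Tier, kMinReduction, should_expand) and the main() driver are assumptions made for this example; only the tier thresholds and the reduction comparison mirror _should_expand_preagg_hash_tables().

// Standalone sketch only -- not part of the patch. Mirrors the decision in
// _should_expand_preagg_hash_tables() using the same tier thresholds.
#include <cstdint>
#include <cstdio>

struct Tier {
    int64_t min_ht_mem;   // consult this tier once the bucket array reaches this size
    double min_reduction; // minimum observed reduction required to keep expanding
};

static constexpr Tier kMinReduction[] = {
        {0, 0.0},               // within L2: always expand
        {256 * 1024, 1.1},      // into L3: need some reduction
        {2 * 1024 * 1024, 2.0}, // into main memory: need significant reduction
};
static constexpr int kNumTiers = sizeof(kMinReduction) / sizeof(kMinReduction[0]);

static bool should_expand(int64_t ht_mem, int64_t ht_rows, int64_t aggregated_input_rows) {
    // Without statistics (or with a suspect row counter) default to expanding.
    if (ht_rows == 0 || aggregated_input_rows <= 0) return true;
    int tier = 0;
    while (tier + 1 < kNumTiers && ht_mem >= kMinReduction[tier + 1].min_ht_mem) ++tier;
    double reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
    return reduction > kMinReduction[tier].min_reduction;
}

int main() {
    // 1 MiB of buckets, 80k aggregated rows in 50k groups: 1.6 > 1.1 -> keep expanding (prints 1).
    std::printf("%d\n", should_expand(1 << 20, 50000, 80000));
    // 4 MiB of buckets, 90k aggregated rows in 60k groups: 1.5 <= 2.0 -> pass through (prints 0).
    std::printf("%d\n", should_expand(4 << 20, 60000, 90000));
    return 0;
}

In the second call the bucket array has already crossed the 2 MiB tier while the observed reduction is only 1.5, so the node would stop growing its hash table and pass incoming rows straight to the output block instead.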