Skip to content

Commit

Permalink
[Enhancement] Try-catch the memory allocation of Aggregator::compute_batch…
Browse files Browse the repository at this point in the history
…_agg_states (StarRocks#55382)

Signed-off-by: trueeyu <lxhhust350@qq.com>
  • Loading branch information
trueeyu authored Feb 6, 2025
1 parent 52691d3 commit 0417065
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 31 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/aggregate/aggregate_blocking_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,11 @@ Status AggregateBlockingNode::open(RuntimeState* state) {
size_t chunk_size = chunk->num_rows();
{
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_ALLOC_SCOPE_START()
if (!_aggregator->is_none_group_by_exprs()) {
TRY_CATCH_ALLOC_SCOPE_START()
_aggregator->build_hash_map(chunk_size, agg_group_by_with_limit);

_aggregator->try_convert_to_two_level_map();
TRY_CATCH_ALLOC_SCOPE_END()
}
if (_aggregator->is_none_group_by_exprs()) {
RETURN_IF_ERROR(_aggregator->compute_single_agg_state(chunk.get(), chunk_size));
Expand All @@ -108,6 +107,7 @@ Status AggregateBlockingNode::open(RuntimeState* state) {
RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(chunk.get(), chunk_size));
}
}
TRY_CATCH_ALLOC_SCOPE_END()

_aggregator->update_num_input_rows(chunk_size);
}
Expand Down
16 changes: 11 additions & 5 deletions be/src/exec/aggregate/aggregate_streaming_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ Status AggregateStreamingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bo
TStreamingPreaggregationMode::FORCE_PREAGGREGATION) {
RETURN_IF_ERROR(state->check_mem_limit("AggrNode"));
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map(input_chunk_size));
TRY_CATCH_ALLOC_SCOPE_START()
_aggregator->build_hash_map(input_chunk_size);
if (_aggregator->is_none_group_by_exprs()) {
RETURN_IF_ERROR(_aggregator->compute_single_agg_state(input_chunk.get(), input_chunk_size));
} else {
RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(input_chunk.get(), input_chunk_size));
}

TRY_CATCH_BAD_ALLOC(_aggregator->try_convert_to_two_level_map());
_aggregator->try_convert_to_two_level_map();
TRY_CATCH_ALLOC_SCOPE_END()

COUNTER_SET(_aggregator->hash_table_size(), (int64_t)_aggregator->hash_map_variant().size());

Expand All @@ -117,21 +119,24 @@ Status AggregateStreamingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bo
RETURN_IF_ERROR(state->check_mem_limit("AggrNode"));
// hash table is not full or allow expand the hash table according reduction rate
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map(input_chunk_size));
TRY_CATCH_ALLOC_SCOPE_START()
_aggregator->build_hash_map(input_chunk_size);
if (_aggregator->is_none_group_by_exprs()) {
RETURN_IF_ERROR(_aggregator->compute_single_agg_state(input_chunk.get(), input_chunk_size));
} else {
RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(input_chunk.get(), input_chunk_size));
}

TRY_CATCH_BAD_ALLOC(_aggregator->try_convert_to_two_level_map());
_aggregator->try_convert_to_two_level_map();
TRY_CATCH_ALLOC_SCOPE_END()
COUNTER_SET(_aggregator->hash_table_size(), (int64_t)_aggregator->hash_map_variant().size());
continue;
} else {
TRY_CATCH_ALLOC_SCOPE_START()
// TODO: direct call the function may affect the performance of some aggregated cases
{
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map_with_selection(input_chunk_size));
_aggregator->build_hash_map_with_selection(input_chunk_size);
}

size_t zero_count = SIMD::count_zero(_aggregator->streaming_selection());
Expand All @@ -153,6 +158,7 @@ Status AggregateStreamingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bo
_aggregator->output_chunk_by_streaming_with_selection(input_chunk.get(), chunk));
}
}
TRY_CATCH_ALLOC_SCOPE_END()

COUNTER_SET(_aggregator->hash_table_size(), (int64_t)_aggregator->hash_map_variant().size());
if (*chunk != nullptr && (*chunk)->num_rows() > 0) {
Expand Down
31 changes: 17 additions & 14 deletions be/src/exec/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -816,12 +816,14 @@ Status Aggregator::compute_single_agg_state(Chunk* chunk, size_t chunk_size) {
SCOPED_THREAD_LOCAL_STATE_ALLOCATOR_SETTER(_allocator.get());
// batch call update or merge for singe stage
if (!_is_merge_funcs[i] && !use_intermediate) {
_agg_functions[i]->update_batch_single_state(_agg_fn_ctxs[i], chunk_size, _agg_input_raw_columns[i].data(),
_single_agg_state + _agg_states_offsets[i]);
_agg_functions[i]->update_batch_single_state_exception_safe(_agg_fn_ctxs[i], chunk_size,
_agg_input_raw_columns[i].data(),
_single_agg_state + _agg_states_offsets[i]);
} else {
DCHECK_GE(_agg_input_columns[i].size(), 1);
_agg_functions[i]->merge_batch_single_state(_agg_fn_ctxs[i], _single_agg_state + _agg_states_offsets[i],
_agg_input_columns[i][0].get(), 0, chunk_size);
_agg_functions[i]->merge_batch_single_state_exception_safe(_agg_fn_ctxs[i],
_single_agg_state + _agg_states_offsets[i],
_agg_input_columns[i][0].get(), 0, chunk_size);
}
}
RETURN_IF_ERROR(check_has_error());
Expand All @@ -839,12 +841,13 @@ Status Aggregator::compute_batch_agg_states(Chunk* chunk, size_t chunk_size) {
SCOPED_THREAD_LOCAL_STATE_ALLOCATOR_SETTER(_allocator.get());
// batch call update or merge
if (!_is_merge_funcs[i] && !use_intermediate) {
_agg_functions[i]->update_batch(_agg_fn_ctxs[i], chunk_size, _agg_states_offsets[i],
_agg_input_raw_columns[i].data(), _tmp_agg_states.data());
_agg_functions[i]->update_batch_exception_safe(_agg_fn_ctxs[i], chunk_size, _agg_states_offsets[i],
_agg_input_raw_columns[i].data(), _tmp_agg_states.data());
} else {
DCHECK_GE(_agg_input_columns[i].size(), 1);
_agg_functions[i]->merge_batch(_agg_fn_ctxs[i], _agg_input_columns[i][0]->size(), _agg_states_offsets[i],
_agg_input_columns[i][0].get(), _tmp_agg_states.data());
_agg_functions[i]->merge_batch_exception_safe(_agg_fn_ctxs[i], _agg_input_columns[i][0]->size(),
_agg_states_offsets[i], _agg_input_columns[i][0].get(),
_tmp_agg_states.data());
}
}
RETURN_IF_ERROR(check_has_error());
Expand All @@ -860,14 +863,14 @@ Status Aggregator::compute_batch_agg_states_with_selection(Chunk* chunk, size_t
RETURN_IF_ERROR(evaluate_agg_input_column(chunk, agg_expr_ctxs[i], i));
SCOPED_THREAD_LOCAL_STATE_ALLOCATOR_SETTER(_allocator.get());
if (!_is_merge_funcs[i] && !use_intermediate) {
_agg_functions[i]->update_batch_selectively(_agg_fn_ctxs[i], chunk_size, _agg_states_offsets[i],
_agg_input_raw_columns[i].data(), _tmp_agg_states.data(),
_streaming_selection);
_agg_functions[i]->update_batch_selectively_exception_safe(
_agg_fn_ctxs[i], chunk_size, _agg_states_offsets[i], _agg_input_raw_columns[i].data(),
_tmp_agg_states.data(), _streaming_selection);
} else {
DCHECK_GE(_agg_input_columns[i].size(), 1);
_agg_functions[i]->merge_batch_selectively(_agg_fn_ctxs[i], _agg_input_columns[i][0]->size(),
_agg_states_offsets[i], _agg_input_columns[i][0].get(),
_tmp_agg_states.data(), _streaming_selection);
_agg_functions[i]->merge_batch_selectively_exception_safe(
_agg_fn_ctxs[i], _agg_input_columns[i][0]->size(), _agg_states_offsets[i],
_agg_input_columns[i][0].get(), _tmp_agg_states.data(), _streaming_selection);
}
}
RETURN_IF_ERROR(check_has_error());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ Status AggregateBlockingSinkOperator::push_chunk(RuntimeState* state, const Chun
DCHECK_LE(chunk_size, state->chunk_size());

SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_ALLOC_SCOPE_START()
// try to build hash table if has group by keys
if (!_aggregator->is_none_group_by_exprs()) {
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map(chunk_size, _shared_limit_countdown, _agg_group_by_with_limit));
TRY_CATCH_BAD_ALLOC(_aggregator->try_convert_to_two_level_map());
_aggregator->build_hash_map(chunk_size, _shared_limit_countdown, _agg_group_by_with_limit);
_aggregator->try_convert_to_two_level_map();
}

// batch compute aggregate states
Expand All @@ -120,6 +121,7 @@ Status AggregateBlockingSinkOperator::push_chunk(RuntimeState* state, const Chun
RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(chunk.get(), chunk_size));
}
}
TRY_CATCH_ALLOC_SCOPE_END()

_aggregator->update_num_input_rows(chunk_size);
RETURN_IF_ERROR(_aggregator->check_has_error());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,16 @@ Status AggregateStreamingSinkOperator::_push_chunk_by_force_streaming(const Chun
Status AggregateStreamingSinkOperator::_push_chunk_by_force_preaggregation(const ChunkPtr& chunk,
const size_t chunk_size) {
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map(chunk_size));
TRY_CATCH_ALLOC_SCOPE_START();
_aggregator->build_hash_map(chunk_size);
if (_aggregator->is_none_group_by_exprs()) {
RETURN_IF_ERROR(_aggregator->compute_single_agg_state(chunk.get(), chunk_size));
} else {
RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(chunk.get(), chunk_size));
}

TRY_CATCH_BAD_ALLOC(_aggregator->try_convert_to_two_level_map());
_aggregator->try_convert_to_two_level_map();
TRY_CATCH_ALLOC_SCOPE_END();

COUNTER_SET(_aggregator->hash_table_size(), (int64_t)_aggregator->hash_map_variant().size());
return Status::OK();
Expand All @@ -121,9 +123,10 @@ Status AggregateStreamingSinkOperator::_push_chunk_by_force_preaggregation(const
Status AggregateStreamingSinkOperator::_push_chunk_by_selective_preaggregation(const ChunkPtr& chunk,
const size_t chunk_size,
bool need_build) {
TRY_CATCH_ALLOC_SCOPE_START();
if (need_build) {
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map_with_selection(chunk_size));
_aggregator->build_hash_map_with_selection(chunk_size);
}

size_t zero_count = SIMD::count_zero(_aggregator->streaming_selection());
Expand Down Expand Up @@ -151,6 +154,7 @@ Status AggregateStreamingSinkOperator::_push_chunk_by_selective_preaggregation(c
_aggregator->offer_chunk_to_buffer(res);
}
}
TRY_CATCH_ALLOC_SCOPE_END();
COUNTER_SET(_aggregator->hash_table_size(), (int64_t)_aggregator->hash_map_variant().size());
return Status::OK();
}
Expand Down Expand Up @@ -182,14 +186,16 @@ Status AggregateStreamingSinkOperator::_push_chunk_by_auto(const ChunkPtr& chunk
_aggregator->hash_map_variant().size())) {
// hash table is not full or allow to expand the hash table according reduction rate
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map(chunk_size));
TRY_CATCH_ALLOC_SCOPE_START()
_aggregator->build_hash_map(chunk_size);
if (_aggregator->is_none_group_by_exprs()) {
RETURN_IF_ERROR(_aggregator->compute_single_agg_state(chunk.get(), chunk_size));
} else {
RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(chunk.get(), chunk_size));
}

TRY_CATCH_BAD_ALLOC(_aggregator->try_convert_to_two_level_map());
_aggregator->try_convert_to_two_level_map();
TRY_CATCH_ALLOC_SCOPE_END()

COUNTER_SET(_aggregator->hash_table_size(), (int64_t)_aggregator->hash_map_variant().size());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,18 +199,21 @@ Status SpillableAggregateBlockingSinkOperator::_try_to_spill_by_auto(RuntimeStat
return _spill_all_data(state, true);
} else if (build_hash_table) {
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map(chunk_size));
TRY_CATCH_BAD_ALLOC(_aggregator->try_convert_to_two_level_map());
TRY_CATCH_ALLOC_SCOPE_START()
_aggregator->build_hash_map(chunk_size);
_aggregator->try_convert_to_two_level_map();
RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(chunk.get(), chunk_size));
TRY_CATCH_ALLOC_SCOPE_END()

_aggregator->update_num_input_rows(chunk_size);
RETURN_IF_ERROR(_aggregator->check_has_error());
_continuous_low_reduction_chunk_num = 0;
} else {
TRY_CATCH_ALLOC_SCOPE_START()
// selective preaggregation
{
SCOPED_TIMER(_aggregator->agg_compute_timer());
TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map_with_selection(chunk_size));
_aggregator->build_hash_map_with_selection(chunk_size);
}

size_t hit_count = SIMD::count_zero(_aggregator->streaming_selection());
Expand Down Expand Up @@ -242,6 +245,7 @@ Status SpillableAggregateBlockingSinkOperator::_try_to_spill_by_auto(RuntimeStat
}

_aggregator->update_num_input_rows(hit_count);
TRY_CATCH_ALLOC_SCOPE_END()
RETURN_IF_ERROR(_aggregator->check_has_error());
}

Expand Down

0 comments on commit 0417065

Please sign in to comment.