Refine the size() calculation of accumulator #5904


Closed · wants to merge 1 commit
113 changes: 75 additions & 38 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -338,6 +338,7 @@ impl GroupedHashAggregateStream {
map, group_states, ..
} = &mut self.aggr_state;

let mut accumulator_set_init_size = None;
for (row, hash) in batch_hashes.into_iter().enumerate() {
let entry = map.get_mut(hash, |(_hash, group_idx)| {
// verify that a group that we are inserting with hash is
@@ -384,13 +385,15 @@ impl GroupedHashAggregateStream {
+ (std::mem::size_of::<u32>() * group_state.indices.capacity());

// Allocation done by normal accumulators
*allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
* group_state.accumulator_set.capacity())
+ group_state
.accumulator_set
.iter()
.map(|accu| accu.size())
.sum::<usize>();
*allocated += *accumulator_set_init_size.get_or_insert_with(|| {
std::mem::size_of::<Box<dyn Accumulator>>()
* group_state.accumulator_set.capacity()
+ group_state
.accumulator_set
.iter()
.map(|accu| accu.size())
.sum::<usize>()
});

// for hasher function, use precomputed hash value
map.insert_accounted(
@@ -410,16 +413,24 @@

// Update the accumulator results, according to row_aggr_state.
#[allow(clippy::too_many_arguments)]
fn update_accumulators(
fn update_accumulators<F1, F2>(
&mut self,
groups_with_rows: &[usize],
offsets: &[usize],
row_values: &[Vec<ArrayRef>],
normal_values: &[Vec<ArrayRef>],
row_filter_values: &[Option<ArrayRef>],
normal_filter_values: &[Option<ArrayRef>],
func_row: F1,
func_normal: F2,
allocated: &mut usize,
) -> Result<()> {
) -> Result<()>
where
F1: Fn(&mut RowAccumulatorItem, &mut RowAccessor, &[ArrayRef]) -> Result<()>,
F2: Fn(&mut AccumulatorItem, &[ArrayRef]) -> Result<()>,
{
let accumulator_set_pre =
get_accumulator_set_size(groups_with_rows, &self.aggr_state.group_states);
// 2.1 for each key in this batch
// 2.2 for each aggregation
// 2.3 `slice` from each of its arrays the keys' values
Expand All @@ -446,15 +457,7 @@ impl GroupedHashAggregateStream {
RowAccessor::new_from_layout(self.row_aggr_layout.clone());
state_accessor
.point_to(0, group_state.aggregation_buffer.as_mut_slice());
match self.mode {
AggregateMode::Partial => {
accumulator.update_batch(&values, &mut state_accessor)
}
AggregateMode::FinalPartitioned | AggregateMode::Final => {
// note: the aggregation here is over states, not values, thus the merge
accumulator.merge_batch(&values, &mut state_accessor)
}
}
func_row(accumulator, &mut state_accessor, &values)
})?;
// normal accumulators
group_state
@@ -468,24 +471,17 @@
filter_opt.as_ref(),
offsets,
)?;
let size_pre = accumulator.size();
let res = match self.mode {
AggregateMode::Partial => accumulator.update_batch(&values),
AggregateMode::FinalPartitioned | AggregateMode::Final => {
// note: the aggregation here is over states, not values, thus the merge
accumulator.merge_batch(&values)
}
};
let size_post = accumulator.size();
*allocated += size_post.saturating_sub(size_pre);
res
func_normal(accumulator, &values)
})
// 2.5
.and({
group_state.indices.clear();
Ok(())
})
})?;
let accumulator_set_post =
get_accumulator_set_size(groups_with_rows, &self.aggr_state.group_states);
*allocated += accumulator_set_post.saturating_sub(accumulator_set_pre);
Ok(())
}

@@ -534,15 +530,43 @@ impl GroupedHashAggregateStream {
get_optional_filters(&row_filter_values, &batch_indices);
let normal_filter_values =
get_optional_filters(&normal_filter_values, &batch_indices);
self.update_accumulators(
&groups_with_rows,
&offsets,
&row_values,
&normal_values,
&row_filter_values,
&normal_filter_values,
&mut allocated,
)?;
match self.mode {
AggregateMode::Partial => self.update_accumulators(
&groups_with_rows,
&offsets,
&row_values,
&normal_values,
&row_filter_values,
&normal_filter_values,
|accumulator: &mut RowAccumulatorItem,
state_accessor: &mut RowAccessor,
values: &[ArrayRef]| {
accumulator.update_batch(values, state_accessor)
},
|accumulator: &mut AccumulatorItem, values: &[ArrayRef]| {
accumulator.update_batch(values)
},
&mut allocated,
)?,
AggregateMode::FinalPartitioned | AggregateMode::Final => self
.update_accumulators(
&groups_with_rows,
&offsets,
&row_values,
&normal_values,
&row_filter_values,
&normal_filter_values,
|accumulator: &mut RowAccumulatorItem,
state_accessor: &mut RowAccessor,
values: &[ArrayRef]| {
accumulator.merge_batch(values, state_accessor)
},
|accumulator: &mut AccumulatorItem, values: &[ArrayRef]| {
accumulator.merge_batch(values)
},
&mut allocated,
)?,
};
}
allocated += self
.row_converter
@@ -552,6 +576,19 @@
}
}

fn get_accumulator_set_size(
groups_with_rows: &[usize],
group_states: &[GroupState],
) -> usize {
groups_with_rows.iter().fold(0usize, |acc, group_idx| {
let group_state = &group_states[*group_idx];
group_state
.accumulator_set
.iter()
.fold(acc, |acc, accumulator| acc + accumulator.size())
})
}

/// The state that is built for each output group.
Contributor:
I think the logic to collect the memory size of the accumulators is quite complex; the accounting computation may end up costing more than the real useful aggregation work.

Contributor:
Yes, I agree the calculation for the memory size is complicated.

I think @tustvold has been thinking about how to improve performance in this area, but I am not sure how far he has gotten

In general, managing individual allocations (and then accounting for their sizes) for each group is a significant additional overhead for grouping.
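
To make the delta-based accounting concrete, here is a minimal, self-contained sketch with hypothetical types and names (StringConcat, total_size — not DataFusion's actual Accumulator or GroupState): the total accumulator size is sampled once before and once after a batch update, and only the growth is charged to the tracked allocation, mirroring the accumulator_set_pre / accumulator_set_post logic in this PR.

    // Hypothetical types for illustration only.
    trait Accumulator {
        /// Allocated size in bytes, including `Self` and any heap capacity.
        fn size(&self) -> usize;
        fn update_batch(&mut self, values: &[u64]);
    }

    struct StringConcat {
        buf: String,
    }

    impl Accumulator for StringConcat {
        fn size(&self) -> usize {
            std::mem::size_of_val(self) + self.buf.capacity()
        }
        fn update_batch(&mut self, values: &[u64]) {
            for v in values {
                self.buf.push_str(&v.to_string());
            }
        }
    }

    fn total_size(accumulators: &[Box<dyn Accumulator>]) -> usize {
        accumulators.iter().map(|a| a.size()).sum()
    }

    fn main() {
        let mut accumulators: Vec<Box<dyn Accumulator>> =
            vec![Box::new(StringConcat { buf: String::new() })];
        let mut allocated = total_size(&accumulators); // initial accounting

        // Sample once before and once after the batch update.
        let pre = total_size(&accumulators);
        for acc in accumulators.iter_mut() {
            acc.update_batch(&[1, 2, 3]);
        }
        let post = total_size(&accumulators);

        // Only the growth is added; saturating_sub guards against shrinkage.
        allocated += post.saturating_sub(pre);

        println!("tracked allocation: {allocated} bytes");
    }

In the actual diff this happens once per batch over groups_with_rows via get_accumulator_set_size, replacing the previous per-accumulator size_pre / size_post bookkeeping inside the update loop.
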

#[derive(Debug)]
pub struct GroupState {
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/average.rs
@@ -242,7 +242,7 @@ impl Accumulator for AvgAccumulator {
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
std::mem::size_of_val(self)
}
}

2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/sum.rs
@@ -289,7 +289,7 @@ impl Accumulator for SumAccumulator {
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
std::mem::size_of_val(self)
}
Contributor:
Will this size change after the AvgAccumulator/SumAccumulator struct is initialized?

Contributor:
I would suggest renaming the size method to size_in_bytes.

Contributor:
I think not. Option is an enum type, so its inline size is fixed.

Contributor Author:
From the flamegraph, it seems it's not a bottleneck now.

Contributor:
According to the docs

https://github.com/yahoNanJing/arrow-datafusion/blob/issue-5903/datafusion/expr/src/accumulator.rs#L88

    /// Allocated size required for this accumulator, in bytes, including `Self`.
    /// Allocated means that for internal containers such as `Vec`, the `capacity` should be used
    /// not the `len`
    fn size(&self) -> usize;

The change in this PR seems to drop the accounting for extra allocations inside ScalarValue (such as ScalarValue::Utf8, which holds a heap-allocated string).

}

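
To make the trade-off discussed above concrete, here is a hedged sketch with a hypothetical Scalar enum (not DataFusion's actual ScalarValue, which has many more variants and its own size() method): for purely numeric variants the inline enum size already covers everything, so dropping the self.sum.size() term changes nothing for numeric Sum/Avg accumulators, while a variant that owns a heap buffer, such as Utf8, would no longer have that buffer counted by size_of_val(self) alone.

    // Hypothetical enum used only for illustration.
    enum Scalar {
        Float64(Option<f64>),
        Utf8(Option<String>),
    }

    impl Scalar {
        /// Inline enum size plus any heap capacity owned by the value.
        fn size(&self) -> usize {
            std::mem::size_of_val(self)
                + match self {
                    Scalar::Utf8(Some(s)) => s.capacity(),
                    _ => 0,
                }
        }
    }

    fn main() {
        let numeric_sum = Scalar::Float64(Some(1.0));
        let text = Scalar::Utf8(Some("a fairly long heap-allocated string".to_string()));

        // For numeric variants both measurements agree, so size_of_val(self)
        // alone is sufficient...
        assert_eq!(std::mem::size_of_val(&numeric_sum), numeric_sum.size());

        // ...but for Utf8 the heap buffer is invisible to size_of_val alone.
        assert!(text.size() > std::mem::size_of_val(&text));
        println!(
            "inline: {} bytes, with heap: {} bytes",
            std::mem::size_of_val(&text),
            text.size()
        );
    }
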