Description
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Currently, the aggregate code keeps the state of each group-by value by storing it in a `Vec` together with the group-by values.
For low-cardinality aggregates this works OK, as there are only a limited number of keys and therefore of accumulators.
But for medium- and high-cardinality aggregates, the current approach is inefficient:
- Higher memory usage: each group-by value has an extra `Vec` holding a number of `AccumulatorItem`s, which adds at least 2 (`Box<dyn T>`) + 3 (empty `Vec`) + 3 (initial ) = 8 pointers (64 bytes on a 64-bit target) of overhead per item when storing one aggregate per group (e.g. one of count/avg/sum).
- Extra allocations when inserting new groups into the data structures and when returning states as `Vec<ScalarValue>`, plus some cloning.
- Less cache-efficient, because the memory is scattered around.
- More (and more expensive) code paths are needed to convert into `Vec`s and `ScalarValue`s and back into `Array`s again.
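The per-group pointer overhead can be checked with `std::mem::size_of`. The sketch below uses a hypothetical, method-less `Accumulator` trait as a stand-in for DataFusion's, since only the size of a boxed trait object matters here; it covers the `Box<dyn T>` and empty-`Vec` terms of the estimate above and compares them with the cost of one extra slot in a columnar `Vec<u64>` count state.

```rust
use std::mem::size_of;

// Hypothetical stand-in for DataFusion's `Accumulator` trait; only the size
// of a boxed trait object matters for this illustration, so it has no methods.
pub trait Accumulator {}

/// Bytes of a `Box<dyn Accumulator>`: a fat pointer (data pointer + vtable pointer).
pub fn boxed_accumulator_bytes() -> usize {
    size_of::<Box<dyn Accumulator>>()
}

/// Bytes of a `Vec` header (pointer + capacity + length), paid even when empty.
pub fn vec_header_bytes() -> usize {
    size_of::<Vec<Box<dyn Accumulator>>>()
}

/// Bytes one extra group costs in a shared columnar `Vec<u64>` count state,
/// ignoring the amortized spare capacity of that vector.
pub fn columnar_count_bytes_per_group() -> usize {
    size_of::<u64>()
}
```

On a 64-bit target the first two functions return 16 and 24 bytes, while the columnar layout needs 8 bytes per group and no per-group allocation.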
This issue covers only the `Accumulator` state, but a similar change could be made for `group_by_values`.
Describe the solution you'd like
We should define a trait that allows storing the required state in the accumulators in a contiguous/columnar manner.
The idea is that the required state can be stored in a `Vec`-like container, where the item at each index holds the current state of one group.
My proposal is to add an extra `index` parameter to the methods, so that accumulators can update the state of the group at a given `index`.
Methods could also be added to retrieve the entire state of an accumulator and/or convert the state values to array(s) in one go. If Arrow provides mutable arrays in the future, this could avoid the extra conversion step back to an `Array`.
```rust
use std::fmt::Debug;

use arrow::array::ArrayRef;

use crate::error::Result;
use crate::scalar::ScalarValue;

pub trait Accumulator: Send + Sync + Debug {
    /// Initializes the state for a new group with index `index`.
    /// Takes `&mut self` because it grows the accumulator's internal storage.
    fn init_state(&mut self, index: usize);

    /// Returns the state of the group at `index` at the end of the accumulation.
    /// In the case of an average, which tracks `sum` and `n`, this function
    /// should return a vector of two values: sum and n.
    fn state(&self, index: usize) -> Result<Vec<ScalarValue>>;

    /// Updates the state of the group at `index` from a vector of scalars.
    fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()>;

    /// Updates the state of the group at `index` from a vector of arrays.
    fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
        if values.is_empty() {
            return Ok(());
        };
        (0..values[0].len()).try_for_each(|idx| {
            let v = values
                .iter()
                .map(|array| ScalarValue::try_from_array(array, idx))
                .collect::<Result<Vec<_>>>()?;
            self.update(index, &v)
        })
    }

    /// Merges partial states, given as scalars, into the group at `index`.
    fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()>;

    /// Merges partial states, given as arrays, into the group at `index`.
    fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
        if states.is_empty() {
            return Ok(());
        };
        (0..states[0].len()).try_for_each(|idx| {
            let v = states
                .iter()
                // Read row `idx` of each state array (not the group `index`).
                .map(|array| ScalarValue::try_from_array(array, idx))
                .collect::<Result<Vec<_>>>()?;
            self.merge(index, &v)
        })
    }

    /// Returns the final value of the group at `index`.
    fn evaluate(&self, index: usize) -> Result<ScalarValue>;
}
```
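To make the "retrieve the entire state in one go" idea concrete, here is a minimal sketch under simplifying assumptions: a hypothetical `SumState` type over `i64` inputs, with `Option<i64>` standing in for `ScalarValue` so the block compiles without Arrow or DataFusion. Each group owns one slot, addressed by its index, and `take_states` (also hypothetical) hands back the whole column at once.

```rust
/// Hypothetical columnar SUM state: slot `i` holds the running sum of the
/// group with index `i`, or `None` until a non-null value has been seen.
#[derive(Debug, Default)]
pub struct SumState {
    sums: Vec<Option<i64>>,
}

impl SumState {
    /// Registers a new group; groups are numbered densely in insertion order.
    pub fn init_state(&mut self, index: usize) {
        assert_eq!(self.sums.len(), index, "groups must be added in index order");
        self.sums.push(None);
    }

    /// Adds one (possibly null) input value to the group at `index`.
    /// Overflow handling is omitted in this sketch.
    pub fn update(&mut self, index: usize, value: Option<i64>) {
        if let Some(v) = value {
            let slot = &mut self.sums[index];
            *slot = Some(slot.unwrap_or(0) + v);
        }
    }

    /// Returns the state of every group at once, already in column order,
    /// leaving this accumulator empty.
    pub fn take_states(&mut self) -> Vec<Option<i64>> {
        std::mem::take(&mut self.sums)
    }
}
```

Because the returned vector is already laid out as a column, it could be turned into an Arrow array in a single step (in arrow-rs, for example, via `Int64Array::from`), instead of building one `ScalarValue` per group.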
For `CountAccumulator`, the most notable change is storing the state as a `Vec<u64>`, with one count per group:
```diff
 #[derive(Debug)]
 struct CountAccumulator {
-    count: u64,
+    count: Vec<u64>,
 }

 impl CountAccumulator {
     /// new count accumulator
     pub fn new() -> Self {
-        Self { count: 0 }
+        Self { count: vec![] }
     }
 }

 impl Accumulator for CountAccumulator {
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+    fn init_state(&mut self, index: usize) {
+        // Group indices are assigned densely, so a new group is always appended.
+        assert_eq!(self.count.len(), index);
+        self.count.push(0);
+    }
+
+    fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
         let array = &values[0];
-        self.count += (array.len() - array.data().null_count()) as u64;
+        self.count[index] += (array.len() - array.data().null_count()) as u64;
         Ok(())
     }

-    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+    fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()> {
         let value = &values[0];
         if !value.is_null() {
-            self.count += 1;
+            self.count[index] += 1;
         }
         Ok(())
     }

-    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
+    fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()> {
         let count = &states[0];
         if let ScalarValue::UInt64(Some(delta)) = count {
-            self.count += *delta;
+            self.count[index] += *delta;
         } else {
             unreachable!()
         }
         Ok(())
     }

-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+    fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
         let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
         let delta = &compute::sum(counts);
         if let Some(d) = delta {
-            self.count += *d;
+            self.count[index] += *d;
         }
         Ok(())
     }

-    fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![ScalarValue::UInt64(Some(self.count))])
+    fn state(&self, index: usize) -> Result<Vec<ScalarValue>> {
+        Ok(vec![ScalarValue::UInt64(Some(self.count[index]))])
     }

-    fn evaluate(&self) -> Result<ScalarValue> {
-        Ok(ScalarValue::UInt64(Some(self.count)))
+    fn evaluate(&self, index: usize) -> Result<ScalarValue> {
+        Ok(ScalarValue::UInt64(Some(self.count[index])))
     }
 }
```
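An accumulator whose state has more than one value (the `sum`/`n` case mentioned in the `state` documentation above) would keep one column per state value. The following is a hedged sketch of a hypothetical `AvgState`, using plain `f64`/`u64` columns and `Option<f64>` inputs instead of `ArrayRef`/`ScalarValue` so it compiles on its own; it is not DataFusion's actual average accumulator.

```rust
/// Hypothetical columnar AVG state: two parallel columns, one entry per group.
#[derive(Debug, Default)]
pub struct AvgState {
    sums: Vec<f64>,
    counts: Vec<u64>,
}

impl AvgState {
    /// Registers a new group; groups are numbered densely in insertion order.
    pub fn init_state(&mut self, index: usize) {
        assert_eq!(self.counts.len(), index);
        self.sums.push(0.0);
        self.counts.push(0);
    }

    /// Partial aggregation: adds the non-null inputs belonging to group `index`.
    pub fn update_batch(&mut self, index: usize, values: &[Option<f64>]) {
        for v in values.iter().flatten() {
            self.sums[index] += *v;
            self.counts[index] += 1;
        }
    }

    /// Intermediate state of group `index` as (sum, n).
    pub fn state(&self, index: usize) -> (f64, u64) {
        (self.sums[index], self.counts[index])
    }

    /// Final aggregation: merges another partition's (sum, n) into group `index`.
    pub fn merge(&mut self, index: usize, (sum, n): (f64, u64)) {
        self.sums[index] += sum;
        self.counts[index] += n;
    }

    /// Average of group `index`, or `None` if it saw no non-null values.
    pub fn evaluate(&self, index: usize) -> Option<f64> {
        match self.counts[index] {
            0 => None,
            n => Some(self.sums[index] / n as f64),
        }
    }
}
```

Keeping `sums` and `counts` as separate columns means each intermediate state maps directly onto one output array per state field.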
The hash aggregate data structures would change as well, moving the accumulator state out of `GroupState` and into `Accumulators`:
```diff
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -558,9 +558,6 @@ struct GroupState {
     /// The actual group by values, one for each group column
     group_by_values: Box<[ScalarValue]>,

-    // Accumulator state, one for each aggregate
-    accumulator_set: Vec<AccumulatorItem>,
-
     /// scratch space used to collect indices for input rows in a
     /// batch that have values to aggregate. Reset on each batch
     indices: Vec<u32>,
@@ -578,6 +575,9 @@ struct Accumulators {
     /// values: (hash, index into `group_states`)
     map: RawTable<(u64, usize)>,

+    // One accumulator per aggregate, each holding the state of every group
+    accumulators: Vec<AccumulatorItem>,
+
     /// State for each group
     group_states: Vec<GroupState>,
 }
```
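A simplified, self-contained sketch of how the aggregation loop could drive index-based accumulators once the state lives outside `GroupState`. Everything here is hypothetical: `GroupAccumulator` is a cut-down version of the proposed trait, `Count` and `Sum` are toy accumulators over `Option<i64>`, a `HashMap<String, usize>` stands in for the `RawTable` of hashes, and string keys stand in for `group_by_values`. The map only yields a group index; all per-group aggregate state lives in the accumulators.

```rust
use std::collections::HashMap;

/// Hypothetical, cut-down version of the proposed trait.
pub trait GroupAccumulator {
    fn init_state(&mut self, index: usize);
    fn update(&mut self, index: usize, value: Option<i64>);
    fn evaluate(&self, index: usize) -> i64;
}

/// COUNT of non-null values, one `u64` slot per group.
#[derive(Default)]
pub struct Count(Vec<u64>);

impl GroupAccumulator for Count {
    fn init_state(&mut self, index: usize) {
        assert_eq!(self.0.len(), index);
        self.0.push(0);
    }
    fn update(&mut self, index: usize, value: Option<i64>) {
        if value.is_some() {
            self.0[index] += 1;
        }
    }
    fn evaluate(&self, index: usize) -> i64 {
        self.0[index] as i64
    }
}

/// SUM of non-null values; simplified to return 0 (not NULL) for all-null groups.
#[derive(Default)]
pub struct Sum(Vec<i64>);

impl GroupAccumulator for Sum {
    fn init_state(&mut self, index: usize) {
        assert_eq!(self.0.len(), index);
        self.0.push(0);
    }
    fn update(&mut self, index: usize, value: Option<i64>) {
        if let Some(v) = value {
            self.0[index] += v;
        }
    }
    fn evaluate(&self, index: usize) -> i64 {
        self.0[index]
    }
}

/// Groups `(key, value)` rows and returns each key with one result per accumulator,
/// in the order groups were first seen.
pub fn aggregate(
    rows: &[(&str, Option<i64>)],
    accumulators: &mut [Box<dyn GroupAccumulator>],
) -> Vec<(String, Vec<i64>)> {
    let mut group_index: HashMap<String, usize> = HashMap::new();
    let mut keys: Vec<String> = Vec::new(); // group keys, stored in index order
    for &(key, value) in rows {
        let index = match group_index.get(key).copied() {
            Some(i) => i,
            None => {
                // New group: assign the next index and grow every accumulator.
                let i = keys.len();
                group_index.insert(key.to_string(), i);
                keys.push(key.to_string());
                for acc in accumulators.iter_mut() {
                    acc.init_state(i);
                }
                i
            }
        };
        for acc in accumulators.iter_mut() {
            acc.update(index, value);
        }
    }
    keys.into_iter()
        .enumerate()
        .map(|(i, k)| (k, accumulators.iter().map(|a| a.evaluate(i)).collect()))
        .collect()
}
```

With this layout, adding a group costs one hash-table entry plus one slot per accumulator column, rather than a fresh `Vec` of boxed accumulators.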
Describe alternatives you've considered
Additional context