Skip to content

Commit

Permalink
fix clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
yjshen committed Apr 11, 2023
1 parent 679a0d0 commit b78e275
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,24 +335,24 @@ impl GroupedHashAggregateStream {
create_hashes(group_values, &self.random_state, &mut batch_hashes)?;

let AggregationState {
map: row_map,
group_states: row_group_states,
map,
group_states,
..
} = &mut self.aggr_state;

for (row, hash) in batch_hashes.into_iter().enumerate() {
let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
let entry = map.get_mut(hash, |(_hash, group_idx)| {
// verify that a group that we are inserting with hash is
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
let group_state = &row_group_states[*group_idx];
let group_state = &group_states[*group_idx];
group_rows.row(row) == group_state.group_by_values.row()
});

match entry {
// Existing entry for this group value
Some((_hash, group_idx)) => {
let group_state = &mut row_group_states[*group_idx];
let group_state = &mut group_states[*group_idx];

// 1.3
if group_state.indices.is_empty() {
Expand All @@ -375,7 +375,7 @@ impl GroupedHashAggregateStream {
accumulator_set,
indices: vec![row as u32], // 1.3
};
let group_idx = row_group_states.len();
let group_idx = group_states.len();

// NOTE: do NOT include the `GroupState` struct size in here because this is captured by
// `group_states` (see allocation down below)
Expand All @@ -395,13 +395,13 @@ impl GroupedHashAggregateStream {
.sum::<usize>();

// for hasher function, use precomputed hash value
row_map.insert_accounted(
map.insert_accounted(
(hash, group_idx),
|(hash, _group_index)| *hash,
allocated,
);

row_group_states.push_accounted(group_state, allocated);
group_states.push_accounted(group_state, allocated);

groups_with_rows.push(group_idx);
}
Expand All @@ -411,6 +411,7 @@ impl GroupedHashAggregateStream {
}

// Update the accumulator results, according to row_aggr_state.
#[allow(clippy::too_many_arguments)]
fn update_accumulators(
&mut self,
groups_with_rows: &[usize],
Expand Down Expand Up @@ -668,17 +669,17 @@ impl GroupedHashAggregateStream {
for (field_idx, field) in output_fields[start..end].iter().enumerate() {
let current = match self.mode {
AggregateMode::Partial => ScalarValue::iter_to_array(
group_state_chunk.iter().map(|row_group_state| {
row_group_state.accumulator_set[idx]
group_state_chunk.iter().map(|group_state| {
group_state.accumulator_set[idx]
.state()
.map(|v| v[field_idx].clone())
.expect("Unexpected accumulator state in hash aggregate")
}),
),
AggregateMode::Final | AggregateMode::FinalPartitioned => {
ScalarValue::iter_to_array(group_state_chunk.iter().map(
|row_group_state| {
row_group_state.accumulator_set[idx].evaluate().expect(
|group_state| {
group_state.accumulator_set[idx].evaluate().expect(
"Unexpected accumulator state in hash aggregate",
)
},
Expand Down

0 comments on commit b78e275

Please sign in to comment.