Skip to content

Commit af3f62b

Browse files
committed
Allow better vectorization in accumulate functions
1 parent e0cc8c8 commit af3f62b

File tree

1 file changed

+38
-5
lines changed
  • datafusion/physical-expr/src/aggregate/groups_accumulator

1 file changed

+38
-5
lines changed

datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,14 @@ impl NullState {
139139
// no nulls, no filter,
140140
(false, None) => {
141141
let iter = group_indices.iter().zip(data.iter());
142+
142143
for (&group_index, &new_value) in iter {
143-
seen_values.set_bit(group_index, true);
144144
value_fn(group_index, new_value);
145145
}
146+
// update seen values in a separate loop (keeps the value loop above simple enough to vectorize)
147+
for &group_index in group_indices.iter() {
148+
seen_values.set_bit(group_index, true);
149+
}
146150
}
147151
// nulls, no filter
148152
(true, None) => {
@@ -157,6 +161,7 @@ impl NullState {
157161
let data_remainder = data_chunks.remainder();
158162

159163
group_indices_chunks
164+
.clone()
160165
.zip(data_chunks)
161166
.zip(bit_chunks.iter())
162167
.for_each(|((group_index_chunk, data_chunk), mask)| {
@@ -167,14 +172,28 @@ impl NullState {
167172
// valid bit was set, real value
168173
let is_valid = (mask & index_mask) != 0;
169174
if is_valid {
170-
seen_values.set_bit(group_index, true);
171175
value_fn(group_index, new_value);
172176
}
173177
index_mask <<= 1;
174178
},
175179
)
176180
});
177181

182+
group_indices_chunks.zip(bit_chunks.iter()).for_each(
183+
|(group_index_chunk, mask)| {
184+
// index_mask has value 1 << i in the loop
185+
let mut index_mask = 1;
186+
group_index_chunk.iter().for_each(|&group_index| {
187+
// valid bit was set, real value
188+
let is_valid = (mask & index_mask) != 0;
189+
if is_valid {
190+
seen_values.set_bit(group_index, true);
191+
}
192+
index_mask <<= 1;
193+
})
194+
},
195+
);
196+
178197
// handle any remaining bits (those left over after the last complete 64-bit chunk)
179198
let remainder_bits = bit_chunks.remainder_bits();
180199
group_indices_remainder
@@ -184,10 +203,17 @@ impl NullState {
184203
.for_each(|(i, (&group_index, &new_value))| {
185204
let is_valid = remainder_bits & (1 << i) != 0;
186205
if is_valid {
187-
seen_values.set_bit(group_index, true);
188206
value_fn(group_index, new_value);
189207
}
190208
});
209+
group_indices_remainder.iter().enumerate().for_each(
210+
|(i, &group_index)| {
211+
let is_valid = remainder_bits & (1 << i) != 0;
212+
if is_valid {
213+
seen_values.set_bit(group_index, true);
214+
}
215+
},
216+
);
191217
}
192218
// no nulls, but a filter
193219
(false, Some(filter)) => {
@@ -201,10 +227,17 @@ impl NullState {
201227
.zip(filter.iter())
202228
.for_each(|((&group_index, &new_value), filter_value)| {
203229
if let Some(true) = filter_value {
204-
seen_values.set_bit(group_index, true);
205230
value_fn(group_index, new_value);
206231
}
207-
})
232+
});
233+
234+
group_indices.iter().zip(filter.iter()).for_each(
235+
|(&group_index, filter_value)| {
236+
if let Some(true) = filter_value {
237+
seen_values.set_bit(group_index, true);
238+
}
239+
},
240+
)
208241
}
209242
// both null values and filters
210243
(true, Some(filter)) => {

0 commit comments

Comments
 (0)