Skip to content

Commit faa8094

Browse files
committed
refactor: rebase onto main
1 parent e8f8e3f commit faa8094

File tree

1 file changed

+20
-29
lines changed

1 file changed

+20
-29
lines changed

datafusion/src/physical_plan/expressions/approx_quantile.rs

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{any::Any, sync::Arc};
18+
use std::{any::Any, iter, sync::Arc};
1919

2020
use arrow::{
2121
array::{
@@ -151,9 +151,7 @@ impl AggregateExpr for ApproxQuantile {
151151

152152
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
153153
let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
154-
t
155-
@
156-
(DataType::UInt8
154+
t @ (DataType::UInt8
157155
| DataType::UInt16
158156
| DataType::UInt32
159157
| DataType::UInt64
@@ -202,17 +200,6 @@ impl Accumulator for ApproxQuantileAccumulator {
202200
Ok(self.digest.to_scalar_state())
203201
}
204202

205-
fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
206-
debug_assert_eq!(
207-
values.len(),
208-
1,
209-
"invalid number of values in quantile update"
210-
);
211-
212-
self.digest = self.digest.merge_unsorted([values[0].clone()])?;
213-
Ok(())
214-
}
215-
216203
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
217204
debug_assert_eq!(
218205
values.len(),
@@ -273,19 +260,6 @@ impl Accumulator for ApproxQuantileAccumulator {
273260
Ok(())
274261
}
275262

276-
fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
277-
debug_assert_eq!(
278-
states.len(),
279-
6,
280-
"invalid number of state fields for quantile accumulator"
281-
);
282-
283-
let other = TDigest::from_scalar_state(states);
284-
self.digest = TDigest::merge_digests(&[self.digest.clone(), other]);
285-
286-
Ok(())
287-
}
288-
289263
fn evaluate(&self) -> Result<ScalarValue> {
290264
let q = self.digest.estimate_quantile(self.quantile);
291265

@@ -307,6 +281,23 @@ impl Accumulator for ApproxQuantileAccumulator {
307281
}
308282

309283
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
310-
todo!()
284+
if states.is_empty() {
285+
return Ok(());
286+
};
287+
288+
let states = (0..states[0].len())
289+
.map(|index| {
290+
states
291+
.iter()
292+
.map(|array| ScalarValue::try_from_array(array, index))
293+
.collect::<Result<Vec<_>>>()
294+
.map(|state| TDigest::from_scalar_state(&state))
295+
})
296+
.chain(iter::once(Ok(self.digest.clone())))
297+
.collect::<Result<Vec<_>>>()?;
298+
299+
self.digest = TDigest::merge_digests(&states);
300+
301+
Ok(())
311302
}
312303
}

0 commit comments

Comments
 (0)