Skip to content

Commit

Permalink
Always pass DataType to PrimitiveDistinctCountAccumulator (apache#10047)
Browse files: browse the repository at this point in the history
  • Loading branch information
joroKr21 authored Apr 11, 2024
1 parent feb9100 commit 118eecd
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 51 deletions.
103 changes: 59 additions & 44 deletions datafusion/physical-expr/src/aggregate/count_distinct/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,55 +99,70 @@ impl AggregateExpr for DistinctCount {
use DataType::*;
use TimeUnit::*;

Ok(match &self.state_data_type {
let data_type = &self.state_data_type;
Ok(match data_type {
// try and use a specialized accumulator if possible, otherwise fall back to generic accumulator
Int8 => Box::new(PrimitiveDistinctCountAccumulator::<Int8Type>::new()),
Int16 => Box::new(PrimitiveDistinctCountAccumulator::<Int16Type>::new()),
Int32 => Box::new(PrimitiveDistinctCountAccumulator::<Int32Type>::new()),
Int64 => Box::new(PrimitiveDistinctCountAccumulator::<Int64Type>::new()),
UInt8 => Box::new(PrimitiveDistinctCountAccumulator::<UInt8Type>::new()),
UInt16 => Box::new(PrimitiveDistinctCountAccumulator::<UInt16Type>::new()),
UInt32 => Box::new(PrimitiveDistinctCountAccumulator::<UInt32Type>::new()),
UInt64 => Box::new(PrimitiveDistinctCountAccumulator::<UInt64Type>::new()),
dt @ Decimal128(_, _) => Box::new(
PrimitiveDistinctCountAccumulator::<Decimal128Type>::new()
.with_data_type(dt.clone()),
),
dt @ Decimal256(_, _) => Box::new(
PrimitiveDistinctCountAccumulator::<Decimal256Type>::new()
.with_data_type(dt.clone()),
),

Date32 => Box::new(PrimitiveDistinctCountAccumulator::<Date32Type>::new()),
Date64 => Box::new(PrimitiveDistinctCountAccumulator::<Date64Type>::new()),
Int8 => Box::new(PrimitiveDistinctCountAccumulator::<Int8Type>::new(
data_type,
)),
Int16 => Box::new(PrimitiveDistinctCountAccumulator::<Int16Type>::new(
data_type,
)),
Int32 => Box::new(PrimitiveDistinctCountAccumulator::<Int32Type>::new(
data_type,
)),
Int64 => Box::new(PrimitiveDistinctCountAccumulator::<Int64Type>::new(
data_type,
)),
UInt8 => Box::new(PrimitiveDistinctCountAccumulator::<UInt8Type>::new(
data_type,
)),
UInt16 => Box::new(PrimitiveDistinctCountAccumulator::<UInt16Type>::new(
data_type,
)),
UInt32 => Box::new(PrimitiveDistinctCountAccumulator::<UInt32Type>::new(
data_type,
)),
UInt64 => Box::new(PrimitiveDistinctCountAccumulator::<UInt64Type>::new(
data_type,
)),
Decimal128(_, _) => Box::new(PrimitiveDistinctCountAccumulator::<
Decimal128Type,
>::new(data_type)),
Decimal256(_, _) => Box::new(PrimitiveDistinctCountAccumulator::<
Decimal256Type,
>::new(data_type)),

Date32 => Box::new(PrimitiveDistinctCountAccumulator::<Date32Type>::new(
data_type,
)),
Date64 => Box::new(PrimitiveDistinctCountAccumulator::<Date64Type>::new(
data_type,
)),
Time32(Millisecond) => Box::new(PrimitiveDistinctCountAccumulator::<
Time32MillisecondType,
>::new()),
Time32(Second) => {
Box::new(PrimitiveDistinctCountAccumulator::<Time32SecondType>::new())
}
>::new(data_type)),
Time32(Second) => Box::new(PrimitiveDistinctCountAccumulator::<
Time32SecondType,
>::new(data_type)),
Time64(Microsecond) => Box::new(PrimitiveDistinctCountAccumulator::<
Time64MicrosecondType,
>::new()),
Time64(Nanosecond) => {
Box::new(PrimitiveDistinctCountAccumulator::<Time64NanosecondType>::new())
}
dt @ Timestamp(Microsecond, _) => Box::new(
PrimitiveDistinctCountAccumulator::<TimestampMicrosecondType>::new()
.with_data_type(dt.clone()),
),
dt @ Timestamp(Millisecond, _) => Box::new(
PrimitiveDistinctCountAccumulator::<TimestampMillisecondType>::new()
.with_data_type(dt.clone()),
),
dt @ Timestamp(Nanosecond, _) => Box::new(
PrimitiveDistinctCountAccumulator::<TimestampNanosecondType>::new()
.with_data_type(dt.clone()),
),
dt @ Timestamp(Second, _) => Box::new(
PrimitiveDistinctCountAccumulator::<TimestampSecondType>::new()
.with_data_type(dt.clone()),
),
>::new(data_type)),
Time64(Nanosecond) => Box::new(PrimitiveDistinctCountAccumulator::<
Time64NanosecondType,
>::new(data_type)),
Timestamp(Microsecond, _) => Box::new(PrimitiveDistinctCountAccumulator::<
TimestampMicrosecondType,
>::new(data_type)),
Timestamp(Millisecond, _) => Box::new(PrimitiveDistinctCountAccumulator::<
TimestampMillisecondType,
>::new(data_type)),
Timestamp(Nanosecond, _) => Box::new(PrimitiveDistinctCountAccumulator::<
TimestampNanosecondType,
>::new(data_type)),
Timestamp(Second, _) => Box::new(PrimitiveDistinctCountAccumulator::<
TimestampSecondType,
>::new(data_type)),

Float16 => Box::new(FloatDistinctCountAccumulator::<Float16Type>::new()),
Float32 => Box::new(FloatDistinctCountAccumulator::<Float32Type>::new()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,12 @@ where
T: ArrowPrimitiveType + Send,
T::Native: Eq + Hash,
{
pub(super) fn new() -> Self {
pub(super) fn new(data_type: &DataType) -> Self {
Self {
values: HashSet::default(),
data_type: T::DATA_TYPE,
data_type: data_type.clone(),
}
}

pub(super) fn with_data_type(mut self, data_type: DataType) -> Self {
self.data_type = data_type;
self
}
}

impl<T> Accumulator for PrimitiveDistinctCountAccumulator<T>
Expand Down

0 comments on commit 118eecd

Please sign in to comment.