Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: native types in DistinctCountAccumulator for primitive types #8721

Merged
merged 10 commits into from
Jan 5, 2024

Conversation

korowa
Copy link
Contributor

@korowa korowa commented Jan 2, 2024

Which issue does this PR close?

Part of #5472.

Rationale for this change

The main idea is to avoid ScalarValue wrappers for all update/merge operations to improve performance of DistinctCountAccumulator

What changes are included in this PR?

  • Hashable newtype for arrow primitives moved to utils module -- required to use float types as hashmap/hashset keys
  • NativeDistinctCountAccumulator -- implementation of accumulator for primitive types with implemented Eq + Hash traints (integer-based types)
  • FloatDistinctCountAccumulator -- implementation of accumulator for float-based types through Hashable wrapper

Are these changes tested?

Existing test coverage + additional test for bigint (i256 -- base for Decimal256Type).

Are there any user-facing changes?

No

@github-actions github-actions bot added the physical-expr Physical Expressions label Jan 2, 2024
@korowa
Copy link
Contributor Author

korowa commented Jan 2, 2024

Benchmark results are

Comparing master and count_distinct_groups_accumulator
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     master ┃ count_distinct_groups_accumulator ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     1.96ms │                            2.20ms │  1.12x slower │
│ QQuery 1     │    92.00ms │                           81.50ms │ +1.13x faster │
│ QQuery 2     │   219.23ms │                          223.98ms │     no change │
│ QQuery 3     │   231.46ms │                          231.61ms │     no change │
│ QQuery 4     │  2086.56ms │                         2080.57ms │     no change │
│ QQuery 5     │  3009.78ms │                         2994.23ms │     no change │
│ QQuery 6     │    82.67ms │                           77.54ms │ +1.07x faster │
│ QQuery 7     │    90.44ms │                           89.39ms │     no change │
│ QQuery 8     │  3426.67ms │                         3469.59ms │     no change │
│ QQuery 9     │  4986.15ms │                         1895.55ms │ +2.63x faster │
│ QQuery 10    │   721.45ms │                          718.35ms │     no change │
│ QQuery 11    │   814.72ms │                          797.00ms │     no change │
│ QQuery 12    │  2756.22ms │                         2788.63ms │     no change │
│ QQuery 13    │  5374.68ms │                         5300.41ms │     no change │
│ QQuery 14    │  2990.96ms │                         3018.51ms │     no change │
│ QQuery 15    │  2361.06ms │                         2360.47ms │     no change │
│ QQuery 16    │  6424.18ms │                         6294.30ms │     no change │
│ QQuery 17    │  6082.96ms │                         5977.81ms │     no change │
│ QQuery 18(x) │     0.16ms │                            0.18ms │  1.10x slower │
│ QQuery 19    │   193.16ms │                          185.69ms │     no change │
│ QQuery 20    │  3371.44ms │                         3343.49ms │     no change │
│ QQuery 21    │  4264.87ms │                         4326.25ms │     no change │
│ QQuery 22    │ 11111.22ms │                        10905.60ms │     no change │
│ QQuery 23    │ 27881.39ms │                        27258.91ms │     no change │
│ QQuery 24    │  1444.51ms │                         1434.72ms │     no change │
│ QQuery 25    │  1200.78ms │                         1174.84ms │     no change │
│ QQuery 26    │  1576.50ms │                         1555.85ms │     no change │
│ QQuery 27    │  4708.36ms │                         4633.59ms │     no change │
│ QQuery 28    │ 30362.04ms │                        31230.25ms │     no change │
│ QQuery 29    │  1224.66ms │                         1202.81ms │     no change │
│ QQuery 30    │  2692.08ms │                         2668.50ms │     no change │
│ QQuery 31    │  3509.70ms │                         3498.93ms │     no change │
│ QQuery 32(x) │     0.18ms │                            0.24ms │  1.32x slower │
│ QQuery 33(x) │     0.17ms │                            0.24ms │  1.37x slower │
│ QQuery 34(x) │     0.16ms │                            0.14ms │ +1.12x faster │
│ QQuery 35    │  3781.02ms │                         3800.45ms │     no change │
│ QQuery 36    │   322.11ms │                          332.11ms │     no change │
│ QQuery 37    │   183.90ms │                          171.58ms │ +1.07x faster │
│ QQuery 38    │   162.01ms │                          161.94ms │     no change │
│ QQuery 39    │   771.03ms │                          760.86ms │     no change │
│ QQuery 40    │    81.05ms │                           77.93ms │     no change │
│ QQuery 41    │    74.25ms │                           62.69ms │ +1.18x faster │
│ QQuery 42    │    75.99ms │                           70.00ms │ +1.09x faster │
└──────────────┴────────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (master)                              │ 140745.91ms │
│ Total Time (count_distinct_groups_accumulator)   │ 137259.41ms │
│ Average Time (master)                            │   3273.16ms │
│ Average Time (count_distinct_groups_accumulator) │   3192.08ms │
│ Queries Faster                                   │           7 │
│ Queries Slower                                   │           4 │
│ Queries with No Change                           │          32 │
└──────────────────────────────────────────────────┴─────────────┘
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃   master ┃ count_distinct_groups_accumulator ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 299.27ms │                          294.08ms │     no change │
│ QQuery 2     │  71.33ms │                           71.51ms │     no change │
│ QQuery 3     │ 146.08ms │                          153.59ms │  1.05x slower │
│ QQuery 4     │  95.64ms │                           99.49ms │     no change │
│ QQuery 5     │ 190.03ms │                          177.54ms │ +1.07x faster │
│ QQuery 6     │  90.02ms │                           79.86ms │ +1.13x faster │
│ QQuery 7     │ 309.50ms │                          310.71ms │     no change │
│ QQuery 8     │ 222.93ms │                          219.75ms │     no change │
│ QQuery 9     │ 288.22ms │                          307.12ms │  1.07x slower │
│ QQuery 10    │ 254.91ms │                          257.15ms │     no change │
│ QQuery 11    │  72.89ms │                           69.68ms │     no change │
│ QQuery 12    │ 153.95ms │                          148.15ms │     no change │
│ QQuery 13    │ 307.94ms │                          284.76ms │ +1.08x faster │
│ QQuery 14    │ 119.62ms │                          132.48ms │  1.11x slower │
│ QQuery 15    │ 145.98ms │                          131.71ms │ +1.11x faster │
│ QQuery 16    │  80.01ms │                           84.45ms │  1.06x slower │
│ QQuery 17    │ 295.38ms │                          254.44ms │ +1.16x faster │
│ QQuery 18    │ 551.83ms │                          535.77ms │     no change │
│ QQuery 19    │ 206.88ms │                          223.29ms │  1.08x slower │
│ QQuery 20    │ 188.34ms │                          192.10ms │     no change │
│ QQuery 21    │ 391.18ms │                          396.50ms │     no change │
│ QQuery 22    │  85.64ms │                           83.84ms │     no change │
└──────────────┴──────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (master)                              │ 4567.55ms │
│ Total Time (count_distinct_groups_accumulator)   │ 4507.98ms │
│ Average Time (master)                            │  207.62ms │
│ Average Time (count_distinct_groups_accumulator) │  204.91ms │
│ Queries Faster                                   │         5 │
│ Queries Slower                                   │         5 │
│ Queries with No Change                           │        12 │
└──────────────────────────────────────────────────┴───────────┘

The target query was clickbench q9 -- there are not so many queries where DF plans groups accumulator for count distinct.

(x) marked queries were constantly killed by OOM on my laptop, so they were replaced with dummy select 1; (there are no count distinct queries among them).

@korowa
Copy link
Contributor Author

korowa commented Jan 2, 2024

CC @alamb @Dandandan

It would be great, to get review / opinion on this PR from you, as from original issue participants, when (if) you have some time.

@alamb
Copy link
Contributor

alamb commented Jan 2, 2024

Thank you @korowa -- I will review this PR tomorrow

T: ArrowPrimitiveType + Send,
{
/// Vector for storing unique values sets for each group index
unique_values: Vec<HashSet<Hashable<T::Native>>>,
Copy link
Contributor

@Dandandan Dandandan Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hoped we could store this in something like HashSet<(usize, T::Native)>, i.e. we could ammortize the allocations for all the groups instead of creating a HashSet for each group.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my initial intention -- but collecting intermediate states is quite painful when using pure HashSet<(group, value)>, and using additional structures for storing chains of values for each group seemed like a memory overhead in this case.

However, I agree that managing single hash table per accumulator makes sense -- maybe the proper way of speeding up accumulator will be switching to native types in regular (current) accumulator first, and then start digging into proper groups accumulator implementation which will provide noticeable boost 🤔.

Copy link
Contributor Author

@korowa korowa Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe the proper way of speeding up accumulator will be switching to native types in regular (current) accumulator first

I'll try this option then, I guess, at first -- anyway it was going to be follow up for current version.


impl<T: ToByteSlice> std::hash::Hash for Hashable<T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.to_byte_slice().hash(state)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is only needed for float types? It would probably be faster to use the native hash implementation for other (integer) types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, it only required for floats (and it only makes sense to use it for floats, cause default implementations of these traits for integers outperforms this one).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to have an implementation for primitive and float types instead one using ToByteSlice then?

@alamb
Copy link
Contributor

alamb commented Jan 3, 2024

I am sorry -- I ran out of time today to give this a thorough review but I plan to do it first thing tomorrow

@korowa korowa changed the title feat: naive DistinctCountGroupsAccumulator for primitive types feat: native types in DistinctCountAccumulator for primitive types Jan 4, 2024
@korowa
Copy link
Contributor Author

korowa commented Jan 4, 2024

UPD: after the comment by @Dandandan, regarding implementation of groups accumulator, I've realized that it's unclear (as for me) now how to implement it decently, and in the same time naive implementation will still require non-zero efforts to support it.

In the same time, performance improvement mostly comes from replacement of ScalarValue with native types, so I've reworked regular accumulators in this PR -- this way it should be more meaningful and less stressful change.

The results for clickbench for updated branch:


--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     master ┃ count_distinct_groups_accumulator ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     1.99ms │                            2.32ms │  1.16x slower │
│ QQuery 1     │    85.49ms │                           89.27ms │     no change │
│ QQuery 2     │   212.14ms │                          209.42ms │     no change │
│ QQuery 3     │   238.46ms │                          236.34ms │     no change │
│ QQuery 4     │  2073.40ms │                         2073.13ms │     no change │
│ QQuery 5     │  3046.88ms │                         3027.83ms │     no change │
│ QQuery 6     │    81.46ms │                           83.87ms │     no change │
│ QQuery 7     │    93.53ms │                           88.94ms │     no change │
│ QQuery 8     │  3325.04ms │                         3377.85ms │     no change │
│ QQuery 9     │  5072.76ms │                         2203.71ms │ +2.30x faster │
│ QQuery 10    │   755.97ms │                          722.47ms │     no change │
│ QQuery 11    │   833.25ms │                          792.76ms │     no change │
│ QQuery 12    │  2778.89ms │                         2750.02ms │     no change │
│ QQuery 13    │  5375.29ms │                         5333.02ms │     no change │
│ QQuery 14    │  3042.31ms │                         3083.98ms │     no change │
│ QQuery 15    │  2317.14ms │                         2419.19ms │     no change │
│ QQuery 16    │  6438.85ms │                         6467.34ms │     no change │
│ QQuery 17    │  5956.24ms │                         6179.98ms │     no change │
│ QQuery 18(x) │     0.23ms │                            0.18ms │ +1.30x faster │
│ QQuery 19    │   201.02ms │                          197.43ms │     no change │
│ QQuery 20    │  3383.48ms │                         3499.00ms │     no change │
│ QQuery 21    │  4334.13ms │                         4305.25ms │     no change │
│ QQuery 22    │ 10960.44ms │                        11275.28ms │     no change │
│ QQuery 23    │ 27659.63ms │                        28158.01ms │     no change │
│ QQuery 24    │  1445.17ms │                         1470.58ms │     no change │
│ QQuery 25    │  1218.96ms │                         1234.64ms │     no change │
│ QQuery 26    │  1586.36ms │                         1584.74ms │     no change │
│ QQuery 27    │  4713.18ms │                         4795.20ms │     no change │
│ QQuery 28    │ 31076.60ms │                        30640.46ms │     no change │
│ QQuery 29    │  1253.69ms │                         1190.34ms │ +1.05x faster │
│ QQuery 30    │  2687.34ms │                         2667.62ms │     no change │
│ QQuery 31    │  3511.00ms │                         3653.13ms │     no change │
│ QQuery 32(x) │     0.19ms │                            0.18ms │     no change │
│ QQuery 33(x) │     0.17ms │                            0.16ms │     no change │
│ QQuery 34(x) │     0.16ms │                            0.15ms │ +1.09x faster │
│ QQuery 35    │  3828.87ms │                         3870.08ms │     no change │
│ QQuery 36    │   332.74ms │                          332.81ms │     no change │
│ QQuery 37    │   191.57ms │                          175.67ms │ +1.09x faster │
│ QQuery 38    │   167.19ms │                          159.98ms │     no change │
│ QQuery 39    │   778.11ms │                          781.46ms │     no change │
│ QQuery 40    │    81.00ms │                           81.00ms │     no change │
│ QQuery 41    │    71.14ms │                           69.90ms │     no change │
│ QQuery 42    │    78.41ms │                           70.35ms │ +1.11x faster │
└──────────────┴────────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (master)                              │ 141289.87ms │
│ Total Time (count_distinct_groups_accumulator)   │ 139355.04ms │
│ Average Time (master)                            │   3285.81ms │
│ Average Time (count_distinct_groups_accumulator) │   3240.81ms │
│ Queries Faster                                   │           6 │
│ Queries Slower                                   │           1 │
│ Queries with No Change                           │          36 │
└──────────────────────────────────────────────────┴─────────────┘

The target query was clickbench q9 -- there are not so many queries where DF plans groups accumulator for count distinct.

(x) marked queries were constantly killed by OOM on my laptop, so they were replaced with dummy select 1; (there are no count distinct queries among them).

Also works here


Float16 => float_distinct_count_accumulator!(Float16Type),
Float32 => float_distinct_count_accumulator!(Float32Type),
Float64 => float_distinct_count_accumulator!(Float64Type),
Copy link
Contributor

@Dandandan Dandandan Jan 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for now, but if we would like to do it for strings / bytes, we could do use a datastructure like this to get maximal performance:

/// Contains hashes and offsets for given hash (+ potential collisions), use `RawTable` for extra speed
uniques: HashMap<u64, SmallVec<u64; 1>>,
/// actual string/byte data, can be emitted cheaply / free
values: BufferBuilder<u8>,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is similar to the idea in #7064

Maybe we can eventually use the same data structure (specialized for storing string values not using a String)

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @korowa -- I think this PR is a significant improvement over the initial implementation.

I think the only thing that is critical to fix prior to merging is the size calculation for the HashSet

As @Dandandan points out, there are clearly some additional improvements that could be added that we should consider before merging but I also don't think they are critical.

Thanks again -- this is a really great step forward


Float16 => float_distinct_count_accumulator!(Float16Type),
Float32 => float_distinct_count_accumulator!(Float32Type),
Float64 => float_distinct_count_accumulator!(Float64Type),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is similar to the idea in #7064

Maybe we can eventually use the same data structure (specialized for storing string values not using a String)

let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
self.values.iter().cloned(),
)) as ArrayRef;
let list = Arc::new(array_into_list_array(arr)) as ArrayRef;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏 for @jayzhan211 for switching the native implementation of ScalarValue::List to use an ArrayRef

datafusion/physical-expr/src/aggregate/count_distinct.rs Outdated Show resolved Hide resolved
@@ -336,9 +336,18 @@ where
}

fn size(&self) -> usize {
let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can (as a follow on PR) put this logic into its own function (with comments) as estimating the size of hashbrown hashtables is likely to come up again

@Dandandan Dandandan merged commit 561d941 into apache:main Jan 5, 2024
22 checks passed
@Dandandan
Copy link
Contributor

Thank you @korowa this is awesome 🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants