Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for conflicting order sensitive aggregates in ARRAY_AGG aggregate function #8558

Closed
wants to merge 56 commits into from
Closed

Add support for conflicting order sensitive aggregates in ARRAY_AGG aggregate function #8558

wants to merge 56 commits into from

Conversation

mustafasrepo
Copy link
Contributor

@mustafasrepo mustafasrepo commented Dec 15, 2023

Which issue does this PR close?

Closes #.
Related to #8582

Rationale for this change

Currently, order sensitive aggregation cannot work when there is incompatible ordering requirements such as following

SELECT ARRAY_AGG(a, ORDER BY  b DESC), ARRAY_AGG(a, ORDER BY c DESC)
FROM table1

This PR adds this functionality.

What changes are included in this PR?

With this PR we can support conflicting (e.g incompatible) ordering requirements among aggregate expressions. To support this functionality existing design is changed a bit.

With this design it is guaranteed that each order sensitive aggregator will receive ordered batches (according to its requirement) at its update_batch method.
However, it is not guaranteed that batches are ordered globally.
As an example an order sensitive aggregator with requirement a DESC may receive the following batch

a
4
3
2
1

in update_batch argument. Then may receive the batch

a
5
3
2
0

in the next run. Hence we update state at each update_batch call with new values without any assumption of global ordering (This is the difference between previous design). At the previous design it was guaranteed that
received batches are ordered across different update_batch runs.
For instance following batches received at the update_batch

a
5
4
3
3
a
2
2
1
1

would be valid for the previous design. However, following batches wouldn't be

a
5
4
3
1
a
3
2
1
1

With new design both of these batches are valid. This relaxation enables us to support incompatible orderings. By locally sorting RecordBatches at the input of the aggregation for each distinct aggregate group that requires different ordering.

Are these changes tested?

Yes new tests are added.

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) labels Dec 15, 2023
…ggregates

# Conflicts:
#	datafusion/sqllogictest/test_files/groupby.slt
@alamb
Copy link
Contributor

alamb commented Dec 17, 2023

Thank you for this PR @mustafasrepo -- I have this one on my list to look at carefully over the next day or two

@ozankabak
Copy link
Contributor

ozankabak commented Dec 18, 2023

I am changing this to draft to avoid any misunderstanding that this ready for merge. The code works, but we want to iterate on the design with the community to (1) avoid any performance regressions on ordinary single-aggregation cases, and (2) converge to a general/extensible design as early as possible.

@alamb alamb changed the title Add support for conflicting order sensitive aggregates Add support for conflicting order sensitive aggregates in ARRAY_AGG aggregate function Dec 18, 2023
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started looking at this PR but I haven't gone through the entire code in detail

I am definitely potentially worried about the size of the change to the hash aggregate stream, given how performance sensitive that code is. At the very least we should benchmark the code and make sure this PR doesn't slow it down

Goals

It would help to take a step back and review what we are trying to accomplish in the context of what is currently implemented. Here is my understanding (please correct me if I got it wrong)

As of today, on main

  1. create_aggregate_expr supports exactly three aggregate functions that are order sensitive: ARRAY_AGG, FIRST_VALUE and LAST_VALUE. Order sensitive means they can have an ORDER BY specified in their arguments

  2. order sensitive aggregates work today is to ensure the input to the group by is sorted by group and then by aggregate ORDER BY argument. So ARRAY_AGG(x ORDER BY x) GROUP BY y) would result in sorting the input to the group by to be x, y)

  3. The goal is to allow multiple instances of these functions to appear in a single query, which means the single resort doesn't work if there are conflicting requirements.

Potential solutions

  1. Teach the HashAggregate stream about the order required for the expressions and then feed data in that order to the aggregates.

  2. Update the aggregates themselves to handle the knowledge of ordeirng internally

In this case, rather than resorting the input to the hash aggregate, we could update the aggregator implementations to consider the input sort order directly, rather than relying on another operator to re-sort the data. We could use pre-sorted data as an optimization, but fall back to tracking the input internally.

So for a query like

SELECT FIRST_VALUE(a ORDER BY z) FROM t GROUP BY y 

The FirstValueAccumulator would internally could know to find the last value of z and then get the corresponding value of a. Note this doesn't actually require a sort, it just requires finding the max.

This PR

This PR seems to take a hybrid approach where the hash aggregate sorts each input batch prior to passing to each order preserving accumulator, and each accumulator must then handle multiple batches that are sorted.

I think the benefit of this design is that if there are many aggregates in a query with the same sort order

FIRST_VALUE(a, ORDER BY z), 
FIRST_VALUE(b, ORDER BY z), 
FIRST_VALUE(c, ORDER BY z), 
...

The data only needs to be sorted by z once (per batch)

However, one potential downside is that the implementation is complicated and another downside is that sorting is not necessary for FIRST_VALUE and LAST_VALUE -- only the position of the max value of z is really needed

@@ -2297,8 +2296,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
----MemoryExec: partitions=1, partition_sizes=[1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand this plan change. Now there is no sort but the comments above say there should be

# test_ordering_sensitive_aggregation
# ordering sensitive requirement should add a SortExec in the final plan. To satisfy amount ASC
# in the aggregation

@@ -994,6 +831,132 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
Arc::new(Schema::new(group_fields))
}

/// Determines the lexical ordering requirement for an aggregate expression.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this code just moved, or was it also changed?

let batch = if aggregate_group.requirement.is_empty() {
batch.clone()
} else {
sort_batch(&batch, &aggregate_group.requirement, None)?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this sort the entire input to the group, or just the rows in the current batch? I think it just sorts the current batch, but it would actually need to sort the entire input for that group, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sorts the current batch. By changing the implementation of the order-sensitive aggregators, we no longer depend on the property: entire input for the group is ordered.

@alamb
Copy link
Contributor

alamb commented Dec 19, 2023

@mustafasrepo @ozankabak and I had a brief meeting this moring. Here are some notes I took

Goal:

The longer term goal is to support many more order aware normal (as opposed to window) aggregates such as various N'th value, rank functions, etc. These would be both for built in functions and user defined aggregates. In other words, this is a much larger feature than just for the current three functions.

Possible Design Discussions

We discussed several possible designs that had different tradeoffs (largely where the complexity was):

  1. Aggregators handle the ordering only: more complicated aggregators, simpler hash aggregate stream, can do per-aggregator optimizations (like nth value)

  2. The hash stream handles the ordering of the aggregator arguments: potentially reuse sorts and is more efficient, but will always sort even if the aggregator doesn't require it.

  3. Do a rewrite of the query as multiple branches that share a CTE: would keep complexity out of the hash aggregator stream, and potentially offers more optimization opportunities, but will take longer to implement.

Next steps:

  1. Run benchmarks on this PR -- if that shows no performance difference @alamb will complete a detailed review, and as long as there are no objections from the rest of the community we can merge it as a temporary measure while we work on a more detailed esign.
  2. @mustafasrepo and @alamb will work on a larger proposal for option 3

@alamb
Copy link
Contributor

alamb commented Dec 19, 2023

I also filed #8582 to support multiple order aware aggregates and #8583 to track the larger goals of order aware aggregates

@mustafasrepo
Copy link
Contributor Author

I ran the benchmark. It seems that there is no noticable difference between main and this branch

Comparing apache_main and feature_conflicting_order_sensitive_aggregates
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ apache_main ┃ feature_conflicting_order_sensitive_aggregates ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │    155.03ms │                                       155.11ms │     no change │
│ QQuery 2     │     51.29ms │                                        50.19ms │     no change │
│ QQuery 3     │     70.80ms │                                        70.74ms │     no change │
│ QQuery 4     │     53.50ms │                                        52.49ms │     no change │
│ QQuery 5     │     83.31ms │                                        81.72ms │     no change │
│ QQuery 6     │     32.21ms │                                        34.44ms │  1.07x slower │
│ QQuery 7     │    110.54ms │                                       108.78ms │     no change │
│ QQuery 8     │    107.96ms │                                       105.57ms │     no change │
│ QQuery 9     │    131.45ms │                                       129.97ms │     no change │
│ QQuery 10    │    138.69ms │                                       136.58ms │     no change │
│ QQuery 11    │     53.11ms │                                        37.97ms │ +1.40x faster │
│ QQuery 12    │     88.59ms │                                        87.70ms │     no change │
│ QQuery 13    │    193.31ms │                                       193.14ms │     no change │
│ QQuery 14    │     52.28ms │                                        50.66ms │     no change │
│ QQuery 15    │     63.30ms │                                        63.46ms │     no change │
│ QQuery 16    │     56.69ms │                                        55.70ms │     no change │
│ QQuery 17    │    114.69ms │                                       114.89ms │     no change │
│ QQuery 18    │    198.72ms │                                       201.31ms │     no change │
│ QQuery 19    │     99.40ms │                                        98.48ms │     no change │
│ QQuery 20    │     58.72ms │                                        61.62ms │     no change │
│ QQuery 21    │    133.12ms │                                       137.09ms │     no change │
│ QQuery 22    │     39.80ms │                                        40.91ms │     no change │
└──────────────┴─────────────┴────────────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                             ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (apache_main)                                      │ 2086.50ms │
│ Total Time (feature_conflicting_order_sensitive_aggregates)   │ 2068.53ms │
│ Average Time (apache_main)                                    │   94.84ms │
│ Average Time (feature_conflicting_order_sensitive_aggregates) │   94.02ms │
│ Queries Faster                                                │         1 │
│ Queries Slower                                                │         1 │
│ Queries with No Change                                        │        20 │
└───────────────────────────────────────────────────────────────┴───────────┘

@alamb
Copy link
Contributor

alamb commented Dec 19, 2023

Thanks @mustafasrepo -- can you also run the clickbench benchmark as well (which is very heavy on the aggregation code)?

@mustafasrepo
Copy link
Contributor Author

Thanks @mustafasrepo -- can you also run the clickbench benchmark as well (which is very heavy on the aggregation code)?

Sure

@mustafasrepo
Copy link
Contributor Author

mustafasrepo commented Dec 19, 2023

By the way I wrote a design document for possible approaches. It can be found in document. It has 1 missing approach for now, I will add 3rd approach also.

@alamb
Copy link
Contributor

alamb commented Dec 19, 2023

By the way I wrote a design document for possible approaches. It can be found in document. It has 1 missing approach for now, I will add 3rd approach also.

Thank you -- I will check it out tomorrow!

@mustafasrepo
Copy link
Contributor Author

I ran a benchmark for the clickbench queries. Results can be found below

--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ apache_main ┃ feature_conflicting_order_sensitive_aggregates ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │      1.98ms │                                         1.78ms │ +1.12x faster │
│ QQuery 1     │     42.83ms │                                        40.52ms │ +1.06x faster │
│ QQuery 2     │    106.42ms │                                       106.61ms │     no change │
│ QQuery 3     │     95.70ms │                                        93.81ms │     no change │
│ QQuery 4     │    715.72ms │                                       707.31ms │     no change │
│ QQuery 5     │   1226.86ms │                                      1187.07ms │     no change │
│ QQuery 6     │     42.34ms │                                        40.89ms │     no change │
│ QQuery 7     │     41.88ms │                                        43.14ms │     no change │
│ QQuery 8     │   1339.71ms │                                      1415.74ms │  1.06x slower │
│ QQuery 9     │   2088.04ms │                                      2147.51ms │     no change │
│ QQuery 10    │    307.61ms │                                       298.18ms │     no change │
│ QQuery 11    │    336.17ms │                                       326.19ms │     no change │
│ QQuery 12    │   1048.58ms │                                      1047.89ms │     no change │
│ QQuery 13    │   1963.13ms │                                      2538.39ms │  1.29x slower │
│ QQuery 14    │   1143.45ms │                                      1168.10ms │     no change │
│ QQuery 15    │    803.05ms │                                       803.61ms │     no change │
│ QQuery 16    │   2672.08ms │                                      3191.41ms │  1.19x slower │
│ QQuery 17    │   2836.43ms │                                      3035.65ms │  1.07x slower │
│ QQuery 18    │   7873.86ms │                                      7561.24ms │     no change │
│ QQuery 19    │     65.87ms │                                        68.65ms │     no change │
│ QQuery 20    │   2181.40ms │                                      2208.87ms │     no change │
│ QQuery 21    │   2490.83ms │                                      2623.68ms │  1.05x slower │
│ QQuery 22    │   6244.41ms │                                      6256.67ms │     no change │
│ QQuery 23    │  12160.02ms │                                     12248.42ms │     no change │
│ QQuery 24    │    651.40ms │                                       650.38ms │     no change │
│ QQuery 25    │    549.93ms │                                       606.82ms │  1.10x slower │
│ QQuery 26    │    704.31ms │                                       744.19ms │  1.06x slower │
│ QQuery 27    │   1831.90ms │                                      1919.69ms │     no change │
│ QQuery 28    │  17298.30ms │                                     17411.88ms │     no change │
│ QQuery 29    │    737.44ms │                                       561.39ms │ +1.31x faster │
│ QQuery 30    │   1840.93ms │                                      1080.96ms │ +1.70x faster │
│ QQuery 31    │   2432.19ms │                                      1233.25ms │ +1.97x faster │
│ QQuery 32    │  23034.53ms │                                     14705.08ms │ +1.57x faster │
│ QQuery 33    │  20271.47ms │                                     17623.06ms │ +1.15x faster │
│ QQuery 34    │  22261.09ms │                                     18786.91ms │ +1.18x faster │
│ QQuery 35    │   2101.48ms │                                      1865.86ms │ +1.13x faster │
│ QQuery 36    │    240.72ms │                                       275.98ms │  1.15x slower │
│ QQuery 37    │    156.44ms │                                       162.19ms │     no change │
│ QQuery 38    │    139.25ms │                                       161.45ms │  1.16x slower │
│ QQuery 39    │    558.83ms │                                       519.59ms │ +1.08x faster │
│ QQuery 40    │     66.60ms │                                        77.42ms │  1.16x slower │
│ QQuery 41    │     61.02ms │                                        67.05ms │  1.10x slower │
│ QQuery 42    │     66.56ms │                                        71.96ms │  1.08x slower │
└──────────────┴─────────────┴────────────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                                             ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (apache_main)                                      │ 142832.78ms │
│ Total Time (feature_conflicting_order_sensitive_aggregates)   │ 127686.45ms │
│ Average Time (apache_main)                                    │   3321.69ms │
│ Average Time (feature_conflicting_order_sensitive_aggregates) │   2969.45ms │
│ Queries Faster                                                │          10 │
│ Queries Slower                                                │          12 │
│ Queries with No Change                                        │          21 │
└───────────────────────────────────────────────────────────────┴─────────────┘

…ggregates

# Conflicts:
#	datafusion/physical-plan/src/aggregates/mod.rs
#	datafusion/sqllogictest/test_files/groupby.slt
@alamb
Copy link
Contributor

alamb commented Dec 20, 2023

I will try and review this carefully later today

@alamb
Copy link
Contributor

alamb commented Dec 21, 2023

I am hesitant to make the Hash Aggregate stream any more complex than it already is -- so therefore I am worried about this PR. I think @mustafasrepo has shown the performance does not change, which is good.

However, can you please review the contents of #8582 before proceeding with this PR. I feel like we can likely make most queries faster with less complicated code

@ozankabak
Copy link
Contributor

I am hesitant to make the Hash Aggregate stream any more complex than it already is

I agree

However, can you please review the contents of #8582 before proceeding with this PR. I feel like we can likely make most queries faster with less complicated code

We will think about this over the weekend and come back to you. My preliminary thinking is Approach 3 could be a good stepping stone to Approach 2 if we feel like going direct to 2 would take too long

@alamb
Copy link
Contributor

alamb commented Dec 22, 2023

We will think about this over the weekend and come back to you.

Thank you

My preliminary thinking is Approach 3 could be a good stepping stone to Approach 2 if we feel like going direct to 2 would take too long

I am also happy to help implement Approach 3 (as we already have some version of the code to do first_value / last_value downstream in IOx)

I agree. In terms of getting to Approach 2, I think supporting shared CTEs (aka not actually re-computing a CTE each time it is referenced in the query) is required for approach 2 and would be a valuable feature in its own right for many DataFusion users so perhaps we could plan out that project

@ozankabak
Copy link
Contributor

ozankabak commented Feb 16, 2024

To avoid any confusion, we will resume this work in a different branch/repo (we are just moving code around between repos).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants