Conversation

@duongcongtoai
Contributor

@duongcongtoai duongcongtoai commented Oct 4, 2025

Which issue does this PR close?

This shouldn't affect existing queries. (In the first round of benchmarks the results looked mixed for some reason, possibly because I had left many other processes running.)

toai@salamancabrothehood:~/proj/rust/arrow-datafusion/benchmarks$ ./bench.sh compare_detail main dev
Comparing main and dev
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃                               main ┃                                dev ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1     │ 502.89 / 519.75 ±12.88 / 535.48 ms │ 505.35 / 526.42 ±14.28 / 543.89 ms │ no change │
│ QQuery 2     │  136.69 / 141.06 ±2.97 / 144.91 ms │  131.02 / 139.60 ±4.86 / 145.42 ms │ no change │
│ QQuery 3     │  204.71 / 212.20 ±5.86 / 221.02 ms │ 199.94 / 216.79 ±12.45 / 238.66 ms │ no change │
│ QQuery 4     │  159.23 / 167.68 ±7.41 / 178.43 ms │  161.48 / 169.97 ±8.32 / 183.83 ms │ no change │
│ QQuery 5     │  354.72 / 359.50 ±4.46 / 366.03 ms │  353.72 / 359.41 ±3.07 / 362.31 ms │ no change │
│ QQuery 6     │  117.51 / 126.54 ±6.30 / 135.50 ms │  119.55 / 122.74 ±2.96 / 126.67 ms │ no change │
│ QQuery 7     │  519.34 / 528.57 ±7.90 / 541.57 ms │  532.59 / 536.14 ±3.50 / 542.27 ms │ no change │
│ QQuery 8     │ 257.46 / 270.43 ±10.50 / 284.80 ms │  260.59 / 268.78 ±7.63 / 279.50 ms │ no change │
│ QQuery 9     │  432.73 / 441.39 ±6.47 / 451.80 ms │  425.57 / 435.09 ±5.94 / 443.55 ms │ no change │
│ QQuery 10    │  308.93 / 318.91 ±7.23 / 329.30 ms │  312.78 / 320.95 ±6.79 / 332.50 ms │ no change │
│ QQuery 11    │  117.89 / 119.68 ±2.35 / 124.26 ms │  116.92 / 119.37 ±2.00 / 122.42 ms │ no change │
│ QQuery 12    │  191.28 / 197.23 ±3.13 / 200.16 ms │  194.00 / 198.38 ±5.85 / 209.90 ms │ no change │
│ QQuery 13    │  214.51 / 222.32 ±8.71 / 234.43 ms │  213.94 / 224.77 ±7.02 / 234.35 ms │ no change │
│ QQuery 14    │  152.46 / 157.63 ±4.48 / 164.79 ms │  149.01 / 155.77 ±5.08 / 164.23 ms │ no change │
│ QQuery 15    │  216.70 / 221.59 ±3.85 / 227.87 ms │  217.98 / 221.76 ±3.25 / 226.12 ms │ no change │
│ QQuery 16    │     81.61 / 84.70 ±3.13 / 90.66 ms │     81.46 / 83.47 ±1.93 / 86.96 ms │ no change │
│ QQuery 17    │ 505.93 / 520.44 ±11.91 / 533.78 ms │ 495.11 / 521.94 ±16.91 / 542.64 ms │ no change │
│ QQuery 18    │  721.47 / 734.01 ±9.72 / 744.97 ms │ 718.46 / 729.57 ±16.02 / 761.23 ms │ no change │
│ QQuery 19    │ 267.75 / 281.74 ±10.16 / 296.97 ms │  263.52 / 280.61 ±9.84 / 293.81 ms │ no change │
│ QQuery 20    │  204.22 / 213.72 ±7.82 / 223.06 ms │  208.81 / 213.25 ±4.28 / 218.62 ms │ no change │
│ QQuery 21    │  545.87 / 557.55 ±6.38 / 564.89 ms │  544.30 / 550.89 ±3.40 / 554.04 ms │ no change │
│ QQuery 22    │     82.11 / 83.91 ±1.51 / 86.02 ms │     78.60 / 81.18 ±2.54 / 85.87 ms │ no change │
└──────────────┴────────────────────────────────────┴────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary      ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)      │ 6480.54ms │
│ Total Time (dev)       │ 6476.83ms │
│ Average Time (main)    │  294.57ms │
│ Average Time (dev)     │  294.40ms │
│ Queries Faster         │         0 │
│ Queries Slower         │         0 │
│ Queries with No Change │        22 │
│ Queries with Failure   │         0 │
└────────────────────────┴───────────┘

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added documentation Improvements or additions to documentation sql SQL Planner core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) functions Changes to functions implementation labels Oct 4, 2025
@github-actions github-actions bot removed documentation Improvements or additions to documentation sql SQL Planner core Core DataFusion crate labels Oct 4, 2025
@duongcongtoai duongcongtoai changed the title Opt array agg feat: implement GroupArrayAggAccumulator Oct 4, 2025
@duongcongtoai duongcongtoai changed the title feat: implement GroupArrayAggAccumulator feat: implement GroupArrayAggAccumulator attempt 3 Oct 4, 2025
@github-actions github-actions bot removed the sqllogictest SQL Logic Tests (.slt) label Oct 4, 2025
Contributor

@alamb alamb left a comment

very very cool @duongcongtoai

stacked_batches: Vec<ArrayRef>,
// address items of each group within the stacked_batches
// this is maintained to perform kernel::interleave
stacked_group_indices: Vec<Vec<(usize, usize)>>,
Contributor

As written I think this effectively has an extra allocation per group

If you saved the internal state using a single Vec<usize> that records the group of each element in the ArrayRefs, you could then sort it (remembering each element's original index as well) to determine the final arguments to interleave.
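
A minimal std-only sketch of the flat-index idea suggested above. `FlatGroupIndex`, `push_batch`, and `finish` are hypothetical names, not code from this PR. For simplicity it stores a `(group, batch, row)` triple per element instead of a bare `Vec<usize>`, then sorts once at the end. The resulting `(batch, row)` pairs have the shape that arrow's `interleave` kernel takes as its `indices` argument, and the offsets delimit each group's list:

```rust
/// Flat state: one `(group, batch, row)` entry per accumulated value,
/// instead of one inner Vec per group. Hypothetical names throughout.
#[derive(Default)]
struct FlatGroupIndex {
    entries: Vec<(usize, usize, usize)>,
}

impl FlatGroupIndex {
    /// Record that row `row` of batch number `batch` belongs to `group_indices[row]`.
    fn push_batch(&mut self, batch: usize, group_indices: &[usize]) {
        self.entries.extend(
            group_indices
                .iter()
                .enumerate()
                .map(|(row, &group)| (group, batch, row)),
        );
    }

    /// Returns `(indices, offsets)`. `indices` holds `(batch, row)` pairs ordered
    /// by group; `offsets[g]..offsets[g + 1]` is the slice owned by group `g`.
    fn finish(mut self, num_groups: usize) -> (Vec<(usize, usize)>, Vec<usize>) {
        // `sort_by_key` is stable, so arrival order is kept within each group.
        self.entries.sort_by_key(|&(group, _, _)| group);

        // Count elements per group, then prefix-sum the counts into offsets.
        let mut offsets = vec![0usize; num_groups + 1];
        for &(group, _, _) in &self.entries {
            offsets[group + 1] += 1;
        }
        for g in 0..num_groups {
            offsets[g + 1] += offsets[g];
        }

        let indices = self.entries.iter().map(|&(_, b, r)| (b, r)).collect();
        (indices, offsets)
    }
}
```

The trade-off in this sketch is one growable allocation for all groups plus an O(n log n) sort at emit time, versus one small allocation per group with no sort.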

opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
// TODO: all the reference to this function always result into this opt_filter as none
Contributor

opt_filter is passed for a query like SELECT ARRAY_AGG(x) FILTER (WHERE y > 5)

Contributor Author

Then I think we should add a test case for it.

Contributor Author

@duongcongtoai duongcongtoai Oct 5, 2025

Actually, for merge_batch I think opt_filter will always be None. See, for example, the existing implementations of
median:

// Since aggregate filter should be applied in partial stage, in final stage there should be no filter

count:
_opt_filter: Option<&BooleanArray>,
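
To illustrate the point discussed here, a toy std-only sketch of a grouped accumulator in which the filter only matters in the partial stage. `ToyArrayAgg` and its method signatures are hypothetical simplifications (plain slices instead of Arrow arrays), not the DataFusion `GroupsAccumulator` trait itself:

```rust
/// Toy per-group accumulator over i64 values; a hypothetical stand-in
/// for the array_agg state, using plain Vecs instead of Arrow arrays.
struct ToyArrayAgg {
    groups: Vec<Vec<i64>>,
}

impl ToyArrayAgg {
    fn new() -> Self {
        Self { groups: Vec::new() }
    }

    /// Grow the per-group storage so that every group index is valid.
    fn ensure_groups(&mut self, total_num_groups: usize) {
        if self.groups.len() < total_num_groups {
            self.groups.resize_with(total_num_groups, Vec::new);
        }
    }

    /// Partial stage: rows whose filter bit is false are skipped.
    fn update_batch(
        &mut self,
        values: &[i64],
        group_indices: &[usize],
        opt_filter: Option<&[bool]>,
        total_num_groups: usize,
    ) {
        self.ensure_groups(total_num_groups);
        for (row, (&value, &group)) in values.iter().zip(group_indices).enumerate() {
            if opt_filter.map_or(true, |filter| filter[row]) {
                self.groups[group].push(value);
            }
        }
    }

    /// Final stage: the inputs are partial states that were already filtered,
    /// so this sketch takes no filter argument at all.
    fn merge_batch(
        &mut self,
        partial_states: &[Vec<i64>],
        group_indices: &[usize],
        total_num_groups: usize,
    ) {
        self.ensure_groups(total_num_groups);
        for (state, &group) in partial_states.iter().zip(group_indices) {
            self.groups[group].extend_from_slice(state);
        }
    }
}
```

A test case for the FILTER path would therefore exercise `update_batch` with `Some(filter)`, while `merge_batch` only needs coverage with already-filtered partial states.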

@duongcongtoai
Contributor Author

There is an error related to memory reservation. I'm not sure whether the calculation in fn size is wrong, but as I understand it, we only account for buffers newly created by the implementation, not buffers borrowed from elsewhere (i.e. the stacked ArrayRefs received on every call to fn merge_batch or fn update_batch).
Maybe it's a good chance for me to learn how memory reservation and spilling work.

}

fn size(&self) -> usize {
size_of_val(self)
Contributor

There is an error related to memory reservation. I'm not sure whether the calculation in fn size is wrong, but as I understand it, we only account for buffers newly created by the implementation, not buffers borrowed from elsewhere (i.e. the stacked ArrayRefs received on every call to fn merge_batch or fn update_batch). Maybe it's a good chance for me to learn how memory reservation and spilling work.

I took a quick look at this code -- One thing we probably need to account for is the memory in the held ArrayRefs -- specifically by calling https://docs.rs/arrow/latest/arrow/array/trait.Array.html#tymethod.get_array_memory_size on all the stacked arrays

However, I bet the fuzz test failure is due to actually better accounting of memory. Maybe we need to readjust the parameters or something
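
A rough std-only sketch of the difference between the shallow `size_of_val(self)` shown above and a size that also counts held buffers. `ToyState` is a hypothetical stand-in in which `Vec<i64>` plays the role of an `ArrayRef`; in the real accumulator the per-array term would presumably come from the `get_array_memory_size` method linked above rather than from `capacity()`:

```rust
use std::mem::{size_of, size_of_val};

/// Hypothetical stand-in for the accumulator state.
/// `Vec<i64>` plays the role of a held `ArrayRef`.
struct ToyState {
    stacked_batches: Vec<Vec<i64>>,
    stacked_group_indices: Vec<Vec<(usize, usize)>>,
}

impl ToyState {
    /// Shallow size: only the struct's own fields (pointer, length, capacity
    /// per Vec). Heap buffers behind those fields are not counted.
    fn shallow_size(&self) -> usize {
        size_of_val(self)
    }

    /// Deep size: shallow size plus the heap capacity of the outer vectors
    /// and of every inner buffer they hold.
    fn size(&self) -> usize {
        let batches = self.stacked_batches.capacity() * size_of::<Vec<i64>>()
            + self
                .stacked_batches
                .iter()
                .map(|b| b.capacity() * size_of::<i64>())
                .sum::<usize>();
        let indices = self.stacked_group_indices.capacity()
            * size_of::<Vec<(usize, usize)>>()
            + self
                .stacked_group_indices
                .iter()
                .map(|g| g.capacity() * size_of::<(usize, usize)>())
                .sum::<usize>();
        size_of_val(self) + batches + indices
    }
}
```

With a single held buffer of 1024 `i64` values, `shallow_size` stays at the size of two `Vec` headers while `size` grows by at least 8 KiB, which is the kind of gap a memory reservation would otherwise miss.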

Contributor Author

The stacked arrays will grow over time, and accounting for their size will likely make this implementation infeasible under memory pressure. I wonder whether we should introduce an option that lets users choose between the old GroupsAccumulatorAdapter implementation and this one.

Contributor Author

Actually, after reducing the batch_size of the fuzz test, it passed. The failure happens during the spill merge. Interestingly, the size that makes the merge overflow is the size of the output, not the size of the accumulator. So how does the new implementation cause the output to use more memory? 🤔

Contributor Author

        for batch in batches {
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

max_record_batch_size determines the allocation during the spill merge; maybe this is the difference.

@github-actions github-actions bot added the core Core DataFusion crate label Oct 8, 2025
@duongcongtoai
Contributor Author

toai@salamancabrothehood:~/proj/rust/playpy$ bash test.sh sample-1m.parquet 
Benchmark 1: uv run polar.py sample-1m.parquet
  Time (mean ± σ):     594.7 ms ±   9.0 ms    [User: 1190.6 ms, System: 614.0 ms]
  Range (min … max):   582.7 ms … 607.2 ms    10 runs
 
Benchmark 2: uv run df.py sample-1m.parquet
  Time (mean ± σ):     786.1 ms ±  10.4 ms    [User: 3790.5 ms, System: 569.3 ms]
  Range (min … max):   767.9 ms … 805.6 ms    10 runs
 
Summary
  uv run polar.py sample-1m.parquet ran
    1.32 ± 0.03 times faster than uv run df.py sample-1m.parquet

I ran the benchmark again with this branch, and the result is improved compared to DataFusion 50.0.0. However, I had to modify the original script a bit, because writing to Parquet fails in the same way as described in this issue.

I'll also take a look at that later.

    df = df.aggregate(
        [col("df.name"), col("df.group")],
        array_agg(col("market")).alias("markets"),
    )
    df = df.select(
        col("df.name"),
        col("df.group"),
        when(col("markets")[1].is_not_null(), col("markets")).otherwise(lit(None)),
    )

    df.collect()

    # df.write_parquet("output_datafusion.parquet")

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Oct 27, 2025

Labels

core Core DataFusion crate functions Changes to functions implementation sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Slow aggregrate query with array_agg, Polars is 4 times faster for equal query

2 participants