Description
Is your feature request related to a problem or challenge?
While looking at TPCH query performance for #6782, I noticed several times that CoalesceBatchesExec takes a non-trivial amount of time (around 5% of the overall query time).
Here is a specific example.
Create Data:
```shell
cd arrow-datafusion/benchmarks
./bench.sh data tpch10
```
Run query with datafusion-cli:
```shell
cd arrow-datafusion/benchmarks/data/tpch_sf10
datafusion-cli -c "select o_year,
       sum(case when nation = 'BRAZIL' then volume else 0 end) / sum(volume) as mkt_share
from (
    select
        extract(year from o_orderdate) as o_year,
        l_extendedprice * (1 - l_discount) as volume,
        n2.n_name as nation
    from
        part,
        supplier,
        lineitem,
        orders,
        customer,
        nation n1,
        nation n2,
        region
    where
        p_partkey = l_partkey
        and s_suppkey = l_suppkey
        and l_orderkey = o_orderkey
        and o_custkey = c_custkey
        and c_nationkey = n1.n_nationkey
        and n1.n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and s_nationkey = n2.n_nationkey
        and o_orderdate between date '1995-01-01' and date '1996-12-31'
        and p_type = 'ECONOMY ANODIZED STEEL'
    ) as all_nations
group by
    o_year
order by
    o_year;"
```
Here is the full EXPLAIN ANALYZE output:
explan-analyze-q8.txt
A small subset shows there is a single CoalesceBatchesExec that takes 3 seconds (elapsed_compute=3.066514072s):

```
CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=59986052, elapsed_compute=3.066514072s] |
RepartitionExec: partitioning=Hash([l_partkey@1], 16), input_partitions=16, metrics=[fetch_time=16.321190026s, repart_time=10.382230411s, send_time=4.650058274s] |
ParquetExec: file_groups={16 groups: [[Users/alamb/Software/arrow-datafusion/benchmarks/data/tpch_sf10
```
I profiled the query and confirmed that CoalesceBatchesExec takes 5% of the overall time, as shown in this screenshot.
In diagram form, the current behavior looks like this:
```
                             Filter                     CoalesceBatches
┌──────────────────┐      ┌──────────────────┐
│ RecordBatch      │ ───▶ │ RecordBatch      │ ─ ┐
│ num_rows = 8000  │      │ num_rows = 234   │   │
└──────────────────┘      └──────────────────┘   │
                                                 │  ┌──────────────────┐
┌──────────────────┐      ┌──────────────────┐   │  │                  │
│ RecordBatch      │ ───▶ │ RecordBatch      │ ─ ┼▶ │ RecordBatch      │
│ num_rows = 8000  │      │ num_rows = 500   │   │  │ num_rows = 8000  │
└──────────────────┘      └──────────────────┘   │  │                  │
                                                 │  └──────────────────┘
        ...                       ...            │
┌──────────────────┐      ┌──────────────────┐   │
│ RecordBatch      │ ───▶ │ RecordBatch      │ ─ ┘
│ num_rows = 8000  │      │ num_rows = 333   │
└──────────────────┘      └──────────────────┘
```

FilterExec creates output batches with copies of the matching rows (it calls take() to make a copy), and CoalesceBatchesExec copies the data *again* to form the final large RecordBatches.
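To make the two copies concrete, here is a std-only Rust sketch (hypothetical helper names, not DataFusion code): a "batch" is modeled as a `Vec<i32>` and the predicate result as a boolean mask, so each matching row is copied once by the filter step and again by the coalesce step. The real code uses arrow's `filter`/`take` kernels and `concat_batches`.

```rust
// Std-only model of today's Filter -> CoalesceBatches path (hypothetical
// names): a "batch" is a Vec<i32> and the filter result is a Vec<bool>.

/// Copy #1: FilterExec materializes a small batch of matching rows
/// (the real code uses arrow's `filter` kernel, which also copies).
fn filter_batch(batch: &[i32], mask: &[bool]) -> Vec<i32> {
    batch
        .iter()
        .zip(mask)
        .filter_map(|(v, keep)| keep.then_some(*v))
        .collect()
}

/// Copy #2: CoalesceBatchesExec concatenates the small batches into one
/// large batch (the real code uses `concat_batches`, copying every row again).
fn coalesce(small_batches: &[Vec<i32>]) -> Vec<i32> {
    small_batches.iter().flatten().copied().collect()
}

fn main() {
    let batch1 = vec![1, 2, 3, 4];
    let batch2 = vec![5, 6, 7, 8];
    // Each matching row is copied twice before reaching the next operator.
    let small1 = filter_batch(&batch1, &[true, false, true, false]);
    let small2 = filter_batch(&batch2, &[false, true, false, true]);
    let output = coalesce(&[small1, small2]);
    assert_eq!(output, vec![1, 3, 6, 8]);
    println!("{:?}", output);
}
```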
Describe the solution you'd like
I think we can avoid this overhead by combining the behavior of CoalesceBatchesExec into the operators that make small batches (FilterExec, JoinExec, and RepartitionExec). Something like:
```
                             Filter                      Filter output
┌──────────────────┐      ┌──────────────────┐
│ RecordBatch      │ ───▶ │ mask             │ ─ ┐
│ num_rows = 8000  │      │ (BooleanArray)   │   │
└──────────────────┘      └──────────────────┘   │
                                                 │  ┌──────────────────┐
┌──────────────────┐      ┌──────────────────┐   │  │                  │
│ RecordBatch      │ ───▶ │ mask             │ ─ ┼▶ │ RecordBatch      │
│ num_rows = 8000  │      │ (BooleanArray)   │   │  │ num_rows = 8000  │
└──────────────────┘      └──────────────────┘   │  │                  │
                                                 │  └──────────────────┘
        ...                       ...            │
┌──────────────────┐      ┌──────────────────┐   │
│ RecordBatch      │ ───▶ │ mask             │ ─ ┘
│ num_rows = 8000  │      │ (BooleanArray)   │
└──────────────────┘      └──────────────────┘
```

FilterExec et al. internally buffer RecordBatches and filter results until enough rows are ready. The Exec then copies rows from multiple input batches (based on the masks) into a single new output RecordBatch, avoiding the second data copy.
The idea would be to take the core coalesce logic from CoalesceBatchesExec that calls concat_batches, and, instead of creating new small record batches in FilterExec, HashJoinExec, and RepartitionExec, buffer the inputs until there are at least target_batch_size rows available and then call interleave instead.
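A minimal std-only sketch of that buffering logic (hypothetical `BatchCoalescer` type, not DataFusion code; a real implementation would buffer RecordBatches plus their filter masks and call arrow's `interleave` kernel, and would also split output that exceeds target_batch_size):

```rust
// Hypothetical sketch: buffer (batch, mask) pairs until target_batch_size
// rows have passed the filter, then build one output batch in a single copy.
struct BatchCoalescer {
    target_batch_size: usize,
    buffered: Vec<(Vec<i32>, Vec<bool>)>, // input batches + their masks
    buffered_rows: usize,                 // rows selected so far
}

impl BatchCoalescer {
    fn new(target_batch_size: usize) -> Self {
        Self { target_batch_size, buffered: Vec::new(), buffered_rows: 0 }
    }

    /// Push an input batch with its filter mask; returns an output batch
    /// once enough rows are buffered, copying each selected row exactly once.
    fn push(&mut self, batch: Vec<i32>, mask: Vec<bool>) -> Option<Vec<i32>> {
        self.buffered_rows += mask.iter().filter(|m| **m).count();
        self.buffered.push((batch, mask));
        if self.buffered_rows < self.target_batch_size {
            return None; // keep buffering; no data copy yet
        }
        // The single copy: gather selected rows from all buffered batches
        // (this is where the real code would call `interleave`).
        let output = self
            .buffered
            .drain(..)
            .flat_map(|(batch, mask)| {
                batch.into_iter().zip(mask).filter_map(|(v, keep)| keep.then_some(v))
            })
            .collect();
        self.buffered_rows = 0;
        Some(output)
    }
}

fn main() {
    let mut coalescer = BatchCoalescer::new(4);
    // Only 2 rows selected so far: keep buffering.
    assert_eq!(coalescer.push(vec![1, 2, 3, 4], vec![true, false, true, false]), None);
    // 5 rows selected in total: emit one coalesced batch.
    let out = coalescer.push(vec![5, 6, 7, 8], vec![false, true, true, true]);
    assert_eq!(out, Some(vec![1, 3, 6, 7, 8]));
}
```

Note this sketch emits as soon as the threshold is crossed, so an output can be slightly larger than target_batch_size; the real operator would need a policy for splitting.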
Here is the code in CoalesceBatchesExec that could be adapted:
https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/coalesce_batches.rs#L215-L283
Here is where FilterExec makes the potentially small record batches:
https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/filter.rs#L294-L306
The same thing would be done in RepartitionExec: https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/repartition/mod.rs#L193-L218
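For reference, arrow-rs's interleave kernel takes a set of input arrays plus (array_index, row_index) pairs and gathers the selected rows into one new array in a single pass. A std-only model of those semantics (hypothetical `interleave_rows` helper, not the arrow API itself):

```rust
// Std-only model of the semantics of arrow's `interleave` kernel:
// gather rows identified by (batch_index, row_index) pairs from several
// input batches into one new output, in a single copy.
fn interleave_rows(batches: &[&[i32]], indices: &[(usize, usize)]) -> Vec<i32> {
    indices
        .iter()
        .map(|&(batch_idx, row_idx)| batches[batch_idx][row_idx])
        .collect()
}

fn main() {
    let batch_a = [10, 11, 12];
    let batch_b = [20, 21, 22];
    // Take rows 0 and 2 of batch_a and row 1 of batch_b.
    let out = interleave_rows(&[&batch_a, &batch_b], &[(0, 0), (0, 2), (1, 1)]);
    assert_eq!(out, vec![10, 12, 21]);
}
```

The buffering operators would accumulate these index pairs from their masks (or hash-partition assignments) and emit one interleave call per target_batch_size rows.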
Describe alternatives you've considered
No response
Additional context
No response