
Avoid extra copies in CoalesceBatchesExec to improve performance #7957

Open
@alamb

Description


Is your feature request related to a problem or challenge?

While looking at TPCH query performance for #6782, I noticed several times that CoalesceBatchesExec takes a non-trivial amount of time (around 5% of the overall query time).

Here is a specific example.

Create Data:

cd arrow-datafusion/benchmarks
./bench.sh  data tpch10

Run query with datafusion-cli:

cd arrow-datafusion/benchmarks/data/tpch_sf10
datafusion-cli -c "select
  o_year,
  sum(case when nation = 'BRAZIL' then volume else 0 end) / sum(volume) as mkt_share
from (
  select
    extract(year from o_orderdate) as o_year,
    l_extendedprice * (1 - l_discount) as volume,
    n2.n_name as nation
  from
    part,
    supplier,
    lineitem,
    orders,
    customer,
    nation n1,
    nation n2,
    region
  where
    p_partkey = l_partkey
    and s_suppkey = l_suppkey
    and l_orderkey = o_orderkey
    and o_custkey = c_custkey
    and c_nationkey = n1.n_nationkey
    and n1.n_regionkey = r_regionkey
    and r_name = 'AMERICA'
    and s_nationkey = n2.n_nationkey
    and o_orderdate between date '1995-01-01' and date '1996-12-31'
    and p_type = 'ECONOMY ANODIZED STEEL'
  ) as all_nations
group by
  o_year
order by
  o_year;"

Here is the full EXPLAIN ANALYZE output:
explan-analyze-q8.txt

A small subset shows there is a single CoalesceBatchesExec that takes 3 seconds (elapsed_compute=3.066514072s):

CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=59986052, elapsed_compute=3.066514072s]
  RepartitionExec: partitioning=Hash([l_partkey@1], 16), input_partitions=16, metrics=[fetch_time=16.321190026s, repart_time=10.382230411s, send_time=4.650058274s]
     ParquetExec: file_groups={16 groups: [[Users/alamb/Software/arrow-datafusion/benchmarks/data/tpch_sf10

I profiled the query and confirmed that CoalesceBatchesExec takes 5% of the overall time, as shown in this screenshot:

Screenshot 2023-10-27 at 7 30 00 AM

In diagram form, this looks like:

┌────────────────────┐        Filter                                                                          
│                    │                    ┌────────────────────┐            Coalesce                          
│                    │    ─ ─ ─ ─ ─ ─ ▶   │    RecordBatch     │             Batches                          
│    RecordBatch     │                    │   num_rows = 234   │─ ─ ─ ─ ─ ┐                                   
│  num_rows = 8000   │                    └────────────────────┘                                              
│                    │                                                    │                                   
│                    │                                                                ┌────────────────────┐  
└────────────────────┘                                                    │           │                    │  
┌────────────────────┐                    ┌────────────────────┐                      │                    │  
│                    │        Filter      │                    │          │           │                    │  
│                    │                    │    RecordBatch     │           ─ ─ ─ ─ ─ ▶│                    │  
│    RecordBatch     │    ─ ─ ─ ─ ─ ─ ▶   │   num_rows = 500   │─ ─ ─ ─ ─ ┐           │                    │  
│  num_rows = 8000   │                    │                    │                      │    RecordBatch     │  
│                    │                    │                    │          └ ─ ─ ─ ─ ─▶│  num_rows = 8000   │  
│                    │                    └────────────────────┘                      │                    │  
└────────────────────┘                                                                │                    │  
                                                    ...                    ─ ─ ─ ─ ─ ▶│                    │  
          ...                   ...                                       │           │                    │  
                                                                                      │                    │  
┌────────────────────┐                                                    │           └────────────────────┘  
│                    │                    ┌────────────────────┐                                              
│                    │       Filter       │                    │          │                                   
│    RecordBatch     │                    │    RecordBatch     │                                              
│  num_rows = 8000   │   ─ ─ ─ ─ ─ ─ ▶    │   num_rows = 333   │─ ─ ─ ─ ─ ┘                                   
│                    │                    │                    │                                              
│                    │                    └────────────────────┘                                              
└────────────────────┘                                                                                        
                                                                                                              
                      FilterExec                                          RepartitionExec copies the data
                      creates output batches with copies                  *again* to form final large
                      of the matching rows (calls take()                  RecordBatches
                      to make a copy)
                                                                                                              

Describe the solution you'd like

I think we can avoid this overhead by combining the behavior of CoalesceBatchesExec into the operators that make small batches (FilterExec, JoinExec, and RepartitionExec). Something like this:

┌────────────────────┐        Filter                                                                        
│                    │                    ┌────────────────────┐          Filter output                     
│                    │    ─ ─ ─ ─ ─ ─ ▶   │        mask        │                                            
│    RecordBatch     │                    │   (BooleanArray)   │─ ─ ─ ─ ─ ┐                                 
│  num_rows = 8000   │                    └────────────────────┘                                            
│                    │                                                    │                                 
│                    │                                                                ┌────────────────────┐
└────────────────────┘                                                    │           │                    │
┌────────────────────┐                    ┌────────────────────┐                      │                    │
│                    │        Filter      │                    │          │           │                    │
│                    │                    │        mask        │           ─ ─ ─ ─ ─ ▶│                    │
│    RecordBatch     │    ─ ─ ─ ─ ─ ─ ▶   │   (BooleanArray)   │─ ─ ─ ─ ─ ┐           │                    │
│  num_rows = 8000   │                    │                    │                      │    RecordBatch     │
│                    │                    │                    │          └ ─ ─ ─ ─ ─▶│  num_rows = 8000   │
│                    │                    └────────────────────┘                      │                    │
└────────────────────┘                                                                │                    │
                                                    ...                    ─ ─ ─ ─ ─ ▶│                    │
          ...                   ...                                       │           │                    │
                                                                                      │                    │
┌────────────────────┐                                                    │           └────────────────────┘
│                    │                    ┌────────────────────┐                                            
│                    │       Filter       │                    │          │                                 
│    RecordBatch     │                    │        mask        │                                            
│  num_rows = 8000   │   ─ ─ ─ ─ ─ ─ ▶    │   (BooleanArray)   │─ ─ ─ ─ ─ ┘                                 
│                    │                    │                    │                                            
│                    │                    └────────────────────┘                                            
└────────────────────┘                                                                                      
                                                                                                            
                      FilterExec et al.                             The Exec then copies rows from
                      internally buffers RecordBatches and          multiple input batches (based on the    
                      filter results until enough rows are          masks) into a single new output         
                      ready                                         RecordBatch avoiding the second data    
                                                                    copy                                    

The idea would be to take the core coalesce logic from CoalesceBatchesExec (which calls concat_batches) and, instead of creating new small record batches in FilterExec, HashJoinExec, and RepartitionExec, buffer the inputs until at least target_batch_size rows are available and then call interleave instead.

Here is the code in CoalesceBatchesExec that could be adapted:
https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/coalesce_batches.rs#L215-L283

Here is where FilterExec makes the potentially small record batches
https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/filter.rs#L294-L306
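
To make the idea concrete, here is a rough sketch (in Rust, against the arrow crate) of the buffer a FilterExec-like operator could keep instead of materializing each small filtered batch. The MaskedBatchBuffer name, its fields, and the push method are hypothetical illustrations, not existing DataFusion code; only the arrow interleave kernel and the BooleanArray / RecordBatch APIs are real:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical buffer a FilterExec-like operator could hold instead of
/// calling `filter_record_batch` on every input batch: it keeps the input
/// batches plus their selection masks and only copies rows once, when
/// enough selected rows have accumulated.
struct MaskedBatchBuffer {
    schema: SchemaRef,
    target_batch_size: usize,
    /// Buffered input batches and the boolean masks produced by evaluating
    /// the filter predicate against them.
    buffered: Vec<(RecordBatch, BooleanArray)>,
    /// Total number of selected (true) rows currently buffered.
    selected_rows: usize,
}

impl MaskedBatchBuffer {
    /// Add a batch and its mask; emit a coalesced batch once at least
    /// `target_batch_size` selected rows are available.
    fn push(
        &mut self,
        batch: RecordBatch,
        mask: BooleanArray,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        self.selected_rows += mask.true_count();
        self.buffered.push((batch, mask));
        if self.selected_rows < self.target_batch_size {
            return Ok(None);
        }

        // (batch_index, row_index) of every selected row across all batches
        let indices: Vec<(usize, usize)> = self
            .buffered
            .iter()
            .enumerate()
            .flat_map(|(batch_idx, (_, mask))| {
                mask.iter()
                    .enumerate()
                    .filter(|(_, keep)| keep.unwrap_or(false))
                    .map(move |(row_idx, _)| (batch_idx, row_idx))
            })
            .collect();

        // Copy the selected rows column by column with the `interleave`
        // kernel -- this is the only copy of the filtered data.
        let columns: Vec<ArrayRef> = (0..self.schema.fields().len())
            .map(|col| {
                let arrays: Vec<&dyn Array> = self
                    .buffered
                    .iter()
                    .map(|(batch, _)| batch.column(col).as_ref())
                    .collect();
                interleave(&arrays, &indices)
            })
            .collect::<Result<_, _>>()?;

        self.buffered.clear();
        self.selected_rows = 0;
        Ok(Some(RecordBatch::try_new(Arc::clone(&self.schema), columns)?))
    }
}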

The same thing would be done in RepartitionExec: https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/repartition/mod.rs#L193-L218
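
For the repartitioning case there is no mask, so the sketch would instead buffer the small hashed slices per output partition and concatenate them with concat_batches once enough rows accumulate. Again, PartitionBuffer and its fields are hypothetical names used only for illustration:

use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical per-output-partition buffer for RepartitionExec: instead of
/// sending every small hashed slice downstream (and re-copying it later in
/// CoalesceBatchesExec), accumulate the slices and emit one concatenated
/// batch once `target_batch_size` rows are available.
struct PartitionBuffer {
    schema: SchemaRef,
    target_batch_size: usize,
    buffered: Vec<RecordBatch>,
    buffered_rows: usize,
}

impl PartitionBuffer {
    fn push(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>, ArrowError> {
        self.buffered_rows += batch.num_rows();
        self.buffered.push(batch);
        if self.buffered_rows < self.target_batch_size {
            return Ok(None);
        }
        // Single copy into the large output batch, replacing the separate
        // copy that CoalesceBatchesExec performs today.
        let output = concat_batches(&self.schema, &self.buffered)?;
        self.buffered.clear();
        self.buffered_rows = 0;
        Ok(Some(output))
    }
}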

Describe alternatives you've considered

No response

Additional context

No response

Labels

enhancement (New feature or request)
