Closed
Description
Describe the bug
DataFusion is not taking advantage of all cores on my machine, despite the plan containing a repartition step (`RepartitionExec`).
I ran this at 2e9beeb on main while working on #6278.
To Reproduce
-- step 1: save this as script.sql in arrow-datafusion checkout
--
-- step 2: generate data:
-- (cd benchmarks && ./bench.sh data all)
--
-- step 3: run this script:
-- datafusion-cli -f script.sql
--
-- Expected: all cores are kept busy processing the query
-- Actual: only one core seems to be busy
-- Materialize lineitem as a table, then append the table to itself four times.
-- Each INSERT doubles the row count, so the final table holds 16x the source rows.
CREATE TABLE lineitem AS
SELECT *
FROM 'benchmarks/data/lineitem';
INSERT INTO lineitem
SELECT * FROM lineitem;
INSERT INTO lineitem
SELECT * FROM lineitem;
INSERT INTO lineitem
SELECT * FROM lineitem;
INSERT INTO lineitem
SELECT * FROM lineitem;
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- | plan_type | plan |
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- | logical_plan | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST |
-- | | Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order |
-- | | Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]] |
-- | | Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_returnflag, lineitem.l_linestatus |
-- | | Filter: lineitem.l_shipdate <= Date32("10471") |
-- | | TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_returnflag, l_linestatus, l_shipdate] |
-- | physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] |
-- | | SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] |
-- | | ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, AVG(lineitem.l_quantity)@3 as avg_qty, AVG(lineitem.l_extendedprice)@4 as avg_price, AVG(lineitem.l_discount)@5 as avg_disc, COUNT(UInt8(1))@6 as count_order] |
-- | | AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))] |
-- | | CoalesceBatchesExec: target_batch_size=8192 |
-- | | RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 16), input_partitions=16 |
-- | | AggregateExec: mode=Partial, gby=[l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))] |
-- | | ProjectionExec: expr=[l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus] |
-- | | CoalesceBatchesExec: target_batch_size=8192 |
-- | | FilterExec: l_shipdate@5 <= 10471 |
-- | | RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 |
-- | | MemoryExec: partitions=1, partition_sizes=[11728] |
-- | | |
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- Print the logical and physical plans for the grouped aggregation over
-- lineitem rows with l_shipdate on or before 1998-09-02.
EXPLAIN
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    AVG(l_quantity) AS avg_qty,
    AVG(l_extendedprice) AS avg_price,
    AVG(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM lineitem
WHERE l_shipdate <= DATE '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;
-- Run the actual query:
--
-- 0 rows in set. Query took 1.570 seconds.
-- 0 rows in set. Query took 0.004 seconds.
-- 0 rows in set. Query took 0.005 seconds.
-- 0 rows in set. Query took 0.009 seconds.
-- 0 rows in set. Query took 0.023 seconds.
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- | l_returnflag | l_linestatus | sum_qty | avg_qty | avg_price | avg_disc | count_order |
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- | A | F | 603745712.00 | 25.522005 | 38273.129734 | 0.049985 | 23655888 |
-- | N | F | 15862672.00 | 25.516471 | 38284.467760 | 0.050093 | 621664 |
-- | N | O | 1191616640.00 | 25.502226 | 38249.117988 | 0.049996 | 46725984 |
-- | R | F | 603516048.00 | 25.505793 | 38250.854626 | 0.050009 | 23661920 |
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- 4 rows in set. Query took 9.980 seconds.
-- Execute the same aggregation that was EXPLAINed earlier in this script.
-- This is the statement expected to keep all cores busy while it runs.
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    AVG(l_quantity) AS avg_qty,
    AVG(l_extendedprice) AS avg_price,
    AVG(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM lineitem
WHERE l_shipdate <= DATE '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;
Expected behavior
I expect all the cores on my machine to be in use while the query runs, but instead only a single core is used.
Additional context
No response