Closed
Labels: bug (Something isn't working), performance (Make DataFusion faster)
Description
Describe the bug
DataFusion does not take advantage of all the cores on my machine, even though the physical plan includes a RepartitionExec.
I ran this at commit 2e9beeb on main while working on #6278.
To Reproduce
-- step 1: save this as script.sql in an arrow-datafusion checkout
--
-- step 2: generate data:
-- (cd benchmarks && ./bench.sh data all)
--
-- step 3: run this script:
-- datafusion-cli -f script.sql
--
-- Expected: all cores are kept busy processing the query
-- Actual: only one core seems to be busy
-- load the lineitem data, then double the table four times (16x the original row count)
create table lineitem as select * from 'benchmarks/data/lineitem';
insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- | plan_type | plan |
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- | logical_plan | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST |
-- | | Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order |
-- | | Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]] |
-- | | Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_returnflag, lineitem.l_linestatus |
-- | | Filter: lineitem.l_shipdate <= Date32("10471") |
-- | | TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_returnflag, l_linestatus, l_shipdate] |
-- | physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] |
-- | | SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] |
-- | | ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, AVG(lineitem.l_quantity)@3 as avg_qty, AVG(lineitem.l_extendedprice)@4 as avg_price, AVG(lineitem.l_discount)@5 as avg_disc, COUNT(UInt8(1))@6 as count_order] |
-- | | AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))] |
-- | | CoalesceBatchesExec: target_batch_size=8192 |
-- | | RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 16), input_partitions=16 |
-- | | AggregateExec: mode=Partial, gby=[l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))] |
-- | | ProjectionExec: expr=[l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus] |
-- | | CoalesceBatchesExec: target_batch_size=8192 |
-- | | FilterExec: l_shipdate@5 <= 10471 |
-- | | RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 |
-- | | MemoryExec: partitions=1, partition_sizes=[11728] |
-- | | |
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
EXPLAIN select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date '1998-09-02'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
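-- Run the actual query. Output of the whole script (the five "0 rows in set"
-- lines come from the CREATE TABLE and INSERT statements above):
--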
-- 0 rows in set. Query took 1.570 seconds.
-- 0 rows in set. Query took 0.004 seconds.
-- 0 rows in set. Query took 0.005 seconds.
-- 0 rows in set. Query took 0.009 seconds.
-- 0 rows in set. Query took 0.023 seconds.
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- | l_returnflag | l_linestatus | sum_qty | avg_qty | avg_price | avg_disc | count_order |
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- | A | F | 603745712.00 | 25.522005 | 38273.129734 | 0.049985 | 23655888 |
-- | N | F | 15862672.00 | 25.516471 | 38284.467760 | 0.050093 | 621664 |
-- | N | O | 1191616640.00 | 25.502226 | 38249.117988 | 0.049996 | 46725984 |
-- | R | F | 603516048.00 | 25.505793 | 38250.854626 | 0.050009 | 23661920 |
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- 4 rows in set. Query took 9.980 seconds.
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date '1998-09-02'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
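One way to narrow down where the time goes is to rerun the same query under `EXPLAIN ANALYZE`, which executes the plan and annotates each operator with runtime metrics. This is a suggested diagnostic, not part of the reproduction itself; it assumes the `lineitem` table created by the script above already exists in the session.

```sql
-- Suggested diagnostic (not part of the original script):
-- EXPLAIN ANALYZE executes the query and prints the physical plan with
-- runtime metrics attached to each operator, which may help show whether
-- the stages above the RoundRobinBatch(16) RepartitionExec are doing work
-- in parallel or are bottlenecked on the single-partition MemoryExec.
EXPLAIN ANALYZE select
  l_returnflag,
  l_linestatus,
  sum(l_quantity) as sum_qty,
  avg(l_quantity) as avg_qty,
  avg(l_extendedprice) as avg_price,
  avg(l_discount) as avg_disc,
  count(*) as count_order
from
  lineitem
where
  l_shipdate <= date '1998-09-02'
group by
  l_returnflag,
  l_linestatus
order by
  l_returnflag,
  l_linestatus;
```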
Expected behavior
I expect all the cores on my machine to be in use during the query, but instead only a single core is used.
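The `16` in `RoundRobinBatch(16)` and `Hash(..., 16)` in the physical plan comes from the `datafusion.execution.target_partitions` setting, which defaults to the number of CPU cores. A quick check that the planner is targeting as many partitions as there are cores, assuming datafusion-cli with its information_schema support enabled (its default):

```sql
-- Show how many partitions the planner targets; by default this equals
-- the number of CPU cores DataFusion detects on the machine.
SHOW datafusion.execution.target_partitions;
```

If this matches the core count, the plan is asking for full parallelism, and the single busy core points at execution rather than planning.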