DataFusion not using all cores on a TPCH-like query with repartition #6290

Closed
alamb opened this issue May 8, 2023 · 3 comments · Fixed by #6310
Labels
bug Something isn't working performance Make DataFusion faster

Comments

@alamb
Contributor

alamb commented May 8, 2023

Describe the bug

DataFusion is not taking advantage of all the cores on my machine, even though the plan contains a repartition.

I ran this at commit 2e9beeb on main while working on #6278.

To Reproduce

-- step 1: save this as script.sql in an arrow-datafusion checkout
--
-- step 2: generate data:
--  (cd benchmarks && ./bench.sh data all)
--
-- step 3: run this script:
-- datafusion-cli -f script.sql
--
-- Expected: all cores are kept busy processing the query
-- Actual: only one core seems to be busy

-- load lineitem, then double it four times with self-inserts (16x the original rows)
create table lineitem as select * from 'benchmarks/data/lineitem';

insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;
insert into lineitem select * from lineitem;

-- Plan produced by the EXPLAIN below:
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- | plan_type     | plan                                                                                                                                                                                                                                                                                            |
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- | logical_plan  | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST                                                                                                                                                                                                                |
-- |               |   Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order                                          |
-- |               |     Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]]                                                                                  |
-- |               |       Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_returnflag, lineitem.l_linestatus                                                                                                                                                              |
-- |               |         Filter: lineitem.l_shipdate <= Date32("10471")                                                                                                                                                                                                                                          |
-- |               |           TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_returnflag, l_linestatus, l_shipdate]                                                                                                                                                                      |
-- | physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                          |
-- |               |   SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                  |
-- |               |     ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, AVG(lineitem.l_quantity)@3 as avg_qty, AVG(lineitem.l_extendedprice)@4 as avg_price, AVG(lineitem.l_discount)@5 as avg_disc, COUNT(UInt8(1))@6 as count_order] |
-- |               |       AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                           |
-- |               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                             |
-- |               |           RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 16), input_partitions=16                                                                                                                                   |
-- |               |             AggregateExec: mode=Partial, gby=[l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                              |
-- |               |               ProjectionExec: expr=[l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus]                                                                                               |
-- |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                     |
-- |               |                   FilterExec: l_shipdate@5 <= 10471                                                                                                                                                                                                                                             |
-- |               |                     RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1                                                                                                                                                                                                       |
-- |               |                       MemoryExec: partitions=1, partition_sizes=[11728]                                                                                                                                                                                                                         |
-- |               |                                                                                                                                                                                                                                                                                                 |
-- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

EXPLAIN select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1998-09-02'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

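-- Run the actual query. Output from datafusion-cli (the "0 rows" lines
-- come from the CREATE TABLE and INSERT statements above):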
-- 0 rows in set. Query took 1.570 seconds.
-- 0 rows in set. Query took 0.004 seconds.
-- 0 rows in set. Query took 0.005 seconds.
-- 0 rows in set. Query took 0.009 seconds.
-- 0 rows in set. Query took 0.023 seconds.
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- | l_returnflag | l_linestatus | sum_qty       | avg_qty   | avg_price    | avg_disc | count_order |
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- | A            | F            | 603745712.00  | 25.522005 | 38273.129734 | 0.049985 | 23655888    |
-- | N            | F            | 15862672.00   | 25.516471 | 38284.467760 | 0.050093 | 621664      |
-- | N            | O            | 1191616640.00 | 25.502226 | 38249.117988 | 0.049996 | 46725984    |
-- | R            | F            | 603516048.00  | 25.505793 | 38250.854626 | 0.050009 | 23661920    |
-- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
-- 4 rows in set. Query took 9.980 seconds.

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1998-09-02'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

Expected behavior

I expect all the cores on my machine to be in use during the query, but instead only a single core is used.


alamb added the bug (Something isn't working) label on May 8, 2023
alamb changed the title from "Datafusion not using all cores during query with repartiton" to "Datafusion not using all cores on a TPCH like query during query with repartiton" on May 8, 2023
alamb added the performance (Make DataFusion faster) label on May 8, 2023
@alamb
Contributor Author

alamb commented May 8, 2023

@crepererum, do you have any insights or thoughts on this? As I recall, you were the most recent person to touch the repartition operator.

@alamb
Contributor Author

alamb commented May 9, 2023

Update: I think the fact that the repartition yields after each batch is effectively serializing the processing. I will have a PR up shortly.

@alamb
Contributor Author

alamb commented May 9, 2023

#6310
