Status: Closed
Labels: enhancement (New feature or request), performance (Make DataFusion faster)
Description
**Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
When running the TPC-H benchmarks with DataFusion I noticed that we scan statistics for every registered table up front, even tables that are not referenced in the query. This happens in `ExecutionContext::register_table`. We then scan statistics a second time, during physical planning, for the tables that are actually used in the query.
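To make the pattern concrete, here is a minimal Rust sketch of the eager behavior described above. The types and functions (`Context`, `Statistics`, `scan_parquet_statistics`) are hypothetical stand-ins, not DataFusion's actual API; the point is only that the scan happens inside registration.

```rust
// Hypothetical, simplified stand-ins for the pattern described above;
// these are NOT DataFusion's actual types or signatures.
use std::collections::HashMap;

#[derive(Debug, Clone, Default)]
struct Statistics {
    num_rows: Option<usize>,
}

// Stand-in for the per-file footer scan that costs ~30 seconds on lineitem.
fn scan_parquet_statistics(path: &str) -> Statistics {
    println!("Scanned Parquet files for statistics at {path}");
    Statistics::default()
}

#[derive(Default)]
struct Context {
    tables: HashMap<String, Statistics>,
}

impl Context {
    // Statistics are gathered at registration time, so every table pays
    // the scan cost even if no query ever touches it.
    fn register_table(&mut self, name: &str, path: &str) {
        let stats = scan_parquet_statistics(path);
        self.tables.insert(name.to_string(), stats);
    }
}

fn main() {
    let mut ctx = Context::default();
    // All eight TPC-H tables are scanned here, before any query runs;
    // query 3 only needs customer, orders, and lineitem.
    for table in [
        "part", "supplier", "partsupp", "customer",
        "orders", "lineitem", "nation", "region",
    ] {
        ctx.register_table(table, &format!("/data/tpch/{table}"));
    }
}
```

The benchmark run below shows the effect on TPC-H SF1000: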
```
../target/release/tpch benchmark datafusion \
    --path /mnt/bigdata/tpch-sf1000-parquet/ \
    --format parquet \
    --iterations 1 \
    --debug \
    --concurrency 24 \
    --query 3
```
```
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 3, debug: true, iterations: 1, concurrency: 24, batch_size: 8192, path: "/mnt/bigdata/tpch-sf1000-parquet/", file_format: "parquet", mem_table: false, partitions: 8 }
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//part)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//supplier)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//partsupp)
Scanned 48 Parquet files for statistics in 1 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//customer)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//orders)
Scanned 48 Parquet files for statistics in 4 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//lineitem)
Scanned 48 Parquet files for statistics in 30 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//nation)
Scanned 1 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//region)
Scanned 1 Parquet files for statistics in 0 seconds
=== Logical plan ===
Sort: #revenue DESC NULLS FIRST, #orders.o_orderdate ASC NULLS FIRST
  Projection: #lineitem.l_orderkey, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS revenue, #orders.o_orderdate, #orders.o_shippriority
    Aggregate: groupBy=[[#lineitem.l_orderkey, #orders.o_orderdate, #orders.o_shippriority]], aggr=[[SUM(#lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount)]]
      Filter: #customer.c_mktsegment Eq Utf8("BUILDING") And #orders.o_orderdate Lt CAST(Utf8("1995-03-15") AS Date32) And #lineitem.l_shipdate Gt CAST(Utf8("1995-03-15") AS Date32)
        Join: #orders.o_orderkey = #lineitem.l_orderkey
          Join: #customer.c_custkey = #orders.o_custkey
            TableScan: customer projection=None
            TableScan: orders projection=None
          TableScan: lineitem projection=None
=== Optimized logical plan ===
Sort: #revenue DESC NULLS FIRST, #orders.o_orderdate ASC NULLS FIRST
  Projection: #lineitem.l_orderkey, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS revenue, #orders.o_orderdate, #orders.o_shippriority
    Aggregate: groupBy=[[#lineitem.l_orderkey, #orders.o_orderdate, #orders.o_shippriority]], aggr=[[SUM(#lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount)]]
      Join: #orders.o_orderkey = #lineitem.l_orderkey
        Join: #customer.c_custkey = #orders.o_custkey
          Filter: #customer.c_mktsegment Eq Utf8("BUILDING")
            TableScan: customer projection=Some([0, 6]), filters=[#customer.c_mktsegment Eq Utf8("BUILDING")]
          Filter: #orders.o_orderdate Lt Date32("9204")
            TableScan: orders projection=Some([0, 1, 4, 7]), filters=[#orders.o_orderdate Lt Date32("9204")]
        Filter: #lineitem.l_shipdate Gt Date32("9204")
          TableScan: lineitem projection=Some([0, 5, 6, 10]), filters=[#lineitem.l_shipdate Gt Date32("9204")]
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//customer)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//orders)
Scanned 48 Parquet files for statistics in 4 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//lineitem)
Scanned 48 Parquet files for statistics in 30 seconds
=== Physical plan ===
SortExec: [revenue@1 DESC,o_orderdate@2 ASC]
  CoalescePartitionsExec
    ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
      HashAggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(l_extendedprice Multiply Int64(1) Minus l_discount)]
        CoalesceBatchesExec: target_batch_size=4096
          RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 24)
            HashAggregateExec: mode=Partial, gby=[l_orderkey@6 as l_orderkey, o_orderdate@4 as o_orderdate, o_shippriority@5 as o_shippriority], aggr=[SUM(l_extendedprice Multiply Int64(1) Minus l_discount)]
              CoalesceBatchesExec: target_batch_size=4096
                HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })]
                  CoalesceBatchesExec: target_batch_size=4096
                    RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 2 }], 24)
                      CoalesceBatchesExec: target_batch_size=4096
                        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]
                          CoalesceBatchesExec: target_batch_size=4096
                            RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 24)
                              CoalesceBatchesExec: target_batch_size=4096
                                FilterExec: c_mktsegment@1 = BUILDING
                                  ParquetExec: batch_size=8192, limit=None, partitions=[...]
                          CoalesceBatchesExec: target_batch_size=4096
                            RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 24)
                              CoalesceBatchesExec: target_batch_size=4096
                                FilterExec: o_orderdate@2 < 9204
                                  ParquetExec: batch_size=8192, limit=None, partitions=[...]
                  CoalesceBatchesExec: target_batch_size=4096
                    RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 24)
                      CoalesceBatchesExec: target_batch_size=4096
                        FilterExec: l_shipdate@3 > 9204
                          ParquetExec: batch_size=8192, limit=None, partitions=[...]
```
**Describe the solution you'd like**
- We should only scan statistics for tables that are actually referenced in the query
- We should only scan statistics once per table (one possible approach is sketched below)
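For illustration, a minimal sketch of what that could look like, assuming a lazily initialized, cached statistics field (all names here are hypothetical, not existing DataFusion APIs):

```rust
// Hypothetical sketch; LazyTable, Statistics, and scan_parquet_statistics
// are illustrations, not existing DataFusion types.
use std::sync::OnceLock;

#[derive(Debug, Clone, Default)]
struct Statistics {
    num_rows: Option<usize>,
}

fn scan_parquet_statistics(path: &str) -> Statistics {
    println!("Scanned Parquet files for statistics at {path}");
    Statistics::default()
}

struct LazyTable {
    path: String,
    // Filled in at most once, on first use.
    stats: OnceLock<Statistics>,
}

impl LazyTable {
    fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            stats: OnceLock::new(),
        }
    }

    // The first caller pays for the scan; every later caller
    // (e.g. physical planning) gets the cached result for free.
    fn statistics(&self) -> &Statistics {
        self.stats.get_or_init(|| scan_parquet_statistics(&self.path))
    }
}

fn main() {
    // Registration becomes free: no scan happens here.
    let lineitem = LazyTable::new("/data/tpch/lineitem");
    // The scan runs here, once, because statistics were actually requested...
    let _ = lineitem.statistics();
    // ...and this second request hits the cache.
    let _ = lineitem.statistics();
}
```

An async-aware cell or a `Mutex<Option<Statistics>>` would serve equally well; the essential change is moving the scan out of registration and memoizing the result.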
**Describe alternatives you've considered**
N/A
**Additional context**
N/A