Describe the bug
I am running a modified version of TPC-H query 10. I've removed the filters to stress Comet and see how it behaves when processing a large amount of data:
-- SQLBench-H query 10 derived from TPC-H query 10 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
select
	c_custkey,
	c_name,
	sum(l_extendedprice * (1 - l_discount)) as revenue,
	c_acctbal,
	n_name,
	c_address,
	c_phone,
	c_comment
from
	customer,
	orders,
	lineitem,
	nation
where
	c_custkey = o_custkey
	and l_orderkey = o_orderkey
	and c_nationkey = n_nationkey
group by
	c_custkey,
	c_name,
	c_acctbal,
	c_phone,
	n_name,
	c_address,
	c_comment
order by
	revenue desc limit 20;

The Spark SQL metrics page showed that the CometSortExec operators return 100 rows and 0 rows respectively, and that no spill is triggered. The SMJ node returns 0 rows, which is certainly not the case.
I've enabled spark.comet.explain.native.enabled, and the native execution plan with metrics shows somewhat reasonable numbers:
24/10/08 13:34:59 INFO core/src/execution/jni_api.rs: Comet native query plan with metrics (stage: 13 task: 42):
AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum], metrics=[output_rows=2553986, elapsed_compute=3.777709066s]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8], metrics=[output_rows=14996536, elapsed_compute=2.170829ms]
    ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1], metrics=[output_rows=14996536, elapsed_compute=2.538593ms]
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)], metrics=[output_rows=14996536, input_batches=1831, build_input_rows=25, build_input_batches=1, output_batches=1831, input_rows=14996536, build_mem_used=1392, build_time=53.125µs, join_time=856.613437ms]
        CopyExec [UnpackOrDeepCopy], metrics=[output_rows=25, elapsed_compute=5.209µs]
          ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8], metrics=[output_rows=25, elapsed_compute=1.168µs, cast_time=1ns]
        CopyExec [UnpackOrClone], metrics=[output_rows=14996536, elapsed_compute=2.237373ms]
          ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8], metrics=[output_rows=14996536, elapsed_compute=2.037619ms]
            SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)], metrics=[output_rows=14996536, spill_count=0, spilled_bytes=0, spilled_rows=0, input_batches=2289, input_rows=18746334, output_batches=1831, peak_mem_used=918320, join_time=4.79088405s]
              SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false], metrics=[output_rows=3749798, elapsed_compute=1.841784352s, spill_count=3, spilled_bytes=586947488, spilled_rows=3144844]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=3749798, elapsed_compute=66.035032ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64], metrics=[output_rows=3749798, elapsed_compute=411.397µs, cast_time=1ns]
              SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false], metrics=[output_rows=14996536, elapsed_compute=2.318524006s, spill_count=4, spilled_bytes=590603456, spilled_rows=14752112]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=14996536, elapsed_compute=32.209479ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)], metrics=[output_rows=14996536, elapsed_compute=472.861µs, cast_time=1ns]
The SortExec operators in one of the tasks produced up to 15 million rows and spilled 3 to 4 times.
Steps to reproduce
Run the SQL query mentioned above on TPC-H data with scale factor = 10.
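For reference, here is a minimal sketch of how this can be set up programmatically as a standalone Spark application. The object name, table path, and query file name are placeholders; the configuration values mirror the list under Additional context below.

import org.apache.spark.sql.SparkSession

// Hypothetical object name; the data path and query file name are placeholders.
object TpchQ10NoFilterRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("tpch-q10-no-filter")
      .master("local[4]")
      .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
      .config("spark.comet.enabled", "true")
      .config("spark.comet.exec.enabled", "true")
      .config("spark.comet.exec.all.enabled", "true")
      .config("spark.comet.exec.shuffle.enabled", "true")
      .config("spark.shuffle.manager",
        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
      .config("spark.comet.exec.shuffle.mode", "native")
      .config("spark.sql.adaptive.enabled", "false")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()

    // Register the four TPC-H (SF10) tables referenced by the query.
    Seq("customer", "orders", "lineitem", "nation").foreach { table =>
      spark.read.parquet(s"/path/to/tpch-sf10/$table").createOrReplaceTempView(table)
    }

    // Run the modified query 10 and materialize the result, then compare the SQL tab
    // of the Spark UI with the native plan metrics logged by Comet.
    val query = scala.io.Source.fromFile("q10_no_filter.sql").mkString
    spark.sql(query).collect()

    spark.stop()
  }
}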
Expected behavior
The metrics shown on the SQL page should be consistent with the native DataFusion metrics.
Additional context
This issue was reproduced using Spark 3.5 with master=local[4]; the Comet version is a slightly modified build of commit 3413397.
Here are the relevant Spark configurations:
spark.sql.extensions   org.apache.comet.CometSparkSessionExtensions
spark.comet.enabled  true
spark.comet.exec.enabled true
spark.comet.exec.all.enabled true
spark.comet.exec.shuffle.enabled true
spark.shuffle.manager  org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
spark.comet.exec.shuffle.mode native
spark.sql.adaptive.enabled false
spark.sql.adaptive.coalescePartitions.enabled false
spark.sql.shuffle.partitions 4
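In addition, the native plan logging shown above was turned on with the following setting (same format as the list above):
spark.comet.explain.native.enabled true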
