chore: change shuffle mode default from jvm to auto (#877)
* change shuffle mode default from jvm to auto

* update expected plans for Spark 3.4

* update expected plans for Spark 3.5

* fix docs for default value

* update expected plans for Spark 4.0

* update expected plans for Spark 4.0

* update 3.4 plans
andygrove authored Aug 28, 2024
1 parent f4400f5 commit fe35224
Showing 806 changed files with 33,471 additions and 38,242 deletions.
4 changes: 2 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -197,11 +197,11 @@ object CometConf extends ShimCometConf {
"'native' is for native shuffle which has best performance in general. " +
"'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. " +
"'auto' is for Comet to choose the best shuffle mode based on the query plan. " +
"By default, this config is 'jvm'.")
"By default, this config is 'auto'.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("native", "jvm", "auto"))
- .createWithDefault("jvm")
+ .createWithDefault("auto")

val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
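Since only the default changed, any job that relied on the old behavior can pin the mode back explicitly. A minimal sketch (the app name is illustrative; the config keys are the ones documented in configs.md below):

    import org.apache.spark.sql.SparkSession

    // Sketch: restore the pre-#877 behavior by setting the mode explicitly.
    // Accepted values (lower-cased by the conf's transform): native, jvm, auto.
    val spark = SparkSession.builder()
      .appName("comet-shuffle-mode-example") // illustrative
      .config("spark.shuffle.manager",
        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
      .config("spark.comet.exec.shuffle.enabled", "true")
      .config("spark.comet.exec.shuffle.mode", "jvm") // the old default
      .getOrCreate()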
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -51,7 +51,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
- | spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
+ | spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'auto'. | auto |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
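To confirm which mode is in effect for a running session, the value can be read back through the runtime conf. A small sketch, assuming the session above; the second argument is only a fallback in case the key was never set:

    // Sketch: read the effective shuffle mode (falls back to "auto",
    // the new default, when the key has not been set explicitly).
    val mode = spark.conf.get("spark.comet.exec.shuffle.mode", "auto")
    println(s"Comet shuffle mode in effect: $mode")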
[expected plan file (path truncated)]
@@ -9,7 +9,7 @@
: : +- CometBroadcastHashJoin (26)
: : :- CometFilter (12)
: : : +- CometHashAggregate (11)
- : : : +- CometColumnarExchange (10)
+ : : : +- CometExchange (10)
: : : +- CometHashAggregate (9)
: : : +- CometProject (8)
: : : +- CometBroadcastHashJoin (7)
@@ -22,10 +22,10 @@
: : +- CometBroadcastExchange (25)
: : +- CometFilter (24)
: : +- CometHashAggregate (23)
- : : +- CometColumnarExchange (22)
+ : : +- CometExchange (22)
: : +- CometHashAggregate (21)
: : +- CometHashAggregate (20)
- : : +- CometColumnarExchange (19)
+ : : +- CometExchange (19)
: : +- CometHashAggregate (18)
: : +- CometProject (17)
: : +- CometBroadcastHashJoin (16)
@@ -86,9 +86,9 @@ Input [3]: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3]
Keys [2]: [sr_customer_sk#1, sr_store_sk#2]
Functions [1]: [partial_sum(UnscaledValue(sr_return_amt#3))]

- (10) CometColumnarExchange
+ (10) CometExchange
Input [3]: [sr_customer_sk#1, sr_store_sk#2, sum#8]
- Arguments: hashpartitioning(sr_customer_sk#1, sr_store_sk#2, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=1]
+ Arguments: hashpartitioning(sr_customer_sk#1, sr_store_sk#2, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1]

(11) CometHashAggregate
Input [3]: [sr_customer_sk#1, sr_store_sk#2, sum#8]
@@ -128,9 +128,9 @@ Input [3]: [sr_customer_sk#12, sr_store_sk#13, sr_return_amt#14]
Keys [2]: [sr_customer_sk#12, sr_store_sk#13]
Functions [1]: [partial_sum(UnscaledValue(sr_return_amt#14))]

- (19) CometColumnarExchange
+ (19) CometExchange
Input [3]: [sr_customer_sk#12, sr_store_sk#13, sum#18]
- Arguments: hashpartitioning(sr_customer_sk#12, sr_store_sk#13, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=2]
+ Arguments: hashpartitioning(sr_customer_sk#12, sr_store_sk#13, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]

(20) CometHashAggregate
Input [3]: [sr_customer_sk#12, sr_store_sk#13, sum#18]
@@ -142,9 +142,9 @@ Input [2]: [ctr_store_sk#19, ctr_total_return#20]
Keys [1]: [ctr_store_sk#19]
Functions [1]: [partial_avg(ctr_total_return#20)]

- (22) CometColumnarExchange
+ (22) CometExchange
Input [3]: [ctr_store_sk#19, sum#21, count#22]
- Arguments: hashpartitioning(ctr_store_sk#19, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=3]
+ Arguments: hashpartitioning(ctr_store_sk#19, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=3]

(23) CometHashAggregate
Input [3]: [ctr_store_sk#19, sum#21, count#22]
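The hunks above show the new default at work in this plan: where 'jvm' mode planned CometColumnarExchange backed by CometColumnarShuffle, 'auto' picks the native path, CometExchange backed by CometNativeShuffle. A sketch of how to check which operator was chosen for a query of your own (the SQL is a stand-in for any shuffling aggregation):

    // Sketch: print the formatted plan and look for CometExchange (native)
    // versus CometColumnarExchange (jvm) among the exchange nodes.
    val df = spark.sql(
      "SELECT sr_customer_sk, SUM(sr_return_amt) FROM store_returns GROUP BY sr_customer_sk")
    df.explain("formatted")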
[expected plan file (path truncated)]
@@ -10,7 +10,7 @@ WholeStageCodegen (1)
CometBroadcastHashJoin [ctr_customer_sk,ctr_store_sk,ctr_total_return,(avg(ctr_total_return) * 1.2),ctr_store_sk]
CometFilter [ctr_customer_sk,ctr_store_sk,ctr_total_return]
CometHashAggregate [ctr_customer_sk,ctr_store_sk,ctr_total_return,sr_customer_sk,sr_store_sk,sum,sum(UnscaledValue(sr_return_amt))]
- CometColumnarExchange [sr_customer_sk,sr_store_sk] #1
+ CometExchange [sr_customer_sk,sr_store_sk] #1
CometHashAggregate [sr_customer_sk,sr_store_sk,sum,sr_return_amt]
CometProject [sr_customer_sk,sr_store_sk,sr_return_amt]
CometBroadcastHashJoin [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk,d_date_sk]
@@ -31,10 +31,10 @@ WholeStageCodegen (1)
CometBroadcastExchange [(avg(ctr_total_return) * 1.2),ctr_store_sk] #4
CometFilter [(avg(ctr_total_return) * 1.2),ctr_store_sk]
CometHashAggregate [(avg(ctr_total_return) * 1.2),ctr_store_sk,sum,count,avg(ctr_total_return)]
- CometColumnarExchange [ctr_store_sk] #5
+ CometExchange [ctr_store_sk] #5
CometHashAggregate [ctr_store_sk,sum,count,ctr_total_return]
CometHashAggregate [ctr_store_sk,ctr_total_return,sr_customer_sk,sr_store_sk,sum,sum(UnscaledValue(sr_return_amt))]
- CometColumnarExchange [sr_customer_sk,sr_store_sk] #6
+ CometExchange [sr_customer_sk,sr_store_sk] #6
CometHashAggregate [sr_customer_sk,sr_store_sk,sum,sr_return_amt]
CometProject [sr_customer_sk,sr_store_sk,sr_return_amt]
CometBroadcastHashJoin [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk,d_date_sk]
[expected plan file (path truncated)]
@@ -1,51 +1,49 @@
== Physical Plan ==
- TakeOrderedAndProject (47)
- +- * HashAggregate (46)
- +- * ColumnarToRow (45)
- +- CometColumnarExchange (44)
- +- RowToColumnar (43)
- +- * HashAggregate (42)
- +- * Project (41)
- +- * BroadcastHashJoin Inner BuildRight (40)
- :- * Project (35)
- : +- * BroadcastHashJoin Inner BuildRight (34)
- : :- * Project (28)
- : : +- * Filter (27)
- : : +- * BroadcastHashJoin ExistenceJoin(exists#1) BuildRight (26)
- : : :- * BroadcastHashJoin ExistenceJoin(exists#2) BuildRight (19)
- : : : :- * ColumnarToRow (12)
- : : : : +- CometBroadcastHashJoin (11)
- : : : : :- CometFilter (2)
- : : : : : +- CometScan parquet spark_catalog.default.customer (1)
- : : : : +- CometBroadcastExchange (10)
- : : : : +- CometProject (9)
- : : : : +- CometBroadcastHashJoin (8)
- : : : : :- CometScan parquet spark_catalog.default.store_sales (3)
- : : : : +- CometBroadcastExchange (7)
- : : : : +- CometProject (6)
- : : : : +- CometFilter (5)
- : : : : +- CometScan parquet spark_catalog.default.date_dim (4)
- : : : +- BroadcastExchange (18)
- : : : +- * ColumnarToRow (17)
- : : : +- CometProject (16)
- : : : +- CometBroadcastHashJoin (15)
- : : : :- CometScan parquet spark_catalog.default.web_sales (13)
- : : : +- ReusedExchange (14)
- : : +- BroadcastExchange (25)
- : : +- * ColumnarToRow (24)
- : : +- CometProject (23)
- : : +- CometBroadcastHashJoin (22)
- : : :- CometScan parquet spark_catalog.default.catalog_sales (20)
- : : +- ReusedExchange (21)
- : +- BroadcastExchange (33)
- : +- * ColumnarToRow (32)
- : +- CometProject (31)
- : +- CometFilter (30)
- : +- CometScan parquet spark_catalog.default.customer_address (29)
- +- BroadcastExchange (39)
- +- * ColumnarToRow (38)
- +- CometFilter (37)
- +- CometScan parquet spark_catalog.default.customer_demographics (36)
+ TakeOrderedAndProject (45)
+ +- * HashAggregate (44)
+ +- Exchange (43)
+ +- * HashAggregate (42)
+ +- * Project (41)
+ +- * BroadcastHashJoin Inner BuildRight (40)
+ :- * Project (35)
+ : +- * BroadcastHashJoin Inner BuildRight (34)
+ : :- * Project (28)
+ : : +- * Filter (27)
+ : : +- * BroadcastHashJoin ExistenceJoin(exists#1) BuildRight (26)
+ : : :- * BroadcastHashJoin ExistenceJoin(exists#2) BuildRight (19)
+ : : : :- * ColumnarToRow (12)
+ : : : : +- CometBroadcastHashJoin (11)
+ : : : : :- CometFilter (2)
+ : : : : : +- CometScan parquet spark_catalog.default.customer (1)
+ : : : : +- CometBroadcastExchange (10)
+ : : : : +- CometProject (9)
+ : : : : +- CometBroadcastHashJoin (8)
+ : : : : :- CometScan parquet spark_catalog.default.store_sales (3)
+ : : : : +- CometBroadcastExchange (7)
+ : : : : +- CometProject (6)
+ : : : : +- CometFilter (5)
+ : : : : +- CometScan parquet spark_catalog.default.date_dim (4)
+ : : : +- BroadcastExchange (18)
+ : : : +- * ColumnarToRow (17)
+ : : : +- CometProject (16)
+ : : : +- CometBroadcastHashJoin (15)
+ : : : :- CometScan parquet spark_catalog.default.web_sales (13)
+ : : : +- ReusedExchange (14)
+ : : +- BroadcastExchange (25)
+ : : +- * ColumnarToRow (24)
+ : : +- CometProject (23)
+ : : +- CometBroadcastHashJoin (22)
+ : : :- CometScan parquet spark_catalog.default.catalog_sales (20)
+ : : +- ReusedExchange (21)
+ : +- BroadcastExchange (33)
+ : +- * ColumnarToRow (32)
+ : +- CometProject (31)
+ : +- CometFilter (30)
+ : +- CometScan parquet spark_catalog.default.customer_address (29)
+ +- BroadcastExchange (39)
+ +- * ColumnarToRow (38)
+ +- CometFilter (37)
+ +- CometScan parquet spark_catalog.default.customer_demographics (36)


(1) Scan parquet spark_catalog.default.customer
@@ -245,56 +243,50 @@ Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#31]
Results [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]

- (43) RowToColumnar
+ (43) Exchange
Input [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]
+ Arguments: hashpartitioning(cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, 5), ENSURE_REQUIREMENTS, [plan_id=5]

- (44) CometColumnarExchange
- Input [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]
- Arguments: hashpartitioning(cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=5]
-
- (45) ColumnarToRow [codegen id : 6]
- Input [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]
-
- (46) HashAggregate [codegen id : 6]
+ (44) HashAggregate [codegen id : 6]
Input [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]
Keys [8]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#33]
Results [14]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, count(1)#33 AS cnt1#34, cd_purchase_estimate#26, count(1)#33 AS cnt2#35, cd_credit_rating#27, count(1)#33 AS cnt3#36, cd_dep_count#28, count(1)#33 AS cnt4#37, cd_dep_employed_count#29, count(1)#33 AS cnt5#38, cd_dep_college_count#30, count(1)#33 AS cnt6#39]

- (47) TakeOrderedAndProject
+ (45) TakeOrderedAndProject
Input [14]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cnt1#34, cd_purchase_estimate#26, cnt2#35, cd_credit_rating#27, cnt3#36, cd_dep_count#28, cnt4#37, cd_dep_employed_count#29, cnt5#38, cd_dep_college_count#30, cnt6#39]
Arguments: 100, [cd_gender#23 ASC NULLS FIRST, cd_marital_status#24 ASC NULLS FIRST, cd_education_status#25 ASC NULLS FIRST, cd_purchase_estimate#26 ASC NULLS FIRST, cd_credit_rating#27 ASC NULLS FIRST, cd_dep_count#28 ASC NULLS FIRST, cd_dep_employed_count#29 ASC NULLS FIRST, cd_dep_college_count#30 ASC NULLS FIRST], [cd_gender#23, cd_marital_status#24, cd_education_status#25, cnt1#34, cd_purchase_estimate#26, cnt2#35, cd_credit_rating#27, cnt3#36, cd_dep_count#28, cnt4#37, cd_dep_employed_count#29, cnt5#38, cd_dep_college_count#30, cnt6#39]

===== Subqueries =====

Subquery:1 Hosting operator id = 3 Hosting Expression = ss_sold_date_sk#7 IN dynamicpruning#8
- BroadcastExchange (52)
- +- * ColumnarToRow (51)
- +- CometProject (50)
- +- CometFilter (49)
- +- CometScan parquet spark_catalog.default.date_dim (48)
+ BroadcastExchange (50)
+ +- * ColumnarToRow (49)
+ +- CometProject (48)
+ +- CometFilter (47)
+ +- CometScan parquet spark_catalog.default.date_dim (46)


- (48) Scan parquet spark_catalog.default.date_dim
+ (46) Scan parquet spark_catalog.default.date_dim
Output [3]: [d_date_sk#9, d_year#10, d_moy#11]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), IsNotNull(d_moy), EqualTo(d_year,2002), GreaterThanOrEqual(d_moy,1), LessThanOrEqual(d_moy,4), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int,d_moy:int>

- (49) CometFilter
+ (47) CometFilter
Input [3]: [d_date_sk#9, d_year#10, d_moy#11]
Condition : (((((isnotnull(d_year#10) AND isnotnull(d_moy#11)) AND (d_year#10 = 2002)) AND (d_moy#11 >= 1)) AND (d_moy#11 <= 4)) AND isnotnull(d_date_sk#9))

- (50) CometProject
+ (48) CometProject
Input [3]: [d_date_sk#9, d_year#10, d_moy#11]
Arguments: [d_date_sk#9], [d_date_sk#9]

- (51) ColumnarToRow [codegen id : 1]
+ (49) ColumnarToRow [codegen id : 1]
Input [1]: [d_date_sk#9]

- (52) BroadcastExchange
+ (50) BroadcastExchange
Input [1]: [d_date_sk#9]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=6]

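The larger rewrite in the last file shows the other side of 'auto': rather than forcing columnar shuffle around a row-based HashAggregate, the old RowToColumnar / CometColumnarExchange / ColumnarToRow sequence collapses into a single plain Spark Exchange, eliminating both row/columnar transitions. A rough way to spot such transitions in any plan is plain substring counting over the plan string; a sketch, not a Comet API:

    // Sketch: count row/columnar transition operators in the executed plan.
    val plan = df.queryExecution.executedPlan.toString
    val transitions = Seq("RowToColumnar", "ColumnarToRow")
      .map(op => op -> plan.sliding(op.length).count(_ == op))
    println(transitions.mkString(", "))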