forked from apache/spark
[kap] #15091, remove default database check when init sparkcontext #67
Merged: fishcus merged 1 commit into Kyligence:kyspark-2.2.1.x-3.x from fishcus:kyspark-ke-15091 on Oct 17, 2019.
Conversation
zheniantoushipashi approved these changes on Oct 17, 2019:
LGTM
eventd approved these changes on Oct 17, 2019.
7mming7 pushed a commit that referenced this pull request on Nov 4, 2020:
…applicable ### What changes were proposed in this pull request? Based on a follow up comment in apache#28123, where we can coalesce buckets for shuffled hash join as well. The note here is we only coalesce the buckets from shuffled hash join stream side (i.e. the side not building hash map), so we don't need to worry about OOM when coalescing multiple buckets in one task for building hash map. > If you refactor some codes with changing classes, showing the class hierarchy will help reviewers. Refactor existing physical plan rule `CoalesceBucketsInSortMergeJoin` to `CoalesceBucketsInJoin`, for covering shuffled hash join as well. Refactor existing unit test `CoalesceBucketsInSortMergeJoinSuite` to `CoalesceBucketsInJoinSuite`, for covering shuffled hash join as well. ### Why are the changes needed? Avoid shuffle for joining different bucketed tables, is also useful for shuffled hash join. In production, we are seeing users to use shuffled hash join to join bucketed tables (set `spark.sql.join.preferSortMergeJoin`=false, to avoid sort), and this can help avoid shuffle if number of buckets are not same. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests in `CoalesceBucketsInJoinSuite` for verifying shuffled hash join physical plan. ### Performance number per request from maropu I was looking at TPCDS per suggestion from maropu. But I found most of queries from TPCDS are doing aggregate, and only several ones are doing join. None of input tables are bucketed. So I took the approach to test a modified version of `TPCDS q93` as ``` SELECT ss_ticket_number, sr_ticket_number FROM store_sales JOIN store_returns ON ss_ticket_number = sr_ticket_number ``` And make `store_sales` and `store_returns` to be bucketed tables. 
Physical query plan without coalesce: ``` ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft :- Exchange hashpartitioning(ss_ticket_number#109L, 4), true, [id=#67] : +- *(1) Project [ss_ticket_number#109L] : +- *(1) Filter isnotnull(ss_ticket_number#109L) : +- *(1) ColumnarToRow : +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2 +- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130] +- *(2) Filter isnotnull(sr_ticket_number#120L) +- *(2) ColumnarToRow +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4 ``` Physical query plan with coalesce: ``` ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft :- *(1) Project [ss_ticket_number#109L] : +- *(1) Filter isnotnull(ss_ticket_number#109L) : +- *(1) ColumnarToRow : +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2 +- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130] +- *(2) Filter isnotnull(sr_ticket_number#120L) +- *(2) ColumnarToRow +- FileScan parquet 
default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4 (Coalesced to 2) ``` Run time improvement as 50% of wall clock time: ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4 Intel(R) Core(TM) i9-9980HK CPU 2.40GHz shuffle hash join: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ shuffle hash join coalesce bucket off 1541 1664 106 1.9 535.1 1.0X shuffle hash join coalesce bucket on 1060 1169 81 2.7 368.1 1.5X ``` Closes apache#29079 from c21/split-bucket. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
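For illustration, a minimal sketch of exercising the bucket coalescing described above. The table names and sizes are made up, the `SHUFFLE_HASH` join hint assumes Spark 3.x, and `spark.sql.bucketing.coalesceBucketsInJoin.enabled` is the flag this change is described as adding, so treat the exact names as assumptions for your Spark version:

```scala
// Sketch: join two tables bucketed with different bucket counts via a
// shuffled hash join, so the stream side's buckets can be coalesced instead
// of re-shuffling the smaller-bucket table.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("coalesce-buckets-demo").getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")  // assumed flag name

(0 until 1000).map(i => (i, i)).toDF("ss_ticket_number", "v1")
  .write.bucketBy(2, "ss_ticket_number").saveAsTable("store_sales_b")
(0 until 1000).map(i => (i, i)).toDF("sr_ticket_number", "v2")
  .write.bucketBy(4, "sr_ticket_number").saveAsTable("store_returns_b")

// Build the hash map on the 2-bucket table; the 4-bucket stream side can then
// be read as 2 coalesced buckets, avoiding the shuffle of store_sales_b to 4 partitions.
spark.sql(
  """SELECT /*+ SHUFFLE_HASH(store_sales_b) */
    |       ss_ticket_number, sr_ticket_number
    |FROM store_sales_b JOIN store_returns_b
    |ON ss_ticket_number = sr_ticket_number""".stripMargin)
  .explain()  // expect "SelectedBucketsCount: 4 out of 4 (Coalesced to 2)" on store_returns_b
```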
7mming7 pushed a commit that referenced this pull request on Nov 4, 2020:
… more scenarios such as PartitioningCollection ### What changes were proposed in this pull request? This PR proposes to improve `EnsureRquirement.reorderJoinKeys` to handle the following scenarios: 1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`. 2. Handle `PartitioningCollection`, which may contain `HashPartitioning` ### Why are the changes needed? 1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side. The following will not consider the right-side `HashPartitioning`: ``` val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1") val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2") df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2") val t1 = spark.table("t1") val t2 = spark.table("t2") val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2")) join.explain == Physical Plan == *(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner :- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69] : +- *(1) Project [i1#26, j1#27] : +- *(1) Filter isnotnull(i1#26) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4 +- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0. +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79]. <===== This can be removed +- *(3) Project [i2#30, j2#31] +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30)) +- *(3) ColumnarToRow +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4 ``` 2. For the scenario 2), the current behavior does not handle `PartitioningCollection`: ``` val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1") val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2") val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3") val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3")) join2.explain == Physical Plan == *(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner :- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0. 
<===== This can be removed : +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58] <===== This can be removed : +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner : :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45] : : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8] : : +- *(1) LocalTableScan [_1#2, _2#3] : +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51] : +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19] : +- *(3) LocalTableScan [_1#13, _2#14] +- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64] +- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30] +- *(7) LocalTableScan [_1#24, _2#25] ``` ### Does this PR introduce _any_ user-facing change? Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed: 1. Senario 1): ``` == Physical Plan == *(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner :- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67] : +- *(1) Project [i1#26, j1#27] : +- *(1) Filter isnotnull(i1#26) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4 +- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0 +- *(3) Project [i2#30, j2#31] +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30)) +- *(3) ColumnarToRow +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4 ``` 2. Scenario 2): ``` == Physical Plan == *(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner :- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner : :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43] : : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8] : : +- *(1) LocalTableScan [_1#2, _2#3] : +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49] : +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19] : +- *(3) LocalTableScan [_1#13, _2#14] +- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58] +- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30] +- *(6) LocalTableScan [_1#24, _2#25] ``` ### How was this patch tested? Added tests. Closes apache#29074 from imback82/reorder_keys. Authored-by: Terry Kim <yuminkim@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
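As a rough way to check the improvement above, the sketch below counts shuffle exchanges in the physical plan for the `PartitioningCollection` example; the `ShuffleExchangeExec` class path assumes Spark 3.x, and adaptive execution is disabled so the full static plan is visible up front:

```scala
// Sketch: count shuffle exchanges to confirm the inner join's
// PartitioningCollection is now matched, so no extra Exchange/Sort is added
// above it for the outer join.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val spark = SparkSession.builder().appName("reorder-join-keys-demo").getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.adaptive.enabled", "false")

val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")

val join  = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2"))
val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3"))

val shuffles = join2.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}
// With the reordering fix the top join reuses the child join's partitioning,
// so one fewer Exchange should appear here than in the "before" plan above.
println(s"shuffle exchanges: ${shuffles.length}")
```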
7mming7 pushed a commit to 7mming7/spark that referenced this pull request on Nov 17, 2020:
### What changes were proposed in this pull request? Push down filter through expand. For case below: ``` create table t1(pid int, uid int, sid int, dt date, suid int) using parquet; create table t2(pid int, vs int, uid int, csid int) using parquet; SELECT years, appversion, SUM(uusers) AS users FROM (SELECT Date_trunc('year', dt) AS years, CASE WHEN h.pid = 3 THEN 'iOS' WHEN h.pid = 4 THEN 'Android' ELSE 'Other' END AS viewport, h.vs AS appversion, Count(DISTINCT u.uid) AS uusers ,Count(DISTINCT u.suid) AS srcusers FROM t1 u join t2 h ON h.uid = u.uid GROUP BY 1, 2, 3) AS a WHERE viewport = 'iOS' GROUP BY 1, 2 ``` Plan. before this pr: ``` == Physical Plan == *(5) HashAggregate(keys=[years#30, appversion#32], functions=[sum(uusers#33L)]) +- Exchange hashpartitioning(years#30, appversion#32, 200), true, [id=Kyligence#251] +- *(4) HashAggregate(keys=[years#30, appversion#32], functions=[partial_sum(uusers#33L)]) +- *(4) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))Kyligence#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[count(if ((gid#44 = 1)) u.`uid`Kyligence#47 else null)]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))Kyligence#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, 200), true, [id=Kyligence#246] +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))Kyligence#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[partial_count(if ((gid#44 = 1)) u.`uid`Kyligence#47 else null)]) +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))Kyligence#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`Kyligence#47, u.`suid`Kyligence#48, gid#44], functions=[]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))Kyligence#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`Kyligence#47, u.`suid`Kyligence#48, gid#44, 200), true, [id=Kyligence#241] +- *(2) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))Kyligence#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`Kyligence#47, u.`suid`Kyligence#48, gid#44], functions=[]) +- *(2) Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS) +- *(2) Expand [ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, uid#7, null, 1), ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, null, suid#10, 2)], [date_trunc('year', CAST(u.`dt` AS TIMESTAMP))Kyligence#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`Kyligence#47, u.`suid`Kyligence#48, gid#44] +- *(2) Project [uid#7, dt#9, suid#10, pid#11, vs#12] +- *(2) BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight :- *(2) Project [uid#7, dt#9, suid#10] : +- *(2) Filter isnotnull(uid#7) : +- *(2) ColumnarToRow : +- FileScan parquet default.t1[uid#7,dt#9,suid#10] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t1], PartitionFilters: [], 
PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date,suid:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint))), [id=Kyligence#233] +- *(1) Project [pid#11, vs#12, uid#13] +- *(1) Filter isnotnull(uid#13) +- *(1) ColumnarToRow +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [isnotnull(uid#13)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` Plan. after. this pr. : ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[years#0, appversion#2], functions=[sum(uusers#3L)], output=[years#0, appversion#2, users#5L]) +- Exchange hashpartitioning(years#0, appversion#2, 5), true, [id=Kyligence#71] +- HashAggregate(keys=[years#0, appversion#2], functions=[partial_sum(uusers#3L)], output=[years#0, appversion#2, sum#22L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[count(distinct uid#7)], output=[years#0, appversion#2, uusers#3L]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, 5), true, [id=Kyligence#67] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[partial_count(distinct uid#7)], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, count#27L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7, 5), true, [id=Kyligence#63] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles)) AS date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END AS CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))Kyligence#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Project [uid#7, dt#9, pid#11, vs#12] +- BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight, false :- Filter isnotnull(uid#7) : +- FileScan parquet default.t1[uid#7,dt#9] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: 
struct<uid:int,dt:date> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=Kyligence#58] +- Filter ((CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS) AND isnotnull(uid#13)) +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [(CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS), isnotnull..., Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` ### Why are the changes needed? Improve performance, filter more data. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes apache#30278 from AngersZhuuuu/SPARK-33302. Authored-by: angerszhu <angers.zhu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
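A hedged sketch of inspecting where the pushed-down predicate lands, reusing the `t1`/`t2` schema and query from the description above; the Catalyst `Expand` and `Filter` classes are internal APIs and may differ across versions:

```scala
// Sketch: inspect the optimized logical plan to see that the predicate on the
// grouping expression now appears below the Expand generated by the two
// COUNT(DISTINCT ...) aggregates, next to the scan of t2.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter}

val spark = SparkSession.builder().appName("filter-through-expand-demo").getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS t1(pid INT, uid INT, sid INT, dt DATE, suid INT) USING parquet")
spark.sql("CREATE TABLE IF NOT EXISTS t2(pid INT, vs INT, uid INT, csid INT) USING parquet")

val df = spark.sql(
  """SELECT years, appversion, SUM(uusers) AS users
    |FROM (SELECT date_trunc('year', dt) AS years,
    |             CASE WHEN h.pid = 3 THEN 'iOS'
    |                  WHEN h.pid = 4 THEN 'Android'
    |                  ELSE 'Other' END AS viewport,
    |             h.vs AS appversion,
    |             count(DISTINCT u.uid) AS uusers,
    |             count(DISTINCT u.suid) AS srcusers
    |      FROM t1 u JOIN t2 h ON h.uid = u.uid
    |      GROUP BY 1, 2, 3) a
    |WHERE viewport = 'iOS'
    |GROUP BY 1, 2""".stripMargin)

// Print every Expand and Filter in plan order; after the change a Filter on the
// CASE WHEN expression should show up beneath the Expand rather than above it.
df.queryExecution.optimizedPlan.foreach {
  case e: Expand => println(s"Expand over ${e.child.nodeName}")
  case f: Filter => println(s"Filter: ${f.condition.sql}")
  case _         => ()
}
```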
lxian pushed a commit to lxian/spark that referenced this pull request on Mar 24, 2021:
…join can be planned as broadcast join ### What changes were proposed in this pull request? Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases. ```scala spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1") spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2") spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain ``` Before this pr: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#72] +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=Kyligence#65] : +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint> +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=Kyligence#66] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#61] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint> ``` After this pr: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#74] +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=Kyligence#67] : +- HashAggregate(keys=[a#16L, b#17L], functions=[]) : +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#61] : +- HashAggregate(keys=[a#16L, b#17L], functions=[]) : +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint> +- Sort [coalesce(c#18L, 0) 
ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=Kyligence#68] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#63] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint> ``` ### Why are the changes needed? 1. Pushdown LeftSemi/LeftAnti over Aggregate will affect performance. 2. It will remove user added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test and benchmark test. SQL | Before this PR(Seconds) | After this PR(Seconds) -- | -- | -- q14a | 660 | 594 q14b | 660 | 600 q38 | 55 | 29 q87 | 66 | 35 Before this pr:  After this pr:  Closes apache#31145 from wangyum/SPARK-34081. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
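To see the new behaviour from the user side, a small sketch that compares the INTERSECT plan with broadcast joins disabled and enabled; table sizes are scaled down from the description above, and the resulting plans depend on table statistics, so take the comments as expectations rather than guarantees:

```scala
// Sketch: with this change the LeftSemi is only pushed below the left-side
// Aggregate when the join can be planned as a broadcast join, so the
// user-added DISTINCTs are kept when broadcasting is ruled out.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("semi-over-aggregate-demo").getOrCreate()

spark.range(1000L).selectExpr("id % 100 as a", "id % 100 as b").write.saveAsTable("t1")
spark.range(800L).selectExpr("id % 80 as c", "id % 80 as d").write.saveAsTable("t2")

val query = "SELECT DISTINCT a, b FROM t1 INTERSECT SELECT DISTINCT c, d FROM t2"

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")   // no broadcast: expect no pushdown
spark.sql(query).explain()

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")  // default 10 MB threshold
spark.sql(query).explain()  // pushdown may apply again if the right side is small enough
```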
lxian pushed a commit to lxian/spark that referenced this pull request on Sep 24, 2021:
…query ### What changes were proposed in this pull request? Remove redundant aliases after `RewritePredicateSubquery`. For example: ```scala sql("CREATE TABLE t1 USING parquet AS SELECT id AS a, id AS b, id AS c FROM range(10)") sql("CREATE TABLE t2 USING parquet AS SELECT id AS x, id AS y FROM range(8)") sql( """ |SELECT * |FROM t1 |WHERE a IN (SELECT x | FROM (SELECT x AS x, | Rank() OVER (partition BY x ORDER BY Sum(y) DESC) AS ranking | FROM t2 | GROUP BY x) tmp1 | WHERE ranking <= 5) |""".stripMargin).explain ``` Before this PR: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [a#10L], [x#7L], LeftSemi, BuildRight, false :- FileScan parquet default.t1[a#10L,b#11L,c#12L] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=Kyligence#68] +- Project [x#7L] +- Filter (ranking#8 <= 5) +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST] +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0 +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#62] +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)]) +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#59] +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)]) +- FileScan parquet default.t2[x#15L,y#16L] ``` After this PR: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [a#10L], [x#15L], LeftSemi, BuildRight, false :- FileScan parquet default.t1[a#10L,b#11L,c#12L] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=Kyligence#67] +- Project [x#15L] +- Filter (ranking#8 <= 5) +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST] +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0 +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)]) +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#59] +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)]) +- FileScan parquet default.t2[x#15L,y#16L] ``` ### Why are the changes needed? Reduce shuffle to improve query performance. This change can benefit TPC-DS q70. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes apache#33509 from wangyum/SPARK-36280. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
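A minimal sketch of reproducing the example above end to end; with the redundant alias removed after subquery rewriting, `explain()` should show one fewer `Exchange` between the aggregate and the window sort:

```scala
// Sketch: the redundant `x AS x` alias no longer hides the aggregate's hash
// partitioning on x, so the window's Sort can reuse it without an extra shuffle.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("redundant-alias-demo").getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS t1 USING parquet AS SELECT id AS a, id AS b, id AS c FROM range(10)")
spark.sql("CREATE TABLE IF NOT EXISTS t2 USING parquet AS SELECT id AS x, id AS y FROM range(8)")

spark.sql(
  """SELECT * FROM t1
    |WHERE a IN (SELECT x
    |            FROM (SELECT x AS x,
    |                         rank() OVER (PARTITION BY x ORDER BY sum(y) DESC) AS ranking
    |                  FROM t2
    |                  GROUP BY x) tmp1
    |            WHERE ranking <= 5)""".stripMargin)
  .explain()  // before: an Exchange sits between the HashAggregate and the Sort/Window; after: it is gone
```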
lxian pushed a commit to lxian/spark that referenced this pull request on Sep 24, 2021:
…ence index for optimization ### What changes were proposed in this pull request? This PR proposes to move distributed-sequence index implementation to SQL plan to leverage optimizations such as column pruning. ```python import pyspark.pandas as ps ps.set_option('compute.default_index_type', 'distributed-sequence') ps.range(10).id.value_counts().to_frame().spark.explain() ``` **Before:** ```bash == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Sort [count#51L DESC NULLS LAST], true, 0 +- Exchange rangepartitioning(count#51L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=Kyligence#70] +- HashAggregate(keys=[id#37L], functions=[count(1)], output=[__index_level_0__#48L, count#51L]) +- Exchange hashpartitioning(id#37L, 200), ENSURE_REQUIREMENTS, [id=Kyligence#67] +- HashAggregate(keys=[id#37L], functions=[partial_count(1)], output=[id#37L, count#63L]) +- Project [id#37L] +- Filter atleastnnonnulls(1, id#37L) +- Scan ExistingRDD[__index_level_0__#36L,id#37L] # ^^^ Base DataFrame created by the output RDD from zipWithIndex (and checkpointed) ``` **After:** ```bash == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Sort [count#275L DESC NULLS LAST], true, 0 +- Exchange rangepartitioning(count#275L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=Kyligence#174] +- HashAggregate(keys=[id#258L], functions=[count(1)]) +- HashAggregate(keys=[id#258L], functions=[partial_count(1)]) +- Filter atleastnnonnulls(1, id#258L) +- Range (0, 10, step=1, splits=16) # ^^^ Removed the Spark job execution for `zipWithIndex` ``` ### Why are the changes needed? To leverage optimization of SQL engine and avoid unnecessary shuffle to create default index. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unittests were added. Also, this PR will test all unittests in pandas API on Spark after switching the default index implementation to `distributed-sequence`. Closes apache#33807 from HyukjinKwon/SPARK-36559. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
chenzhx pushed a commit to chenzhx/spark that referenced this pull request on Oct 20, 2021:
…ence index for optimization ### What changes were proposed in this pull request? This PR proposes to move distributed-sequence index implementation to SQL plan to leverage optimizations such as column pruning. ```python import pyspark.pandas as ps ps.set_option('compute.default_index_type', 'distributed-sequence') ps.range(10).id.value_counts().to_frame().spark.explain() ``` **Before:** ```bash == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Sort [count#51L DESC NULLS LAST], true, 0 +- Exchange rangepartitioning(count#51L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=Kyligence#70] +- HashAggregate(keys=[id#37L], functions=[count(1)], output=[__index_level_0__#48L, count#51L]) +- Exchange hashpartitioning(id#37L, 200), ENSURE_REQUIREMENTS, [id=Kyligence#67] +- HashAggregate(keys=[id#37L], functions=[partial_count(1)], output=[id#37L, count#63L]) +- Project [id#37L] +- Filter atleastnnonnulls(1, id#37L) +- Scan ExistingRDD[__index_level_0__#36L,id#37L] # ^^^ Base DataFrame created by the output RDD from zipWithIndex (and checkpointed) ``` **After:** ```bash == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Sort [count#275L DESC NULLS LAST], true, 0 +- Exchange rangepartitioning(count#275L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=Kyligence#174] +- HashAggregate(keys=[id#258L], functions=[count(1)]) +- HashAggregate(keys=[id#258L], functions=[partial_count(1)]) +- Filter atleastnnonnulls(1, id#258L) +- Range (0, 10, step=1, splits=16) # ^^^ Removed the Spark job execution for `zipWithIndex` ``` ### Why are the changes needed? To leverage optimization of SQL engine and avoid unnecessary shuffle to create default index. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unittests were added. Also, this PR will test all unittests in pandas API on Spark after switching the default index implementation to `distributed-sequence`. Closes apache#33807 from HyukjinKwon/SPARK-36559. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 93cec49) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
Remove the default database check when initializing the SparkContext.
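For context, a purely illustrative sketch of the kind of startup check being removed; this is not the actual Spark or KAP code, and the `ExternalCatalogLike` trait, method, and exception below are invented stand-ins:

```scala
// Purely illustrative: a stand-in for a "default database must exist" check
// performed while bringing up the SparkContext / catalog. The real check lives
// in Spark's session initialization; all names here are assumed.
trait ExternalCatalogLike {
  def databaseExists(name: String): Boolean
}

class NoSuchDatabaseException(db: String)
  extends RuntimeException(s"Database '$db' not found")

def checkDefaultDatabase(catalog: ExternalCatalogLike): Unit = {
  // This is the kind of eager check the PR removes: failing at startup when
  // "default" is absent from the metastore, even if the application never uses it.
  if (!catalog.databaseExists("default")) {
    throw new NoSuchDatabaseException("default")
  }
}
```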
How was this patch tested?