From f80495da8374a73409cbf61f3ca2fe371b89fc30 Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Wed, 8 Nov 2023 15:16:50 +0800 Subject: [PATCH] [fix](Nereids) ban right outer, right anti, full outer with bucket shuffle (#26529) If the left bucket has no data, we do not generate a left bucket instance. These joins must preserve all right-side data, but because the left instance does not exist, the right-side data is discarded since no destination is set. We ban bucket shuffle for these joins temporarily until the Coordinator can generate all instances for the left side. --- .../ChildrenPropertiesRegulator.java | 27 +++++++++++++++---- .../shape/query72.out | 5 ++-- .../shape/query72.out | 5 ++-- .../shape/query80.out | 3 ++- .../nereids_p0/join/test_outer_join.groovy | 26 ++++++++++++++++++ 5 files changed, 56 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 1174602da773d8..f0a2331105fa8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction; import org.apache.doris.nereids.trees.plans.AggMode; +import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.SortPhase; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; @@ -178,6 +179,12 @@ public Boolean visitPhysicalFilter(PhysicalFilter filter, Void c return true; } + private boolean couldNotRightBucketShuffleJoin(JoinType joinType) { + return joinType == JoinType.RIGHT_ANTI_JOIN + || joinType == 
JoinType.RIGHT_OUTER_JOIN + || joinType == JoinType.FULL_OUTER_JOIN; + } + @Override public Boolean visitPhysicalHashJoin(PhysicalHashJoin hashJoin, Void context) { @@ -207,12 +214,22 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin updatedForLeft = Optional.empty(); Optional updatedForRight = Optional.empty(); - if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL - && rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) { + if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) { // check colocate join with scan - if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) { - return true; - } + return true; + } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType())) { + // right anti, right outer, full outer join could not do bucket shuffle join + // TODO remove this after we refactor coordinator + updatedForLeft = Optional.of(calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())); + updatedForRight = Optional.of(calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())); + } else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL + && rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) { updatedForRight = Optional.of(calAnotherSideRequired( ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec, (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), diff --git a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query72.out b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query72.out index 5d955223c399ba..ffbb06a508a591 100644 --- a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query72.out +++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query72.out @@ -17,8 +17,9 @@ PhysicalResultSink ------------------------PhysicalProject --------------------------hashJoin[INNER_JOIN] hashCondition=((d1.d_week_seq = d2.d_week_seq))otherCondition=() ----------------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((catalog_returns.cr_item_sk = catalog_sales.cs_item_sk) and (catalog_returns.cr_order_number = catalog_sales.cs_order_number))otherCondition=() -------------------------------PhysicalProject ---------------------------------PhysicalOlapScan[catalog_returns] +------------------------------PhysicalDistribute +--------------------------------PhysicalProject +----------------------------------PhysicalOlapScan[catalog_returns] ------------------------------PhysicalDistribute --------------------------------PhysicalProject ----------------------------------hashJoin[INNER_JOIN] hashCondition=((item.i_item_sk = catalog_sales.cs_item_sk))otherCondition=() diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query72.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query72.out index 4f5be2c3f7e3fb..ead7385767361c 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query72.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query72.out @@ -18,8 +18,9 @@ PhysicalResultSink --------------------------hashJoin[INNER_JOIN] hashCondition=((d1.d_week_seq = d2.d_week_seq))otherCondition=() ----------------------------PhysicalDistribute ------------------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((catalog_returns.cr_item_sk = catalog_sales.cs_item_sk) and (catalog_returns.cr_order_number = catalog_sales.cs_order_number))otherCondition=() ---------------------------------PhysicalProject -----------------------------------PhysicalOlapScan[catalog_returns] +--------------------------------PhysicalDistribute +----------------------------------PhysicalProject 
+------------------------------------PhysicalOlapScan[catalog_returns] --------------------------------PhysicalDistribute ----------------------------------PhysicalProject ------------------------------------hashJoin[LEFT_OUTER_JOIN] hashCondition=((catalog_sales.cs_promo_sk = promotion.p_promo_sk))otherCondition=() diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query80.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query80.out index f25f72e178651e..1c24205ee4d665 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query80.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query80.out @@ -62,7 +62,8 @@ PhysicalResultSink --------------------------hashAgg[LOCAL] ----------------------------PhysicalProject ------------------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((web_sales.ws_item_sk = web_returns.wr_item_sk) and (web_sales.ws_order_number = web_returns.wr_order_number))otherCondition=() ---------------------------------PhysicalOlapScan[web_returns] +--------------------------------PhysicalDistribute +----------------------------------PhysicalOlapScan[web_returns] --------------------------------PhysicalDistribute ----------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_web_site_sk = web_site.web_site_sk))otherCondition=() ------------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk))otherCondition=() diff --git a/regression-test/suites/nereids_p0/join/test_outer_join.groovy b/regression-test/suites/nereids_p0/join/test_outer_join.groovy index 3dc132d08efd05..562326175e9af4 100644 --- a/regression-test/suites/nereids_p0/join/test_outer_join.groovy +++ b/regression-test/suites/nereids_p0/join/test_outer_join.groovy @@ -20,6 +20,7 @@ suite("test_outer_join", "nereids_p0") { sql "SET enable_fallback_to_original_planner=false" def tbl1 = "test_outer_join1" def tbl2 = "test_outer_join2" + def tbl3 = 
"test_outer_join3" sql "DROP TABLE IF EXISTS ${tbl1}" sql """ @@ -37,6 +38,15 @@ suite("test_outer_join", "nereids_p0") { DISTRIBUTED BY RANDOM BUCKETS 30 PROPERTIES ("replication_num" = "1"); """ + + sql "DROP TABLE IF EXISTS ${tbl3}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbl3} ( + c0 DECIMALV3(8,3) + ) + DISTRIBUTED BY HASH (c0) BUCKETS 1 PROPERTIES ("replication_num" = "1"); + """ + sql """INSERT INTO ${tbl2} (c0) VALUES ('dr'), ('x7Tq'), ('');""" sql """INSERT INTO ${tbl1} (c0) VALUES (0.47683432698249817), (0.8864791393280029);""" sql """INSERT INTO ${tbl1} (c0) VALUES (0.11287713050842285);""" @@ -56,6 +66,22 @@ suite("test_outer_join", "nereids_p0") { qt_join """ SELECT * FROM ${tbl2} LEFT OUTER JOIN ${tbl1} ON (('') like ('15DScmSM')) WHERE ('abc' NOT LIKE 'abc'); """ + + sql "set disable_join_reorder=true" + explain { + sql "SELECT * FROM ${tbl1} RIGHT OUTER JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0" + contains "RIGHT OUTER JOIN(PARTITIONED)" + } + explain { + sql "SELECT * FROM ${tbl1} RIGHT ANTI JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0" + contains "RIGHT ANTI JOIN(PARTITIONED)" + } + explain { + sql "SELECT * FROM ${tbl1} FULL OUTER JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0" + contains "FULL OUTER JOIN(PARTITIONED)" + } + sql "DROP TABLE IF EXISTS ${tbl1}" sql "DROP TABLE IF EXISTS ${tbl2}" + sql "DROP TABLE IF EXISTS ${tbl3}" }