diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/OutputPropertyDeriver.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/OutputPropertyDeriver.java index 0d329e956d3836..e29b0b3ee5bc51 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/OutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/OutputPropertyDeriver.java @@ -228,10 +228,10 @@ private PhysicalPropertySet visitPhysicalJoin(PhysicalJoinOperator node, Express } else if ((leftDistributionDesc.isShuffle() || leftDistributionDesc.isShuffleEnforce()) && (rightDistributionDesc.isShuffle()) || rightDistributionDesc.isShuffleEnforce()) { // shuffle join - return computeHashJoinDistributionPropertyInfo(node, - computeShuffleJoinOutputProperty(node.getJoinType(), leftOnPredicateColumns, rightOnPredicateColumns), - leftOnPredicateColumns, - rightOnPredicateColumns, context); + PhysicalPropertySet outputProperty = computeShuffleJoinOutputProperty(node.getJoinType(), + leftDistributionDesc.getColumns(), rightDistributionDesc.getColumns()); + return computeHashJoinDistributionPropertyInfo(node, outputProperty, + leftOnPredicateColumns, rightOnPredicateColumns, context); } else { LOG.error("Children output property distribution error.left child property: {}, " + "right child property: {}, join node: {}", @@ -247,8 +247,8 @@ private PhysicalPropertySet visitPhysicalJoin(PhysicalJoinOperator node, Express } private PhysicalPropertySet computeShuffleJoinOutputProperty(JoinOperator joinType, - List leftOnPredicateColumns, - List rightOnPredicateColumns) { + List leftShuffleColumns, + List rightShuffleColumns) { Optional requiredShuffleDesc = getRequiredShuffleDesc(); if (!requiredShuffleDesc.isPresent()) { return PhysicalPropertySet.EMPTY; @@ -256,7 +256,7 @@ private PhysicalPropertySet computeShuffleJoinOutputProperty(JoinOperator joinTy // Get required properties for children. List requiredProperties = - computeShuffleJoinRequiredProperties(requirements, leftOnPredicateColumns, rightOnPredicateColumns); + computeShuffleJoinRequiredProperties(requirements, leftShuffleColumns, rightShuffleColumns); Preconditions.checkState(requiredProperties.size() == 2); // when it's a right join, we should use right input cols to derive the output property diff --git a/fe/fe-core/src/test/java/com/starrocks/sql/plan/JoinTest.java b/fe/fe-core/src/test/java/com/starrocks/sql/plan/JoinTest.java index 8e1cd5b0b69542..0271297d4c1eb5 100644 --- a/fe/fe-core/src/test/java/com/starrocks/sql/plan/JoinTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/sql/plan/JoinTest.java @@ -2693,4 +2693,37 @@ public void testSmallestColInJoin() throws Exception { " 0:OlapScanNode\n" + " TABLE: t0"); } + + @Test + public void testShuffleAgg() throws Exception { + String sql = "select j0.* \n" + + " from t6 j0 join[shuffle] t6 j1 on j0.v2 = j1.v2 and j0.v3 = j1.v3\n" + + " join[shuffle] t6 j2 on j0.v2 = j2.v2 and j0.v3 = j2.v3 and j0.v4 = j2.v4\n" + + " join[shuffle] (select v4,v2,v3 from t6 group by v4,v2,v3) j4 " + + " on j0.v2 =j4.v2 and j0.v3=j4.v3 and j0.v4 = j4.v4;\n" + + "\n"; + + connectContext.getSessionVariable().setNewPlanerAggStage(2); + String plan = getFragmentPlan(sql); + connectContext.getSessionVariable().setNewPlanerAggStage(0); + + assertContains(plan, " 15:HASH JOIN\n" + + " | join op: INNER JOIN (BUCKET_SHUFFLE(S))\n" + + " | colocate: false, reason: \n" + + " | equal join conjunct: 2: v2 = 14: v2\n" + + " | equal join conjunct: 3: v3 = 15: v3\n" + + " | equal join conjunct: 4: v4 = 16: v4\n" + + " | \n" + + " |----14:EXCHANGE\n" + + " | \n" + + " 9:Project"); + assertContains(plan, " PARTITION: HASH_PARTITIONED: 14: v2, 15: v3, 16: v4\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 14\n" + + " HASH_PARTITIONED: 14: v2, 15: v3\n" + + "\n" + + " 13:AGGREGATE (merge finalize)\n" + + " | group by: 16: v4, 14: v2, 15: v3"); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/sql/plan/PlanTestBase.java b/fe/fe-core/src/test/java/com/starrocks/sql/plan/PlanTestBase.java index fafd0057647fc4..7a53da3aa2318a 100644 --- a/fe/fe-core/src/test/java/com/starrocks/sql/plan/PlanTestBase.java +++ b/fe/fe-core/src/test/java/com/starrocks/sql/plan/PlanTestBase.java @@ -167,6 +167,20 @@ public static void beforeClass() throws Exception { "\"storage_format\" = \"DEFAULT\"\n" + ");"); + starRocksAssert.withTable("CREATE TABLE `t6` (\n" + + " `v1` bigint NULL COMMENT \"\",\n" + + " `v2` bigint NULL COMMENT \"\",\n" + + " `v3` bigint NULL COMMENT \"\",\n" + + " `v4` bigint NULL\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(`v1`, `v2`, v3)\n" + + "DISTRIBUTED BY HASH(`v1`) BUCKETS 3\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"DEFAULT\"\n" + + ");"); + starRocksAssert.withTable("CREATE TABLE `colocate_t0` (\n" + " `v1` bigint NULL COMMENT \"\",\n" + " `v2` bigint NULL COMMENT \"\",\n" + diff --git a/fe/fe-core/src/test/java/com/starrocks/statistic/StatisticsCollectJobTest.java b/fe/fe-core/src/test/java/com/starrocks/statistic/StatisticsCollectJobTest.java index 95f6241a15aff4..5416aaa55718c0 100644 --- a/fe/fe-core/src/test/java/com/starrocks/statistic/StatisticsCollectJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/statistic/StatisticsCollectJobTest.java @@ -162,10 +162,12 @@ public void testAnalyzeALLDB() { Assert.assertEquals(5, jobs.size()); Assert.assertTrue(jobs.get(0) instanceof FullStatisticsCollectJob); FullStatisticsCollectJob fullStatisticsCollectJob = (FullStatisticsCollectJob) jobs.get(0); - Assert.assertEquals("[v1, v2, v3, v4, v5]", fullStatisticsCollectJob.getColumns().toString()); + Assert.assertTrue("[pk, v1, v2][v1, v2, v3, v4, v5][v4, v5, v6][v1, v2, v3, v4, v5]".contains( + fullStatisticsCollectJob.getColumns().toString())); Assert.assertTrue(jobs.get(1) instanceof FullStatisticsCollectJob); fullStatisticsCollectJob = (FullStatisticsCollectJob) jobs.get(1); - Assert.assertEquals("[v4, v5, v6]", fullStatisticsCollectJob.getColumns().toString()); + Assert.assertTrue("[pk, v1, v2][v1, v2, v3, v4, v5][v4, v5, v6][v1, v2, v3, v4, v5]".contains( + fullStatisticsCollectJob.getColumns().toString())); } @Test