[BugFix] Complex shuffle join with aggregate bug (#16834)
Signed-off-by: Seaven <seaven_7@qq.com>
Seaven authored Jan 31, 2023
1 parent c47e62a commit e426bc5
Showing 4 changed files with 58 additions and 9 deletions.
@@ -228,10 +228,10 @@ private PhysicalPropertySet visitPhysicalJoin(PhysicalJoinOperator node, Express
         } else if ((leftDistributionDesc.isShuffle() || leftDistributionDesc.isShuffleEnforce()) &&
                 (rightDistributionDesc.isShuffle()) || rightDistributionDesc.isShuffleEnforce()) {
             // shuffle join
-            return computeHashJoinDistributionPropertyInfo(node,
-                    computeShuffleJoinOutputProperty(node.getJoinType(), leftOnPredicateColumns, rightOnPredicateColumns),
-                    leftOnPredicateColumns,
-                    rightOnPredicateColumns, context);
+            PhysicalPropertySet outputProperty = computeShuffleJoinOutputProperty(node.getJoinType(),
+                    leftDistributionDesc.getColumns(), rightDistributionDesc.getColumns());
+            return computeHashJoinDistributionPropertyInfo(node, outputProperty,
+                    leftOnPredicateColumns, rightOnPredicateColumns, context);
         } else {
             LOG.error("Children output property distribution error.left child property: {}, " +
                             "right child property: {}, join node: {}",
@@ -247,16 +247,16 @@ private PhysicalPropertySet visitPhysicalJoin(PhysicalJoinOperator node, Express
     }

     private PhysicalPropertySet computeShuffleJoinOutputProperty(JoinOperator joinType,
-                                                                 List<Integer> leftOnPredicateColumns,
-                                                                 List<Integer> rightOnPredicateColumns) {
+                                                                 List<Integer> leftShuffleColumns,
+                                                                 List<Integer> rightShuffleColumns) {
         Optional<HashDistributionDesc> requiredShuffleDesc = getRequiredShuffleDesc();
         if (!requiredShuffleDesc.isPresent()) {
             return PhysicalPropertySet.EMPTY;
         }

         // Get required properties for children.
         List<PhysicalPropertySet> requiredProperties =
-                computeShuffleJoinRequiredProperties(requirements, leftOnPredicateColumns, rightOnPredicateColumns);
+                computeShuffleJoinRequiredProperties(requirements, leftShuffleColumns, rightShuffleColumns);
         Preconditions.checkState(requiredProperties.size() == 2);

         // when it's a right join, we should use right input cols to derive the output property
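The gist of the change above: the shuffle-join branch now builds the join's output property from the columns the children are actually hash-shuffled on (leftDistributionDesc.getColumns() / rightDistributionDesc.getColumns()) rather than from the full equi-join column lists. Presumably the problem is that a child's delivered shuffle columns can be only a subset of the on-predicate columns, for example when a two-phase aggregate below the join was shuffled on part of them, so advertising the on-predicate columns could claim a partitioning the data does not actually have. A minimal, self-contained sketch of that distinction follows; it is not StarRocks code, and the class name and column ids are illustrative only.

// Self-contained illustration (not StarRocks code): why the join's output property
// should reflect the child's actual shuffle columns, not the on-predicate columns.
import java.util.Arrays;
import java.util.List;

public class ShuffleJoinOutputPropertySketch {
    public static void main(String[] args) {
        // Equi-join conjuncts, e.g. j0.v2 = j4.v2 AND j0.v3 = j4.v3 AND j0.v4 = j4.v4
        List<Integer> onPredicateColumns = Arrays.asList(2, 3, 4);

        // The aggregate child was shuffled on (v2, v3) only, so that is the
        // distribution the join actually receives and passes on.
        List<Integer> childShuffleColumns = Arrays.asList(2, 3);

        // Old behavior: advertise HASH(2, 3, 4), a partitioning the data may not have.
        System.out.println("output property (old): HASH" + onPredicateColumns);

        // New behavior: advertise the distribution derived from the child's shuffle columns.
        System.out.println("output property (new): HASH" + childShuffleColumns);
    }
}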
33 changes: 33 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/sql/plan/JoinTest.java
@@ -2693,4 +2693,37 @@ public void testSmallestColInJoin() throws Exception {
                 " 0:OlapScanNode\n" +
                 " TABLE: t0");
     }
+
+    @Test
+    public void testShuffleAgg() throws Exception {
+        String sql = "select j0.* \n" +
+                " from t6 j0 join[shuffle] t6 j1 on j0.v2 = j1.v2 and j0.v3 = j1.v3\n" +
+                " join[shuffle] t6 j2 on j0.v2 = j2.v2 and j0.v3 = j2.v3 and j0.v4 = j2.v4\n" +
+                " join[shuffle] (select v4,v2,v3 from t6 group by v4,v2,v3) j4 " +
+                " on j0.v2 =j4.v2 and j0.v3=j4.v3 and j0.v4 = j4.v4;\n" +
+                "\n";
+
+        connectContext.getSessionVariable().setNewPlanerAggStage(2);
+        String plan = getFragmentPlan(sql);
+        connectContext.getSessionVariable().setNewPlanerAggStage(0);
+
+        assertContains(plan, " 15:HASH JOIN\n" +
+                " | join op: INNER JOIN (BUCKET_SHUFFLE(S))\n" +
+                " | colocate: false, reason: \n" +
+                " | equal join conjunct: 2: v2 = 14: v2\n" +
+                " | equal join conjunct: 3: v3 = 15: v3\n" +
+                " | equal join conjunct: 4: v4 = 16: v4\n" +
+                " | \n" +
+                " |----14:EXCHANGE\n" +
+                " | \n" +
+                " 9:Project");
+        assertContains(plan, " PARTITION: HASH_PARTITIONED: 14: v2, 15: v3, 16: v4\n" +
+                "\n" +
+                " STREAM DATA SINK\n" +
+                " EXCHANGE ID: 14\n" +
+                " HASH_PARTITIONED: 14: v2, 15: v3\n" +
+                "\n" +
+                " 13:AGGREGATE (merge finalize)\n" +
+                " | group by: 16: v4, 14: v2, 15: v3");
+    }
 }
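An aside on the new test above: setNewPlanerAggStage(2) forces two-phase aggregation, so the subquery's output reaches the join already hash-partitioned on a subset of the equi-join columns (exchange 14 repartitions on 14: v2, 15: v3 while the join also matches 16: v4), which appears to be the shape that exposed the bug. If the setting ever needs to survive a failing getFragmentPlan() call, one hedged variant is to reset it in a finally block; the sketch below is illustrative only, is not part of this commit, and reuses only identifiers already present in this file.

// Hypothetical variant of testShuffleAgg (illustrative, not in this commit): reset the
// agg-stage session variable in a finally block so a plan failure cannot leak it into
// later tests. Assumes the same JoinTest/PlanTestBase context as the test above.
@Test
public void testShuffleAggResetsAggStage() throws Exception {
    String sql = "select j0.v1 from t6 j0 join[shuffle] t6 j1 on j0.v2 = j1.v2 and j0.v3 = j1.v3";
    connectContext.getSessionVariable().setNewPlanerAggStage(2);
    try {
        String plan = getFragmentPlan(sql);
        assertContains(plan, "join op: INNER JOIN");
    } finally {
        connectContext.getSessionVariable().setNewPlanerAggStage(0);
    }
}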
14 changes: 14 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/sql/plan/PlanTestBase.java
@@ -167,6 +167,20 @@ public static void beforeClass() throws Exception {
                 "\"storage_format\" = \"DEFAULT\"\n" +
                 ");");

+        starRocksAssert.withTable("CREATE TABLE `t6` (\n" +
+                " `v1` bigint NULL COMMENT \"\",\n" +
+                " `v2` bigint NULL COMMENT \"\",\n" +
+                " `v3` bigint NULL COMMENT \"\",\n" +
+                " `v4` bigint NULL\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`v1`, `v2`, v3)\n" +
+                "DISTRIBUTED BY HASH(`v1`) BUCKETS 3\n" +
+                "PROPERTIES (\n" +
+                "\"replication_num\" = \"1\",\n" +
+                "\"in_memory\" = \"false\",\n" +
+                "\"storage_format\" = \"DEFAULT\"\n" +
+                ");");
+
         starRocksAssert.withTable("CREATE TABLE `colocate_t0` (\n" +
                 " `v1` bigint NULL COMMENT \"\",\n" +
                 " `v2` bigint NULL COMMENT \"\",\n" +
@@ -162,10 +162,12 @@ public void testAnalyzeALLDB() {
         Assert.assertEquals(5, jobs.size());
         Assert.assertTrue(jobs.get(0) instanceof FullStatisticsCollectJob);
         FullStatisticsCollectJob fullStatisticsCollectJob = (FullStatisticsCollectJob) jobs.get(0);
-        Assert.assertEquals("[v1, v2, v3, v4, v5]", fullStatisticsCollectJob.getColumns().toString());
+        Assert.assertTrue("[pk, v1, v2][v1, v2, v3, v4, v5][v4, v5, v6][v1, v2, v3, v4, v5]".contains(
+                fullStatisticsCollectJob.getColumns().toString()));
         Assert.assertTrue(jobs.get(1) instanceof FullStatisticsCollectJob);
         fullStatisticsCollectJob = (FullStatisticsCollectJob) jobs.get(1);
-        Assert.assertEquals("[v4, v5, v6]", fullStatisticsCollectJob.getColumns().toString());
+        Assert.assertTrue("[pk, v1, v2][v1, v2, v3, v4, v5][v4, v5, v6][v1, v2, v3, v4, v5]".contains(
+                fullStatisticsCollectJob.getColumns().toString()));
     }

     @Test
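The loosened assertions above accept any of the known column lists instead of pinning jobs.get(0) and jobs.get(1) to specific tables, presumably because the order of the collect jobs is not fixed once the shared test schema changes. A hedged alternative, shown below as a sketch that is not part of the commit, is an explicit set-membership check; it also avoids the substring matching that String.contains performs against the concatenated expected string, at the cost of checking every job rather than only the first two.

// Sketch of an order-insensitive assertion; assumes the same test context as above
// (jobs, Assert, FullStatisticsCollectJob) plus java.util.{Arrays, HashSet, Set} imports.
// Note: this checks every job, which is stricter than the original two spot-checks.
Set<String> expectedColumnLists = new HashSet<>(Arrays.asList(
        "[pk, v1, v2]", "[v1, v2, v3, v4, v5]", "[v4, v5, v6]"));
for (Object job : jobs) {
    Assert.assertTrue(job instanceof FullStatisticsCollectJob);
    Assert.assertTrue(expectedColumnLists.contains(
            ((FullStatisticsCollectJob) job).getColumns().toString()));
}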
