Skip to content

Commit

Permalink
Merge pull request #17079 from ABingHuang/issue/17078_fix_union_rewri…
Browse files Browse the repository at this point in the history
…te_unstable_ut

[UT] fix unstable union mv rewrite ut

Signed-off-by: ABingHuang codekhuang@163.com
  • Loading branch information
ABingHuang authored Jan 31, 2023
2 parents e426bc5 + 2d0295d commit 69785d5
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

package com.starrocks.clone;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -535,6 +536,11 @@ private void initDynamicPartitionTable() {
initialize = true;
}

@VisibleForTesting
public void runOnceForTest() {
runAfterCatalogReady();
}

@Override
protected void runAfterCatalogReady() {
if (!initialize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public enum RuleType {
TF_MV_ONLY_JOIN_RULE,
TF_MV_FILTER_JOIN_RULE,
TF_MV_AGGREGATE_SCAN_RULE,
TF_MV_AGGREGATE_AGGREGATE_SCAN_RULE,
TF_MV_AGGREGATE_FILTER_SCAN_RULE,
TF_MV_AGGREGATE_JOIN_RULE,
TF_MV_AGGREGATE_FILTER_JOIN_RULE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class AggregateAggregateScanRule extends SingleTableRewriteBaseRule {
private static AggregateAggregateScanRule INSTANCE = new AggregateAggregateScanRule();

public AggregateAggregateScanRule() {
super(RuleType.TF_MV_AGGREGATE_SCAN_RULE, Pattern.create(OperatorType.LOGICAL_AGGR)
super(RuleType.TF_MV_AGGREGATE_AGGREGATE_SCAN_RULE, Pattern.create(OperatorType.LOGICAL_AGGR)
.addChildren(Pattern.create(OperatorType.LOGICAL_AGGR).addChildren(Pattern.create(OperatorType.PATTERN_SCAN))));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.TimeoutException;
import com.starrocks.pseudocluster.PseudoCluster;
import com.starrocks.qe.ConnectContext;
import com.starrocks.scheduler.Task;
Expand All @@ -44,7 +43,7 @@ public class MvRewriteOptimizationTest {

@BeforeClass
public static void beforeClass() throws Exception {
Config.dynamic_partition_check_interval_seconds = 1;
Config.dynamic_partition_check_interval_seconds = 10000;
Config.bdbje_heartbeat_timeout_second = 60;
Config.bdbje_replica_ack_timeout_second = 60;
Config.bdbje_lock_timeout_second = 60;
Expand Down Expand Up @@ -1334,6 +1333,7 @@ public void testUnionRewrite() throws Exception {
cluster.runSql("test", "insert into test_base_part values(1000, 1, 2, 3)");
cluster.runSql("test", "insert into test_base_part values(2000, 1, 2, 3)");
cluster.runSql("test", "insert into test_base_part values(2500, 1, 2, 3)");

createAndRefreshMv("test", "ttl_union_mv_1", "CREATE MATERIALIZED VIEW `ttl_union_mv_1`\n" +
"COMMENT \"MATERIALIZED_VIEW\"\n" +
"PARTITION BY (`c3`)\n" +
Expand All @@ -1347,9 +1347,12 @@ public void testUnionRewrite() throws Exception {
"AS SELECT `test_base_part`.`c1`, `test_base_part`.`c3`, sum(`test_base_part`.`c2`) AS `c2`\n" +
"FROM `test_base_part`\n" +
"GROUP BY `test_base_part`.`c1`, `test_base_part`.`c3`;");

MaterializedView ttlMv1 = getMv("test", "ttl_union_mv_1");
Assert.assertNotNull(ttlMv1);
waitTtl(ttlMv1, 3, 200);
GlobalStateMgr.getCurrentState().getDynamicPartitionScheduler().runOnceForTest();
Assert.assertEquals(3, ttlMv1.getPartitions().size());

String query4 = "select c3, sum(c2) from test_base_part group by c3";
String plan4 = getFragmentPlan(query4);
PlanTestBase.assertContains(plan4, "ttl_union_mv_1", "UNION", "test_base_part");
Expand Down Expand Up @@ -1450,20 +1453,6 @@ public void testNestedMv() throws Exception {
starRocksAssert.dropTable("nest_base_table_1");
}

private void waitTtl(MaterializedView mv, int number, int maxRound) throws InterruptedException, TimeoutException {
int round = 0;
while (true) {
if (mv.getPartitions().size() == number) {
break;
}
if (round >= maxRound) {
throw new TimeoutException("wait ttl timeout");
}
Thread.sleep(1000);
round++;
}
}

@Test
public void testPartialPartition() throws Exception {
starRocksAssert.getCtx().getSessionVariable().setEnableMaterializedViewUnionRewrite(true);
Expand Down Expand Up @@ -1670,7 +1659,8 @@ public void testPartialPartition() throws Exception {
" )\n" +
" AS SELECT k1, sum(v1) as sum_v1 FROM ttl_base_table group by k1;");
MaterializedView ttlMv2 = getMv("test", "ttl_mv_2");
waitTtl(ttlMv2, 4, 100);
GlobalStateMgr.getCurrentState().getDynamicPartitionScheduler().runOnceForTest();
Assert.assertEquals(4, ttlMv2.getPartitions().size());

String query16 = "select k1, sum(v1) FROM ttl_base_table where k1=3 group by k1";
String plan16 = getFragmentPlan(query16);
Expand Down

0 comments on commit 69785d5

Please sign in to comment.