Skip to content

Commit

Permalink
[Enhancement] Support materialized_view_rewrite_mode for mv rewrite (#…
Browse files Browse the repository at this point in the history
…26997)


Signed-off-by: Kevin Li <ming.moriarty@gmail.com>
  • Loading branch information
LiShuMing authored Aug 11, 2023
1 parent 511b5f5 commit 4a4fe7a
Show file tree
Hide file tree
Showing 11 changed files with 422 additions and 15 deletions.
47 changes: 46 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.starrocks.thrift.TQueryOptions;
import com.starrocks.thrift.TSpillMode;
import com.starrocks.thrift.TTabletInternalParallelMode;
import org.apache.commons.lang3.EnumUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
Expand Down Expand Up @@ -363,6 +364,26 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String ENABLE_MATERIALIZED_VIEW_REWRITE = "enable_materialized_view_rewrite";
public static final String ENABLE_MATERIALIZED_VIEW_UNION_REWRITE = "enable_materialized_view_union_rewrite";

public enum MaterializedViewRewriteMode {
DISABLE, // disable materialized view rewrite
DEFAULT, // default, choose the materialized view or not by cost optimizer
DEFAULT_OR_ERROR, // default, but throw exception if no materialized view is not chosen.
FORCE, // force to choose the materialized view if possible, otherwise use the original query
FORCE_OR_ERROR; // force to choose the materialized view if possible, throw exception if no materialized view is
// not chosen.

public static String MODE_DISABLE = DISABLE.toString();
public static String MODE_DEFAULT = DEFAULT.toString();
public static String MODE_DEFAULT_OR_ERROR = DEFAULT_OR_ERROR.toString();
public static String MODE_FORCE = FORCE.toString();
public static String MODE_FORCE_OR_ERROR = FORCE_OR_ERROR.toString();

public static MaterializedViewRewriteMode parse(String str) {
return EnumUtils.getEnumIgnoreCase(MaterializedViewRewriteMode.class, str);
}
}
public static final String MATERIALIZED_VIEW_REWRITE_MODE = "materialized_view_rewrite_mode";

public static final String ENABLE_SYNC_MATERIALIZED_VIEW_REWRITE = "enable_sync_materialized_view_rewrite";
public static final String ENABLE_RULE_BASED_MATERIALIZED_VIEW_REWRITE =
"enable_rule_based_materialized_view_rewrite";
Expand Down Expand Up @@ -1080,6 +1101,9 @@ public void setEnableParallelMerge(boolean enableParallelMerge) {
@VarAttr(name = ENABLE_MATERIALIZED_VIEW_VIEW_DELTA_REWRITE)
private boolean enableMaterializedViewViewDeltaRewrite = true;

@VarAttr(name = MATERIALIZED_VIEW_REWRITE_MODE)
private String materializedViewRewriteMode = MaterializedViewRewriteMode.MODE_DEFAULT;

// Whether to enable view delta compensation for single table,
// - try to rewrite single table query into candidate view-delta mvs if enabled which will choose
// plan by cost.
Expand Down Expand Up @@ -1848,6 +1872,28 @@ public boolean isEnableReplicationJoin() {
return false;
}

public String getMaterializedViewRewriteMode() {
return materializedViewRewriteMode;
}

public void setMaterializedViewRewriteMode(String materializedViewRewriteMode) {
this.materializedViewRewriteMode = materializedViewRewriteMode;
}

public boolean isDisableMaterializedViewRewrite() {
return materializedViewRewriteMode.equalsIgnoreCase(MaterializedViewRewriteMode.MODE_DISABLE);
}

public boolean isEnableMaterializedViewForceRewrite() {
return materializedViewRewriteMode.equalsIgnoreCase(MaterializedViewRewriteMode.MODE_FORCE) ||
materializedViewRewriteMode.equalsIgnoreCase(MaterializedViewRewriteMode.MODE_FORCE_OR_ERROR);
}

public boolean isEnableMaterializedViewRewriteOrError() {
return materializedViewRewriteMode.equalsIgnoreCase(MaterializedViewRewriteMode.MODE_FORCE_OR_ERROR) ||
materializedViewRewriteMode.equalsIgnoreCase(MaterializedViewRewriteMode.MODE_DEFAULT_OR_ERROR);
}

public boolean isSetUseNthExecPlan() {
return useNthExecPlan > 0;
}
Expand All @@ -1861,7 +1907,6 @@ public void setUseNthExecPlan(int nthExecPlan) {
}

public void setEnableReplicationJoin(boolean enableReplicationJoin) {

}

public boolean isUseCorrelatedJoinEstimate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,12 @@ public Map<String, LogTracer> getTracers() {
*/
public static LogTracer getLogTracer(String name) {
ConnectContext ctx = ConnectContext.get();
if (ctx == null || ctx.getPlannerProfile() == null ||
ctx.getExplainLevel() != StatementBase.ExplainLevel.REWRITE) {
if (ctx == null || ctx.getPlannerProfile() == null) {
return null;
}

if (ctx.getExplainLevel() != StatementBase.ExplainLevel.REWRITE &&
!ctx.getSessionVariable().isEnableMaterializedViewRewriteOrError()) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ private OptExpression optimizeByCost(ConnectContext connectContext,
}

OptExpression result;
if (!connectContext.getSessionVariable().isSetUseNthExecPlan()) {
result = extractBestPlan(requiredProperty, memo.getRootGroup());
} else {
if (connectContext.getSessionVariable().isSetUseNthExecPlan()) {
// extract the nth execution plan
int nthExecPlan = connectContext.getSessionVariable().getUseNthExecPlan();
result = EnumeratePlan.extractNthPlan(requiredProperty, memo.getRootGroup(), nthExecPlan);
} else {
result = extractBestPlan(requiredProperty, memo.getRootGroup());
}
OptimizerTraceUtil.logOptExpression(connectContext, "after extract best plan:\n%s", result);

Expand Down Expand Up @@ -424,6 +424,9 @@ private OptExpression logicalRuleRewrite(ConnectContext connectContext,
private boolean isEnableSingleTableMVRewrite(TaskContext rootTaskContext,
SessionVariable sessionVariable,
OptExpression queryPlan) {
if (sessionVariable.isDisableMaterializedViewRewrite()) {
return false;
}
// if disable single mv rewrite, return false.
if (optimizerConfig.isRuleSetTypeDisable(RuleSetType.SINGLE_TABLE_MV_REWRITE)) {
return false;
Expand Down Expand Up @@ -571,6 +574,10 @@ void memoOptimize(ConnectContext connectContext, Memo memo, TaskContext rootTask
}

private boolean isEnableMultiTableRewrite(ConnectContext connectContext, OptExpression queryPlan) {
if (connectContext.getSessionVariable().isDisableMaterializedViewRewrite()) {
return false;
}

if (context.getCandidateMvs().isEmpty()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ public CostEstimate visitPhysicalOlapScan(PhysicalOlapScanOperator node, Express
Statistics statistics = context.getStatistics();
Preconditions.checkNotNull(statistics);
if (node.getTable().isMaterializedView()) {
// If materialized view force rewrite is enabled, hack the materialized view
// as zero so can be chosen by the optimizer.
ConnectContext ctx = ConnectContext.get();
SessionVariable sessionVariable = ctx.getSessionVariable();
if (sessionVariable.isEnableMaterializedViewForceRewrite()) {
return CostEstimate.zero();
}

Statistics groupStatistics = context.getGroupStatistics();
Statistics mvStatistics = context.getStatistics();
// only adjust cost for mv scan operator when group statistics is unknown and mv group expression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,4 @@ public static boolean containComplexExpresses(MaterializedIndexMeta mvMeta) {
}
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public List<OptExpression> transform(OptExpression queryExpression, OptimizerCon
int currentRootGroupId = queryExpression.getGroupExpression().getGroup().getId();
mvContext.addMatchedGroup(currentRootGroupId);
}

results.add(candidate);
mvContext.updateMVUsedCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,37 @@ public static MVRewriteValidator getInstance() {
}

public void validateMV(OptExpression physicalPlan) {
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {
return;
}

PlannerProfile.LogTracer tracer = PlannerProfile.getLogTracer("Summary");
if (tracer == null) {
return;
}
List<String> mvNames = collectMaterializedViewNames(physicalPlan);
if (mvNames.isEmpty()) {
// Check whether plan has been rewritten success by rule.
Map<String, PlannerProfile.LogTracer> tracers = ConnectContext.get().getPlannerProfile().getTracers();
Map<String, PlannerProfile.LogTracer> tracers = connectContext.getPlannerProfile().getTracers();
List<String> tracerNames = Lists.newArrayList();
for (Map.Entry<String, PlannerProfile.LogTracer> e : tracers.entrySet()) {
if (e.getValue().getLogs().stream().anyMatch(x -> x.contains(MaterializedViewRewriter.REWRITE_SUCCESS))) {
tracerNames.add(e.getKey().replace("REWRITE ", ""));
}
}

if (connectContext.getSessionVariable().isEnableMaterializedViewRewriteOrError()) {
if (tracerNames.isEmpty()) {
throw new IllegalArgumentException("no executable plan with materialized view for this sql in " +
connectContext.getSessionVariable().getMaterializedViewRewriteMode() + " mode.");
} else {
throw new IllegalArgumentException("no executable plan with materialized view for this sql in " +
connectContext.getSessionVariable().getMaterializedViewRewriteMode() + " mode because of" +
"cost.");
}
}

if (tracerNames.isEmpty()) {
tracer.log("Query cannot be rewritten, please check the trace logs or " +
"`set enable_mv_optimizer_trace_log=on` to find more infos.");
Expand All @@ -56,6 +73,16 @@ public void validateMV(OptExpression physicalPlan) {
+ ", but are not chosen as the best plan by cost.");
}
} else {
// If final result contains materialized views, ho
if (connectContext.getSessionVariable().isEnableMaterializedViewRewriteOrError()) {
Map<String, PlannerProfile.LogTracer> tracers = connectContext.getPlannerProfile().getTracers();
if (tracers.entrySet().stream().noneMatch(e -> e.getValue().getLogs().stream()
.anyMatch(x -> x.contains(MaterializedViewRewriter.REWRITE_SUCCESS)))) {
throw new IllegalArgumentException("no executable plan with materialized view for this sql in " +
connectContext.getSessionVariable().getMaterializedViewRewriteMode() + " mode.");
}
}

tracer.log("Query has already been successfully rewritten by: " + Joiner.on(",").join(mvNames) + ".");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,6 @@ public void testAggregateMaterializationAggregateFuncs19() {
+ "from emps group by empid + 10");
}


@Test
public void testJoinAggregateMaterializationNoAggregateFuncs1() {
// If agg push down is open, cannot rewrite.
Expand Down Expand Up @@ -1659,7 +1658,6 @@ public void testAggregateMaterializationOnCountDistinctQuery4() {

@Test
public void testInnerJoinViewDelta() {
connectContext.getSessionVariable().setOptimizerExecuteTimeout(300000000);
String mv = "SELECT" +
" `l`.`LO_ORDERKEY` as col1, `l`.`LO_ORDERDATE`, `l`.`LO_LINENUMBER`, `l`.`LO_CUSTKEY`, `l`.`LO_PARTKEY`," +
" `l`.`LO_SUPPKEY`, `l`.`LO_ORDERPRIORITY`, `l`.`LO_SHIPPRIORITY`, `l`.`LO_QUANTITY`," +
Expand Down Expand Up @@ -2615,6 +2613,7 @@ public void testJoinTypeMismatchRewriteForViewDelta() throws Exception {
testRewriteFail(mv, query);
}
}

@Test
public void testRewriteAvg1() {
String mv1 = "select user_id, avg(tag_id) from user_tags group by user_id;";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.qe.SessionVariable;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.utframe.UtFrameUtils;
Expand Down Expand Up @@ -54,9 +55,10 @@ public void testMV_JoinAgg1() throws Exception {
public void testMV_JoinAgg2() throws Exception {
FeConstants.isReplayFromQueryDump = true;
String jsonStr = getDumpInfoFromFile("query_dump/materialized-view/join_agg2");
connectContext.getSessionVariable().setMaterializedViewRewriteMode(
SessionVariable.MaterializedViewRewriteMode.MODE_FORCE_OR_ERROR);
Pair<QueryDumpInfo, String> replayPair = getCostPlanFragment(jsonStr, connectContext.getSessionVariable());
// TODO: If table and mv have stats, query cannot be rewritten for now.
Assert.assertFalse(replayPair.second.contains("table: mv1, rollup: mv1"));
Assert.assertTrue(replayPair.second.contains("table: mv1, rollup: mv1"));
FeConstants.isReplayFromQueryDump = false;
}

Expand All @@ -74,18 +76,18 @@ public void testMV_JoinAgg3() throws Exception {
@Test
public void testMV_JoinAgg4() throws Exception {
FeConstants.isReplayFromQueryDump = true;
// TODO: If table and mv have stats, query cannot be rewritten for now.
connectContext.getSessionVariable().setMaterializedViewRewriteMode(
SessionVariable.MaterializedViewRewriteMode.MODE_FORCE_OR_ERROR);
Pair<QueryDumpInfo, String> replayPair =
getPlanFragment(getDumpInfoFromFile("query_dump/materialized-view/join_agg4"),
connectContext.getSessionVariable(), TExplainLevel.NORMAL);
Assert.assertFalse(replayPair.second.contains("line_order_flat_mv"));
Assert.assertTrue(replayPair.second.contains("line_order_flat_mv"));
FeConstants.isReplayFromQueryDump = false;
}

@Test
public void testMV_MVOnMV1() throws Exception {
FeConstants.isReplayFromQueryDump = true;
// TODO: If table and mv have stats, query cannot be rewritten for now.
Pair<QueryDumpInfo, String> replayPair =
getPlanFragment(getDumpInfoFromFile("query_dump/materialized-view/mv_on_mv1"),
null, TExplainLevel.NORMAL);
Expand Down
Loading

0 comments on commit 4a4fe7a

Please sign in to comment.