Skip to content

Commit

Permalink
[fix](nereids) Fix data wrong using mv rewrite and ignore case when g…
Browse files Browse the repository at this point in the history
…etting mv related partition table (apache#28699)

1. Fix data wrong using mv rewrite
2. Ignore case when getting mv related partition table
3. Enable infer expression column name without alias when create mv
  • Loading branch information
seawinde authored Dec 20, 2023
1 parent 2b2d3d0 commit 9a5ec43
Show file tree
Hide file tree
Showing 21 changed files with 309 additions and 173 deletions.
6 changes: 2 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -199,13 +198,12 @@ public Set<String> getExcludedTriggerTables() {
return Sets.newHashSet(split);
}

// this should use the same connectContext with query, to use the same session variable
public MTMVCache getOrGenerateCache(ConnectContext parent) throws AnalysisException {
public MTMVCache getOrGenerateCache() throws AnalysisException {
if (cache == null) {
writeMvLock();
try {
if (cache == null) {
this.cache = MTMVCache.from(this, parent);
this.cache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this));
}
} finally {
writeMvUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public MTMVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {

public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
// TODO: connect context set current db when create mv by use database
// this will be removed in the future when support join derivation
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES, ELIMINATE_OUTER_JOIN");
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(mtmv.getQuerySql(), 0));
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ public <OUTPUT_TYPE extends Plan> PatternMatcher<INPUT_TYPE, OUTPUT_TYPE> thenAp
return new PatternMatcher<>(pattern, defaultPromise, matchedAction);
}

/**
* Apply rule to return multi result, catch exception to make sure no influence on other rule
*/
public <OUTPUT_TYPE extends Plan> PatternMatcher<INPUT_TYPE, OUTPUT_TYPE> thenApplyMultiNoThrow(
MatchedMultiAction<INPUT_TYPE, OUTPUT_TYPE> matchedMultiAction) {
MatchedMultiAction<INPUT_TYPE, OUTPUT_TYPE> adaptMatchedMultiAction = ctx -> {
try {
return matchedMultiAction.apply(ctx);
} catch (Exception ex) {
LOG.warn("nereids apply rule failed, because {}", ex.getMessage(), ex);
return null;
}
};
return new PatternMatcher<>(pattern, defaultPromise, adaptMatchedMultiAction);
}

public Pattern<INPUT_TYPE> getPattern() {
return pattern;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
List<MaterializationContext> materializationContexts = cascadesContext.getMaterializationContexts();
List<Plan> rewriteResults = new ArrayList<>();
if (materializationContexts.isEmpty()) {
logger.info(currentClassName + " materializationContexts is empty so return");
logger.debug(currentClassName + " materializationContexts is empty so return");
return rewriteResults;
}

List<StructInfo> queryStructInfos = extractStructInfo(queryPlan, cascadesContext);
// TODO Just Check query queryPlan firstly, support multi later.
StructInfo queryStructInfo = queryStructInfos.get(0);
if (!checkPattern(queryStructInfo)) {
logger.info(currentClassName + " queryStructInfo is not valid so return");
logger.debug(currentClassName + " queryStructInfo is not valid so return");
return rewriteResults;
}

Expand All @@ -101,42 +101,42 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
if (queryPlan.getGroupExpression().isPresent()
&& materializationContext.alreadyRewrite(
queryPlan.getGroupExpression().get().getOwnerGroup().getGroupId())) {
logger.info(currentClassName + " this group is already rewritten so skip");
logger.debug(currentClassName + " this group is already rewritten so skip");
continue;
}
MTMV mtmv = materializationContext.getMTMV();
MTMVCache mtmvCache = getCacheFromMTMV(mtmv, cascadesContext);
MTMVCache mtmvCache = getCacheFromMTMV(mtmv);
if (mtmvCache == null) {
logger.info(currentClassName + " mv cache is null so return");
logger.warn(currentClassName + " mv cache is null so return");
return rewriteResults;
}
List<StructInfo> viewStructInfos = extractStructInfo(mtmvCache.getLogicalPlan(), cascadesContext);
if (viewStructInfos.size() > 1) {
// view struct info should only have one
logger.info(currentClassName + " the num of view struct info is more then one so return");
logger.warn(currentClassName + " the num of view struct info is more then one so return");
return rewriteResults;
}
StructInfo viewStructInfo = viewStructInfos.get(0);
if (!checkPattern(viewStructInfo)) {
logger.info(currentClassName + " viewStructInfo is not valid so return");
logger.debug(currentClassName + " viewStructInfo is not valid so return");
continue;
}
MatchMode matchMode = decideMatchMode(queryStructInfo.getRelations(), viewStructInfo.getRelations());
if (MatchMode.COMPLETE != matchMode) {
logger.info(currentClassName + " match mode is not complete so return");
logger.debug(currentClassName + " match mode is not complete so return");
continue;
}
List<RelationMapping> queryToViewTableMappings =
RelationMapping.generate(queryStructInfo.getRelations(), viewStructInfo.getRelations());
// if any relation in query and view can not map, bail out.
if (queryToViewTableMappings == null) {
logger.info(currentClassName + " query to view table mapping null so return");
logger.warn(currentClassName + " query to view table mapping null so return");
return rewriteResults;
}
for (RelationMapping queryToViewTableMapping : queryToViewTableMappings) {
SlotMapping queryToViewSlotMapping = SlotMapping.generate(queryToViewTableMapping);
if (queryToViewSlotMapping == null) {
logger.info(currentClassName + " query to view slot mapping null so continue");
logger.warn(currentClassName + " query to view slot mapping null so continue");
continue;
}
LogicalCompatibilityContext compatibilityContext =
Expand All @@ -145,7 +145,7 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
List<Expression> pulledUpExpressions = StructInfo.isGraphLogicalEquals(queryStructInfo, viewStructInfo,
compatibilityContext);
if (pulledUpExpressions == null) {
logger.info(currentClassName + " graph logical is not equals so continue");
logger.debug(currentClassName + " graph logical is not equals so continue");
continue;
}
// set pulled up expression to queryStructInfo predicates and update related predicates
Expand All @@ -156,13 +156,13 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
queryToViewSlotMapping);
// Can not compensate, bail out
if (compensatePredicates.isEmpty()) {
logger.info(currentClassName + " predicate compensate fail so continue");
logger.debug(currentClassName + " predicate compensate fail so continue");
continue;
}
Plan rewritedPlan;
Plan rewrittenPlan;
Plan mvScan = materializationContext.getMvScanPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewritedPlan = mvScan;
rewrittenPlan = mvScan;
} else {
// Try to rewrite compensate predicates by using mv scan
List<Expression> rewriteCompensatePredicates = rewriteExpression(
Expand All @@ -172,39 +172,52 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
queryToViewSlotMapping,
true);
if (rewriteCompensatePredicates.isEmpty()) {
logger.info(currentClassName + " compensate predicate rewrite by view fail so continue");
logger.debug(currentClassName + " compensate predicate rewrite by view fail so continue");
continue;
}
rewritedPlan = new LogicalFilter<>(Sets.newHashSet(rewriteCompensatePredicates), mvScan);
rewrittenPlan = new LogicalFilter<>(Sets.newHashSet(rewriteCompensatePredicates), mvScan);
}
// Rewrite query by view
rewritedPlan = rewriteQueryByView(matchMode,
rewrittenPlan = rewriteQueryByView(matchMode,
queryStructInfo,
viewStructInfo,
queryToViewSlotMapping,
rewritedPlan,
rewrittenPlan,
materializationContext);
if (rewritedPlan == null) {
logger.info(currentClassName + " rewrite query by view fail so continue");
if (rewrittenPlan == null) {
logger.debug(currentClassName + " rewrite query by view fail so continue");
continue;
}
if (!checkPartitionIsValid(queryStructInfo, materializationContext, cascadesContext)) {
logger.info(currentClassName + " check partition validation fail so continue");
logger.debug(currentClassName + " check partition validation fail so continue");
continue;
}
if (!checkOutput(queryPlan, rewrittenPlan)) {
logger.debug(currentClassName + " check output validation fail so continue");
continue;
}
// run rbo job on mv rewritten plan
CascadesContext rewrittenPlanContext =
CascadesContext.initContext(cascadesContext.getStatementContext(), rewritedPlan,
CascadesContext.initContext(cascadesContext.getStatementContext(), rewrittenPlan,
cascadesContext.getCurrentJobContext().getRequiredProperties());
Rewriter.getWholeTreeRewriter(cascadesContext).execute();
rewritedPlan = rewrittenPlanContext.getRewritePlan();
logger.info(currentClassName + "rewrite by materialized view success");
rewriteResults.add(rewritedPlan);
rewrittenPlan = rewrittenPlanContext.getRewritePlan();
logger.debug(currentClassName + "rewrite by materialized view success");
rewriteResults.add(rewrittenPlan);
}
}
return rewriteResults;
}

protected boolean checkOutput(Plan sourcePlan, Plan rewrittenPlan) {
if (sourcePlan.getGroupExpression().isPresent() && !rewrittenPlan.getLogicalProperties().equals(
sourcePlan.getGroupExpression().get().getOwnerGroup().getLogicalProperties())) {
logger.error("rewrittenPlan output logical properties is not same with target group");
return false;
}
return true;
}

/**
* Partition will be pruned in query then add the record the partitions to select partitions on
* catalog relation.
Expand Down Expand Up @@ -276,10 +289,10 @@ protected boolean checkPartitionIsValid(
&& relatedTalbeValidSet.containsAll(relatedTableSelectedPartitionToCheck);
}

private MTMVCache getCacheFromMTMV(MTMV mtmv, CascadesContext cascadesContext) {
private MTMVCache getCacheFromMTMV(MTMV mtmv) {
MTMVCache cache;
try {
cache = mtmv.getOrGenerateCache(cascadesContext.getConnectContext());
cache = mtmv.getOrGenerateCache();
} catch (AnalysisException analysisException) {
logger.warn(this.getClass().getSimpleName() + " get mtmv cache analysisException", analysisException);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public MaterializationContext(MTMV mtmv,

MTMVCache mtmvCache = null;
try {
mtmvCache = mtmv.getOrGenerateCache(cascadesContext.getConnectContext());
mtmvCache = mtmv.getOrGenerateCache();
} catch (AnalysisException e) {
LOG.warn("MaterializationContext init mv cache generate fail", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class MaterializedViewAggregateRule extends AbstractMaterializedViewAggre
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
logicalAggregate(any()).thenApplyMulti(ctx -> {
logicalAggregate(any()).thenApplyMultiNoThrow(ctx -> {
LogicalAggregate<Plan> root = ctx.root;
return rewrite(root, ctx.cascadesContext);
}).toRule(RuleType.MATERIALIZED_VIEW_ONLY_AGGREGATE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class MaterializedViewProjectAggregateRule extends AbstractMaterializedVi
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
logicalProject(logicalAggregate(any())).thenApplyMulti(ctx -> {
logicalProject(logicalAggregate(any())).thenApplyMultiNoThrow(ctx -> {
LogicalProject<LogicalAggregate<Plan>> root = ctx.root;
return rewrite(root, ctx.cascadesContext);
}).toRule(RuleType.MATERIALIZED_VIEW_PROJECT_AGGREGATE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class MaterializedViewProjectJoinRule extends AbstractMaterializedViewJoi
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
logicalProject(logicalJoin(any(), any())).thenApplyMulti(ctx -> {
logicalProject(logicalJoin(any(), any())).thenApplyMultiNoThrow(ctx -> {
LogicalProject<LogicalJoin<Plan, Plan>> root = ctx.root;
return rewrite(root, ctx.cascadesContext);
}).toRule(RuleType.MATERIALIZED_VIEW_PROJECT_JOIN));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static Optional<RelatedTableInfo> getRelatedTableInfo(String column, Plan
Slot columnExpr = null;
// get column slot
for (Slot outputSlot : outputExpressions) {
if (outputSlot.getName().equals(column)) {
if (outputSlot.getName().equalsIgnoreCase(column)) {
columnExpr = outputSlot;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@

import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand All @@ -34,7 +35,7 @@
public class Predicates {

// Predicates that can be pulled up
private final List<Expression> pulledUpPredicates = new ArrayList<>();
private final Set<Expression> pulledUpPredicates = new HashSet<>();

private Predicates() {
}
Expand All @@ -49,7 +50,7 @@ public static Predicates of(List<? extends Expression> pulledUpPredicates) {
return predicates;
}

public List<? extends Expression> getPulledUpPredicates() {
public Set<? extends Expression> getPulledUpPredicates() {
return pulledUpPredicates;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Split the expression to equal, range and residual predicate.
Expand All @@ -39,27 +38,26 @@
*/
public class PredicatesSplitter {

private final List<Expression> equalPredicates = new ArrayList<>();
private final List<Expression> rangePredicates = new ArrayList<>();
private final List<Expression> residualPredicates = new ArrayList<>();
private final Set<Expression> equalPredicates = new HashSet<>();
private final Set<Expression> rangePredicates = new HashSet<>();
private final Set<Expression> residualPredicates = new HashSet<>();
private final List<Expression> conjunctExpressions;

private final PredicateExtract instance = new PredicateExtract();

public PredicatesSplitter(Expression target) {
this.conjunctExpressions = ExpressionUtils.extractConjunction(target);
PredicateExtract instance = new PredicateExtract();
for (Expression expression : conjunctExpressions) {
expression.accept(instance, expression);
expression.accept(instance, null);
}
}

/**
* PredicateExtract
* extract to equal, range, residual predicate set
*/
public class PredicateExtract extends DefaultExpressionVisitor<Void, Expression> {
public class PredicateExtract extends DefaultExpressionVisitor<Void, Void> {

@Override
public Void visitComparisonPredicate(ComparisonPredicate comparisonPredicate, Expression sourceExpression) {
public Void visitComparisonPredicate(ComparisonPredicate comparisonPredicate, Void context) {
Expression leftArg = comparisonPredicate.getArgument(0);
Expression rightArg = comparisonPredicate.getArgument(1);
boolean leftArgOnlyContainsColumnRef = containOnlyColumnRef(leftArg, true);
Expand All @@ -81,12 +79,9 @@ public Void visitComparisonPredicate(ComparisonPredicate comparisonPredicate, Ex
}

@Override
public Void visitCompoundPredicate(CompoundPredicate compoundPredicate, Expression context) {
if (compoundPredicate instanceof Or) {
residualPredicates.add(compoundPredicate);
return null;
}
return super.visitCompoundPredicate(compoundPredicate, context);
public Void visit(Expression expr, Void context) {
residualPredicates.add(expr);
return null;
}
}

Expand All @@ -98,7 +93,7 @@ public Predicates.SplitPredicate getSplitPredicate() {
}

private static boolean containOnlyColumnRef(Expression expression, boolean allowCast) {
if (expression instanceof SlotReference && ((SlotReference) expression).isColumnFromTable()) {
if (expression instanceof SlotReference && expression.isColumnFromTable()) {
return true;
}
if (allowCast && expression instanceof Cast) {
Expand Down
Loading

0 comments on commit 9a5ec43

Please sign in to comment.