[CALCITE-2970] Add AbstractConverter only between derived and required traitset

Before this patch, the VolcanoPlanner could not distinguish traitsets derived
from child operators from traitsets required by parent operators.
AbstractConverters were added between all of these traitsets, regardless of
whether they were derived or required, which caused the search space to
explode. For example:

SELECT a,b,c,max(d) FROM foo GROUP BY a,b,c;
Aggregate
 +-- TableScan

In a distributed system, suppose the Aggregate operator may require the
following traitsets from TableScan, with exact match:
- Singleton distribution
- Hash distribution on a
- Hash distribution on b
- Hash distribution on c
- Hash distribution on a,b
- Hash distribution on b,c
- Hash distribution on a,c
- Hash distribution on a,b,c

VolcanoPlanner would add 7*7+8 = 57 abstract converters to the RelSet, e.g.
an AbstractConverter between [a] and [b,c]. Even if satisfying matches are
allowed, e.g. distribution on [a] satisfies distribution on [a,b,c], there
are still many abstract converters. But we only need 8.
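
The figure of 8 can be checked with a small standalone model. This is only a
sketch: the Subset class below is a hypothetical stand-in for RelSubset, not
Calcite code, and it assumes the TableScan delivers a traitset that matches
none of the eight requested ones. It models only the rule introduced by this
patch (converters go from derived subsets to required subsets) and does not
try to reproduce the earlier count of 57.

```java
import java.util.ArrayList;
import java.util.List;

public class ConverterCountSketch {
  /** Hypothetical stand-in for a RelSubset: a traitset label plus two flags. */
  static final class Subset {
    final String traits;
    final boolean derived;
    final boolean required;

    Subset(String traits, boolean derived, boolean required) {
      this.traits = traits;
      this.derived = derived;
      this.required = required;
    }
  }

  /** The RelSet from the example: one derived subset and eight required ones. */
  static List<Subset> exampleSubsets() {
    List<Subset> subsets = new ArrayList<>();
    // Assumption: the scan's own traitset matches none of the requests.
    subsets.add(new Subset("as delivered by TableScan", true, false));
    String[] requested = {
        "singleton", "hash[a]", "hash[b]", "hash[c]",
        "hash[a,b]", "hash[b,c]", "hash[a,c]", "hash[a,b,c]"};
    for (String traits : requested) {
      subsets.add(new Subset(traits, false, true));
    }
    return subsets;
  }

  /** Lists one converter from each derived subset to each other required subset. */
  static List<String> converters(List<Subset> subsets) {
    List<String> result = new ArrayList<>();
    for (Subset from : subsets) {
      if (!from.derived) {
        continue; // required-only subsets are never a converter input
      }
      for (Subset to : subsets) {
        if (to != from && to.required) {
          result.add(from.traits + " -> " + to.traits);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(converters(exampleSubsets()).size()); // prints 8
  }
}
```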

This patch fixes the above issue by adding state to RelSubset indicating
whether the added traitset is required or derived. A traitset can be both
required and derived. An abstract converter is added only from a derived
traitset to a required traitset.
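
As a rough illustration of how one subset can carry both roles, the sketch
below keeps the two roles as bit flags and reports when a role is newly
acquired, which is the point at which converters would need to be added. The
class and its mark() method are hypothetical and simplified: the sketch leaves
out the planner, newly created subsets, and the special handling of the NONE
convention.

```java
public class SubsetStateSketch {
  static final int DERIVED = 1;
  static final int REQUIRED = 2;

  private int state = 0; // 0 = neither role yet

  /**
   * Records a role for this subset and returns true if the role is new,
   * i.e. if converters for that role have not been considered before.
   */
  boolean mark(boolean required) {
    int flag = required ? REQUIRED : DERIVED;
    boolean isNew = (state & flag) == 0;
    state |= flag;
    return isNew;
  }

  boolean isDerived() {
    return (state & DERIVED) == DERIVED;
  }

  boolean isRequired() {
    return (state & REQUIRED) == REQUIRED;
  }

  public static void main(String[] args) {
    SubsetStateSketch subset = new SubsetStateSketch();
    System.out.println(subset.mark(false)); // true: first time seen as derived
    System.out.println(subset.mark(false)); // false: already derived
    System.out.println(subset.mark(true));  // true: now also required
    System.out.println(subset.isDerived() && subset.isRequired()); // true
  }
}
```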

By default, when a new RelNode is added to a RelSet, its traitset is treated
as derived; when changeTraits is called, the traitset is treated as required.
Unfortunately, almost all RelNodes except AbstractConverter are added through
rule transformation. When an AbstractConverter is transformed into an
enforcing operator, e.g. PhysicalSort, the planner would still treat its
traitset as derived, which would trigger the creation of AbstractConverters
between this RelSubset and the remaining RelSubsets in the RelSet. To avoid
this issue, enforcing operators and AbstractConverter should override the
isEnforcer() method to indicate that the RelNode was added because the
desired traitset was not satisfied. This is not clean, but it works. Users
must judge for themselves whether to mark an operator as an enforcer.
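
The effect of overriding isEnforcer() can be sketched without the planner.
The Node interface and the two node classes below are hypothetical stand-ins,
not Calcite types; only the method name isEnforcer() and its default of false
come from this patch. The roleOnAdd helper mirrors the idea that a node's
traitset is registered as required when the node is an enforcer and as
derived otherwise.

```java
public class EnforcerSketch {
  /** Hypothetical stand-in for RelNode; the default mirrors the patch. */
  interface Node {
    default boolean isEnforcer() {
      return false;
    }
  }

  /** An ordinary operator: its traitset is something it delivers. */
  static final class ScanNode implements Node {
  }

  /** An operator that exists only to satisfy a requested sort order. */
  static final class SortEnforcerNode implements Node {
    @Override public boolean isEnforcer() {
      return true;
    }
  }

  /** Decides how the node's traitset is registered when the node is added. */
  static String roleOnAdd(Node node) {
    return node.isEnforcer() ? "required" : "derived";
  }

  public static void main(String[] args) {
    System.out.println(roleOnAdd(new ScanNode()));         // prints derived
    System.out.println(roleOnAdd(new SortEnforcerNode())); // prints required
  }
}
```

Because the enforcer's traitset is registered as required rather than derived,
it does not become a source for further converters to the other required
subsets in the same set.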

Close apache#1860
hsyuan committed Apr 12, 2020
1 parent ee1a9d2 commit f17367e
Showing 17 changed files with 211 additions and 116 deletions.
@@ -62,6 +62,6 @@ public boolean canConvertConvention(Convention toConvention) {

public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits,
RelTraitSet toTraits) {
return false;
return true;
}
}
10 changes: 7 additions & 3 deletions core/src/main/java/org/apache/calcite/plan/Convention.java
@@ -44,7 +44,9 @@ public interface Convention extends RelTrait {
* @param toConvention Desired convention to convert to
* @return Whether we should convert from this convention to toConvention
*/
boolean canConvertConvention(Convention toConvention);
default boolean canConvertConvention(Convention toConvention) {
return false;
}

/**
* Returns whether we should convert from this trait set to the other trait
@@ -59,8 +61,10 @@ public interface Convention extends RelTrait {
* @param toTraits Target traits
* @return Whether we should add converters
*/
boolean useAbstractConvertersForConversion(RelTraitSet fromTraits,
RelTraitSet toTraits);
default boolean useAbstractConvertersForConversion(RelTraitSet fromTraits,
RelTraitSet toTraits) {
return true;
}

/**
* Default implementation.
@@ -81,6 +81,10 @@ public RelWriter explainTerms(RelWriter pw) {
return pw;
}

@Override public boolean isEnforcer() {
return true;
}

//~ Inner Classes ----------------------------------------------------------

/**
152 changes: 82 additions & 70 deletions core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.plan.volcano;

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptListener;
import org.apache.calcite.plan.RelOptUtil;
@@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A <code>RelSet</code> is an equivalence-set of expressions; that is, a set of
@@ -160,107 +162,106 @@ void obliterateRelNode(RelNode rel) {
public RelSubset add(RelNode rel) {
assert equivalentSet == null : "adding to a dead set";
final RelTraitSet traitSet = rel.getTraitSet().simplify();
final RelSubset subset = getOrCreateSubset(rel.getCluster(), traitSet);
final RelSubset subset = getOrCreateSubset(
rel.getCluster(), traitSet, rel.isEnforcer());
subset.add(rel);
return subset;
}

/**
* If the subset is required, convert derived subsets to this subset.
* Otherwise, convert this subset to required subsets in this RelSet.
* The subset can be both required and derived.
*/
private void addAbstractConverters(
VolcanoPlanner planner, RelOptCluster cluster, RelSubset subset, boolean subsetToOthers) {
// Converters from newly introduced subset to all the remaining one (vice versa), only if
// we can convert. No point adding converters if it is not possible.
for (RelSubset other : subsets) {
RelOptCluster cluster, RelSubset subset, boolean required) {
List<RelSubset> others = subsets.stream().filter(
n -> required ? n.isDerived() : n.isRequired())
.collect(Collectors.toList());

for (RelSubset other : others) {
assert other.getTraitSet().size() == subset.getTraitSet().size();
RelSubset from = subset;
RelSubset to = other;

if (required) {
from = other;
to = subset;
}

if ((other == subset)
|| (subsetToOthers
&& !subset.getConvention().useAbstractConvertersForConversion(
subset.getTraitSet(), other.getTraitSet()))
|| (!subsetToOthers
&& !other.getConvention().useAbstractConvertersForConversion(
other.getTraitSet(), subset.getTraitSet()))) {
if (from == to || !from.getConvention()
.useAbstractConvertersForConversion(
from.getTraitSet(), to.getTraitSet())) {
continue;
}

final ImmutableList<RelTrait> difference =
subset.getTraitSet().difference(other.getTraitSet());
to.getTraitSet().difference(from.getTraitSet());

boolean addAbstractConverter = true;
int numTraitNeedConvert = 0;

for (RelTrait curOtherTrait : difference) {
RelTraitDef traitDef = curOtherTrait.getTraitDef();
RelTrait curRelTrait = subset.getTraitSet().getTrait(traitDef);

if (curRelTrait == null) {
addAbstractConverter = false;
break;
}
boolean needsConverter = false;

assert curRelTrait.getTraitDef() == traitDef;

boolean canConvert = false;
boolean needConvert = false;
if (subsetToOthers) {
// We can convert from subset to other. So, add converter with subset as child and
// traitset as the other's traitset.
canConvert = traitDef.canConvert(
cluster.getPlanner(), curRelTrait, curOtherTrait, subset);
needConvert = !curRelTrait.satisfies(curOtherTrait);
} else {
// We can convert from others to subset.
canConvert = traitDef.canConvert(
cluster.getPlanner(), curOtherTrait, curRelTrait, other);
needConvert = !curOtherTrait.satisfies(curRelTrait);
}
for (RelTrait fromTrait : difference) {
RelTraitDef traitDef = fromTrait.getTraitDef();
RelTrait toTrait = to.getTraitSet().getTrait(traitDef);

if (!canConvert) {
addAbstractConverter = false;
if (toTrait == null || !traitDef.canConvert(
cluster.getPlanner(), fromTrait, toTrait, from)) {
needsConverter = false;
break;
}

if (needConvert) {
numTraitNeedConvert++;
if (!fromTrait.satisfies(toTrait)) {
needsConverter = true;
}
}

if (addAbstractConverter && numTraitNeedConvert > 0) {
if (subsetToOthers) {
final AbstractConverter converter =
new AbstractConverter(cluster, subset, null, other.getTraitSet());
planner.register(converter, other);
} else {
final AbstractConverter converter =
new AbstractConverter(cluster, other, null, subset.getTraitSet());
planner.register(converter, subset);
}
if (needsConverter) {
final AbstractConverter converter =
new AbstractConverter(cluster, from, null, to.getTraitSet());
cluster.getPlanner().register(converter, to);
}
}
}

RelSubset getOrCreateSubset(RelOptCluster cluster, RelTraitSet traits) {
return getOrCreateSubset(cluster, traits, false);
}

RelSubset getOrCreateSubset(
RelOptCluster cluster,
RelTraitSet traits) {
RelOptCluster cluster, RelTraitSet traits, boolean required) {
boolean needsConverter = false;
RelSubset subset = getSubset(traits);

if (subset == null) {
needsConverter = true;
subset = new RelSubset(cluster, this, traits);

final VolcanoPlanner planner =
(VolcanoPlanner) cluster.getPlanner();

addAbstractConverters(planner, cluster, subset, true);

// Need to first add to subset before adding the abstract converters (for others->subset)
// since otherwise during register() the planner will try to add this subset again.
// Need to first add to subset before adding the abstract
// converters (for others->subset), since otherwise during
// register() the planner will try to add this subset again.
subsets.add(subset);

addAbstractConverters(planner, cluster, subset, false);

final VolcanoPlanner planner = (VolcanoPlanner) cluster.getPlanner();
if (planner.listener != null) {
postEquivalenceEvent(planner, subset);
}
} else if ((required && !subset.isRequired())
|| (!required && !subset.isDerived())) {
needsConverter = true;
}

if (subset.getConvention() == Convention.NONE) {
needsConverter = false;
} else if (required) {
subset.setRequired();
} else {
subset.setDerived();
}

if (needsConverter) {
addAbstractConverters(cluster, subset, required);
}

return subset;
}

@@ -327,7 +328,8 @@ void mergeWith(
assert otherSet.equivalentSet == null;
LOGGER.trace("Merge set#{} into set#{}", otherSet.id, id);
otherSet.equivalentSet = this;
RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
RelOptCluster cluster = rel.getCluster();
RelMetadataQuery mq = cluster.getMetadataQuery();

// remove from table
boolean existed = planner.allSets.remove(otherSet);
@@ -337,10 +339,20 @@ void mergeWith(

// merge subsets
for (RelSubset otherSubset : otherSet.subsets) {
RelSubset subset =
getOrCreateSubset(
otherSubset.getCluster(),
otherSubset.getTraitSet());
RelSubset subset = null;
RelTraitSet otherTraits = otherSubset.getTraitSet();

// If it is logical or derived physical traitSet
if (otherSubset.isDerived() || !otherSubset.isRequired()) {
subset = getOrCreateSubset(cluster, otherTraits, false);
}

// It may be required only, or both derived and required,
// in which case, register again.
if (otherSubset.isRequired()) {
subset = getOrCreateSubset(cluster, otherTraits, true);
}

// collect RelSubset instances, whose best should be changed
if (otherSubset.bestCost.isLt(subset.bestCost)) {
changedSubsets.put(subset, otherSubset.best);
28 changes: 24 additions & 4 deletions core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
@@ -74,6 +74,8 @@ public class RelSubset extends AbstractRelNode {
//~ Static fields/initializers ---------------------------------------------

private static final Logger LOGGER = CalciteTrace.getPlannerTracer();
private static final int DERIVED = 1;
private static final int REQUIRED = 2;

//~ Instance fields --------------------------------------------------------

@@ -98,10 +100,13 @@ public class RelSubset extends AbstractRelNode {
long timestamp;

/**
* Flag indicating whether this RelSubset's importance was artificially
* boosted.
* Physical property state of current subset
* 0: logical operators, NONE convention is neither DERIVED nor REQUIRED
* 1: traitSet DERIVED from child operators or itself
* 2: traitSet REQUIRED from parent operators
* 3: both DERIVED and REQUIRED
*/
boolean boosted;
private int state = 0;

//~ Constructors -----------------------------------------------------------

@@ -111,7 +116,6 @@ public class RelSubset extends AbstractRelNode {
RelTraitSet traits) {
super(cluster, traits);
this.set = set;
this.boosted = false;
assert traits.allSimple();
computeBestCost(cluster.getPlanner());
recomputeDigest();
@@ -144,6 +148,22 @@ private void computeBestCost(RelOptPlanner planner) {
}
}

void setDerived() {
state |= DERIVED;
}

void setRequired() {
state |= REQUIRED;
}

public boolean isDerived() {
return (state & DERIVED) == DERIVED;
}

public boolean isRequired() {
return (state & REQUIRED) == REQUIRED;
}

public RelNode getBest() {
return best;
}
@@ -495,7 +495,8 @@ public RelNode changeTraits(final RelNode rel, RelTraitSet toTraits) {
return rel2;
}

return rel2.set.getOrCreateSubset(rel.getCluster(), toTraits.simplify());
return rel2.set.getOrCreateSubset(
rel.getCluster(), toTraits, true);
}

public RelOptPlanner chooseDelegate() {
@@ -248,6 +248,10 @@ public void collectVariablesUsed(Set<CorrelationId> variableSet) {
// for default case, nothing to do
}

public boolean isEnforcer() {
return false;
}

public void collectVariablesSet(Set<CorrelationId> variableSet) {
}

11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/calcite/rel/RelNode.java
@@ -407,6 +407,17 @@ RelNode copy(
*/
void register(RelOptPlanner planner);

/**
* Indicates whether it is an enforcer operator, e.g. PhysicalSort,
* PhysicalHashDistribute, etc. As an enforcer, the operator must be
* created only when required traitSet is not satisfied by its input.
*
* @return Whether it is an enforcer operator
*/
default boolean isEnforcer() {
return false;
}

/**
* Returns whether the result of this relational expression is uniquely
* identified by this columns with the given ordinals.
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/calcite/rel/core/Sort.java
@@ -157,6 +157,11 @@ public RelNode accept(RexShuttle shuttle) {
return copy(traitSet, getInput(), collation, offset, fetch);
}

@Override public boolean isEnforcer() {
return offset == null && fetch == null
&& collation.getFieldCollations().size() > 0;
}

/**
* Returns the array of {@link RelFieldCollation}s asked for by the sort
* specification, from most significant to least significant.
@@ -20,6 +20,7 @@
import org.apache.calcite.adapter.enumerable.EnumerableHashJoin;
import org.apache.calcite.adapter.enumerable.EnumerableMergeJoin;
import org.apache.calcite.adapter.enumerable.EnumerableNestedLoopJoin;
import org.apache.calcite.adapter.jdbc.JdbcToEnumerableConverter;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.hep.HepRelVertex;
@@ -203,6 +204,11 @@ public ImmutableList<RelCollation> collations(Values values,
values(mq, values.getRowType(), values.getTuples()));
}

public ImmutableList<RelCollation> collations(JdbcToEnumerableConverter rel,
RelMetadataQuery mq) {
return mq.collations(rel.getInput());
}

public ImmutableList<RelCollation> collations(HepRelVertex rel,
RelMetadataQuery mq) {
return mq.collations(rel.getCurrentRel());