Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,11 @@ public static boolean isUseLiteMode(Map<String, String> queryOptions) {
}

public static boolean isUseBrokerPruning(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING, "false"));
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING, "true"));
}

public static boolean isRunInBroker(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.RUN_IN_BROKER, "false"));
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.RUN_IN_BROKER, "true"));
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void onMatch(RelOptRuleCall call) {
}
Map<String, String> hintOptions =
PinotHintStrategyTable.getHintOptions(aggRel.getHints(), PinotHintOptions.AGGREGATE_HINT_OPTIONS);
if (!isGroupTrimmingEnabled(call, hintOptions)) {
if (!isGroupTrimmingEnabled(call, hintOptions, aggRel)) {
return;
}
Sort sortRel = call.rel(0);
Expand Down Expand Up @@ -119,7 +119,7 @@ public void onMatch(RelOptRuleCall call) {
}
Map<String, String> hintOptions =
PinotHintStrategyTable.getHintOptions(aggRel.getHints(), PinotHintOptions.AGGREGATE_HINT_OPTIONS);
if (!isGroupTrimmingEnabled(call, hintOptions)) {
if (!isGroupTrimmingEnabled(call, hintOptions, aggRel)) {
return;
}

Expand Down Expand Up @@ -177,13 +177,17 @@ private static PinotLogicalAggregate createPlan(Aggregate aggRel, @Nullable List
leafReturnFinalResult, collations, limit);
}

private static boolean isGroupTrimmingEnabled(RelOptRuleCall call, @Nullable Map<String, String> hintOptions) {
private static boolean isGroupTrimmingEnabled(RelOptRuleCall call, @Nullable Map<String, String> hintOptions,
Aggregate aggRel) {
if (hintOptions != null) {
String option = hintOptions.get(PinotHintOptions.AggregateOptions.IS_ENABLE_GROUP_TRIM);
if (option != null) {
return Boolean.parseBoolean(option);
}
}
if (aggRel.getAggCallList().isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for Reviewers: For queries such as WITH tmp AS (SELECT DISTINCT col1, col2 FROM tbl LIMIT 1000) SELECT ..., at present we were not trimming groups by default which would have meant that the group-by could have grown really huge.

For such queries, we can always leverage group-trimming, and this change enables that.

return true;
}

Context genericContext = call.getPlanner().getContext();
if (genericContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ RelNode assignWindow(PhysicalWindow window) {
windowGroupCollation, null /* offset */, null /* fetch */, sort, _planIdGenerator.get(),
null /* pinot data distribution */, false /* leaf stage */);
} else {
input = input.copy(input.getTraitSet().plus(RelDistributions.SINGLETON), input.getInputs());
input = input.copy(input.getTraitSet().plus(RelDistributions.SINGLETON).plus(windowGroupCollation),
input.getInputs());
}
} else {
RelTraitSet newTraitSet = input.getTraitSet().plus(RelDistributions.SINGLETON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,14 @@ public boolean satisfies(@Nullable RelCollation relCollation) {
return _collation.satisfies(relCollation);
}

/**
* Applies a mapping to this distribution. This will DROP the collation however.
*/
public PinotDataDistribution apply(@Nullable PinotDistMapping mapping) {
return apply(mapping, true);
}

public PinotDataDistribution apply(@Nullable PinotDistMapping mapping, boolean dropCollation) {
if (mapping == null) {
return new PinotDataDistribution(RelDistribution.Type.ANY, _workers, _workerHash, null, null);
}
Expand All @@ -169,8 +176,7 @@ public PinotDataDistribution apply(@Nullable PinotDistMapping mapping) {
if (newType == RelDistribution.Type.HASH_DISTRIBUTED && newHashDesc.isEmpty()) {
newType = RelDistribution.Type.ANY;
}
// TODO: Preserve collation too.
RelCollation newCollation = RelCollations.EMPTY;
RelCollation newCollation = dropCollation ? RelCollations.EMPTY : PinotDistMapping.apply(_collation, mapping);
return new PinotDataDistribution(newType, _workers, _workerHash, newHashDesc, newCollation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.commons.collections4.CollectionUtils;


Expand Down Expand Up @@ -90,6 +93,13 @@ public static RelCollation apply(RelCollation relCollation, PinotDistMapping map
return RelCollations.of(newFieldCollations);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have a question. Is it intentionally to select the newFieldIndices.get(0)?

  public static RelCollation apply(RelCollation relCollation, PinotDistMapping mapping) {
    if (relCollation.getKeys().isEmpty()) {
      return relCollation;
    }
    List<RelFieldCollation> newFieldCollations = new ArrayList<>();
    for (RelFieldCollation fieldCollation : relCollation.getFieldCollations()) {
      List<Integer> newFieldIndices = mapping.getTargets(fieldCollation.getFieldIndex());
      if (CollectionUtils.isEmpty(newFieldIndices)) {
        break;
      }
      newFieldCollations.add(fieldCollation.withFieldIndex(newFieldIndices.get(0)));
    }
    return RelCollations.of(newFieldCollations);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that was by design because I wanted to keep it simple at the time. For context, this can occur in scenarios such as follows (which are rare or unlikely):

Project(col1=$0, col2=$0)
   Sort(collation=[order by $0 desc])
      TableScan(col1)

In this case, the project could be said to be ordered by both col1 and col2, but for now I am only preserving one of the indices to keep things simple. But I think it should be okay to add all field indexes too. I can take that up in a follow-up.

}

/**
* If a given RelNode is not guaranteed to preserve the sort order of the input, this returns true.
*/
public static boolean doesDropCollation(RelNode relNode) {
return !(relNode instanceof Project) && !(relNode instanceof Filter);
}

/**
* Consider a node that is partitioned on the key: [1]. If there's a project node on top of this, with project
* expressions as: [RexInputRef#0, RexInputRef#1, RexInputRef#1], then the project node will have two hash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public PRelNode onMatch(PRelOptRuleCall call) {
PhysicalAggregate aggregate = (PhysicalAggregate) call._currentNode;
Preconditions.checkState(aggregate.getLimit() <= DEFAULT_SERVER_STAGE_LIMIT,
"Group trim limit={} exceeds server stage limit={}", aggregate.getLimit(), DEFAULT_SERVER_STAGE_LIMIT);
// TODO(mse-physical): This resets the limit to server stage limit. Should we stick with group-trim limit?
return aggregate.withLimit(DEFAULT_SERVER_STAGE_LIMIT);
int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : DEFAULT_SERVER_STAGE_LIMIT;
return aggregate.withLimit(limit);
}
PRelNode input = call._currentNode;
return new PhysicalSort(input.unwrap().getCluster(), RelTraitSet.createEmpty(), List.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.core.Sort;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
import org.apache.pinot.query.planner.physical.v2.mapping.DistMappingGenerator;
import org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;
Expand Down Expand Up @@ -67,31 +70,29 @@ public PRelNode execute(PRelNode currentNode) {
accumulateWorkers(currentNode, workerSet);
workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
}
PinotDataDistribution pdd = new PinotDataDistribution(RelDistribution.Type.SINGLETON, workers, workers.hashCode(),
null, null);
return addExchangeAndWorkers(currentNode, null, pdd);
return addExchangeAndWorkers(currentNode, null, workers);
}

public PRelNode addExchangeAndWorkers(PRelNode currentNode, @Nullable PRelNode parent, PinotDataDistribution pdd) {
public PRelNode addExchangeAndWorkers(PRelNode currentNode, @Nullable PRelNode parent, List<String> liteModeWorkers) {
if (currentNode.isLeafStage()) {
if (parent == null) {
// This is because the Root Exchange is added by the RootExchangeInsertRule.
return currentNode;
}
return new PhysicalExchange(nodeId(), currentNode, pdd, Collections.emptyList(),
ExchangeStrategy.SINGLETON_EXCHANGE, currentNode.unwrap().getTraitSet().getCollation(),
PinotExecStrategyTrait.getDefaultExecStrategy());
return computeLeafExchange(currentNode, liteModeWorkers);
}
List<PRelNode> newInputs = new ArrayList<>();
for (PRelNode input : currentNode.getPRelInputs()) {
newInputs.add(addExchangeAndWorkers(input, currentNode, pdd));
newInputs.add(addExchangeAndWorkers(input, currentNode, liteModeWorkers));
}
currentNode = currentNode.with(newInputs, pdd);
PinotDataDistribution currentNodePDD = inferPDD(currentNode, newInputs, liteModeWorkers);
currentNode = currentNode.with(newInputs, currentNodePDD);
if (!currentNode.areTraitsSatisfied()) {
RelCollation collation = currentNode.unwrap().getTraitSet().getCollation();
Preconditions.checkState(collation != null && !collation.getFieldCollations().isEmpty(),
"Expected non-null collation since traits are not satisfied");
PinotDataDistribution sortedPDD = new PinotDataDistribution(
RelDistribution.Type.SINGLETON, pdd.getWorkers(), pdd.getWorkerHash(), null, collation);
RelDistribution.Type.SINGLETON, liteModeWorkers, liteModeWorkers.hashCode(), null, collation);
return new PhysicalSort(currentNode.unwrap().getCluster(), RelTraitSet.createEmpty(), List.of(), collation,
null, null, currentNode, nodeId(), sortedPDD, false);
}
Expand Down Expand Up @@ -128,6 +129,46 @@ static String stripIdPrefixFromWorker(String worker) {
return worker.split("@")[1];
}

/**
* Infers Exchange to be added on top of the leaf stage.
*/
private PhysicalExchange computeLeafExchange(PRelNode leafStageRoot, List<String> liteModeWorkers) {
RelCollation collation = leafStageRoot.unwrap().getTraitSet().getCollation();
PinotDataDistribution pdd;
if (collation != null) {
// If the leaf stage root has a collation trait, then we will use a sorted receive in the exchange, so we can
// add the collation to the PDD.
pdd = new PinotDataDistribution(
RelDistribution.Type.SINGLETON, liteModeWorkers, liteModeWorkers.hashCode(), null, collation);
} else {
pdd = new PinotDataDistribution(
RelDistribution.Type.SINGLETON, liteModeWorkers, liteModeWorkers.hashCode(), null, null);
}
return new PhysicalExchange(nodeId(), leafStageRoot, pdd, Collections.emptyList(),
ExchangeStrategy.SINGLETON_EXCHANGE, collation, PinotExecStrategyTrait.getDefaultExecStrategy());
}

/**
* Infers distribution for the current node based on its inputs and node-type. Can also add collation to the PDD
* automatically (e.g. if the current node is a Sort or the input is sorted and this node does not drop collation).
*/
private static PinotDataDistribution inferPDD(PRelNode currentNode, List<PRelNode> newInputs,
List<String> liteModeWorkers) {
if (currentNode instanceof Sort) {
Sort sort = (Sort) currentNode.unwrap();
return new PinotDataDistribution(RelDistribution.Type.SINGLETON, liteModeWorkers,
liteModeWorkers.hashCode(), null, sort.getCollation());
}
if (newInputs.isEmpty()) {
// Can happen for Values node.
return new PinotDataDistribution(RelDistribution.Type.SINGLETON, liteModeWorkers,
liteModeWorkers.hashCode(), null, null);
}
return newInputs.get(0).getPinotDataDistributionOrThrow().apply(
DistMappingGenerator.compute(newInputs.get(0).unwrap(), currentNode.unwrap(), null),
PinotDistMapping.doesDropCollation(currentNode.unwrap()) /* dropCollation */);
}

private int nodeId() {
return _context.getNodeIdGenerator().get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Sort;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
Expand All @@ -47,6 +48,7 @@
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
import org.apache.pinot.query.planner.physical.v2.mapping.DistMappingGenerator;
import org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -399,8 +401,16 @@ private static PinotDataDistribution computeCurrentNodeDistribution(PRelNode cur
return currentNode.getPinotDataDistributionOrThrow();
}
PinotDataDistribution inputDistribution = currentNode.getPRelInput(0).getPinotDataDistributionOrThrow();
return inputDistribution.apply(DistMappingGenerator.compute(
currentNode.unwrap().getInput(0), currentNode.unwrap(), null));
PinotDataDistribution newDistribution = inputDistribution.apply(DistMappingGenerator.compute(
currentNode.unwrap().getInput(0), currentNode.unwrap(), null),
PinotDistMapping.doesDropCollation(currentNode.unwrap()));
if (currentNode instanceof Sort) {
Sort sort = (Sort) currentNode.unwrap();
if (!sort.getCollation().getKeys().isEmpty()) {
return newDistribution.withCollation(sort.getCollation());
}
}
return newDistribution;
}

private static boolean isLeafStageBoundary(PRelNode currentNode, @Nullable PRelNode parentNode) {
Expand Down
Loading
Loading