Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.calcite.rel.hint;

import org.apache.pinot.query.planner.logical.LiteralHintUtils;


/**
* {@code PinotHintOptions} specifies the supported hint options by Pinot based on a particular type of relation node.
*
Expand All @@ -40,6 +43,18 @@ private PinotHintOptions() {

public static class InternalAggregateOptions {
// Option key holding the aggregation type name.
public static final String AGG_TYPE = "agg_type";
/**
* Option key holding the "agg call signature": the LITERAL inputs to the aggregate calls, which Calcite does not
* support passing into an aggregate call directly.
*
* <ol>
* <li>The value encodes a map of Pair[aggCallIdx, argListIdx] to RexLiteral, indicating that the operand at
* aggregateCalls[aggCallIdx].operandList[argListIdx] is supposed to be a RexLiteral.</li>
* <li>Not all RexLiteral types are supported as part of the input constant call signature.</li>
* <li>RexLiterals are encoded as String and decoded as Pinot Literal objects.</li>
* </ol>
*
* See: {@link LiteralHintUtils}.
* See: https://issues.apache.org/jira/projects/CALCITE/issues/CALCITE-5833
*/
public static final String AGG_CALL_SIGNATURE = "agg_call_signature";
}

public static class AggregateOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.calcite.rel.hint;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.spi.utils.BooleanUtils;

Expand Down Expand Up @@ -64,6 +67,24 @@ public static boolean containsHint(List<RelHint> hintList, String hintName) {
* @return true if the hint is present and its options contain the option key
*/
public static boolean containsHintOption(List<RelHint> hintList, String hintName, String optionKey) {
  // Only the first hint carrying the requested name is consulted; any later hint with the same name is ignored.
  return hintList.stream()
      .filter(candidate -> candidate.hintName.equals(hintName))
      .findFirst()
      .map(matched -> matched.kvOptions.containsKey(optionKey))
      .orElse(false);
}

/**
* Check if a hint-able {@link org.apache.calcite.rel.RelNode} contains an option key for a specific hint name of
* {@link RelHint}, and the value is true via {@link BooleanUtils#toBoolean(Object)}.
*
* @param hintList hint list from the {@link org.apache.calcite.rel.RelNode}.
* @param hintName the name of the {@link RelHint}.
* @param optionKey the option key to look for in the {@link RelHint#kvOptions}.
* @return true if the hint contains the option key and its value evaluates to true
*/
public static boolean isHintOptionTrue(List<RelHint> hintList, String hintName, String optionKey) {
for (RelHint relHint : hintList) {
if (relHint.hintName.equals(hintName)) {
return relHint.kvOptions.containsKey(optionKey) && BooleanUtils.toBoolean(relHint.kvOptions.get(optionKey));
Expand All @@ -90,4 +111,38 @@ public static String getHintOption(List<RelHint> hintList, String hintName, Stri
}
return null;
}

/**
 * Builds a new hint list in which the given option key is set to the given value on every {@link RelHint} named
 * {@code hintName}. When no hint with that name exists, a new hint holding only this option is appended.
 *
 * <p>Calcite's {@link RelHint} is not designed to be mutable, so every matching hint is rebuilt from a copy of its
 * {@link RelHint#kvOptions} and the whole list is copied. This util exists as a work-around for the limitation of
 * Calcite and Pinot in handling literals around aggregates; keep the cloning overhead in mind when calling it.
 * </p>
 *
 * @param oldHintList hint list from the {@link org.apache.calcite.rel.RelNode}; it is not modified.
 * @param hintName the name of the {@link RelHint} whose option is set.
 * @param optionKey the option key to set in the {@link RelHint#kvOptions}.
 * @param optionValue the value to be stored under the option key.
 * @return a new list with the hints in their original order, followed by the newly created hint if none matched.
 */
public static List<RelHint> replaceHintOptions(List<RelHint> oldHintList, String hintName, String optionKey,
    String optionValue) {
  List<RelHint> updatedHints = new ArrayList<>();
  boolean matchedAny = false;
  for (RelHint current : oldHintList) {
    if (!current.hintName.equals(hintName)) {
      // Unrelated hints are carried over as-is.
      updatedHints.add(current);
      continue;
    }
    // Rebuild the matching hint from a mutable copy of its key-value options.
    Map<String, String> mergedOptions = new HashMap<>(current.kvOptions);
    mergedOptions.put(optionKey, optionValue);
    updatedHints.add(RelHint.builder(hintName).hintOptions(mergedOptions).build());
    matchedAny = true;
  }
  if (matchedAny) {
    return updatedHints;
  }
  // No hint with this name was present: append a fresh one carrying just the requested option.
  updatedHints.add(RelHint.builder(hintName).hintOption(optionKey, optionValue).build());
  return updatedHints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public boolean matches(RelOptRuleCall call) {
if (call.rel(0) instanceof Aggregate) {
Aggregate agg = call.rel(0);
ImmutableList<RelHint> hints = agg.getHints();
return !PinotHintStrategyTable.containsHint(hints, PinotHintOptions.INTERNAL_AGG_OPTIONS);
return !PinotHintStrategyTable.containsHintOption(hints, PinotHintOptions.INTERNAL_AGG_OPTIONS,
PinotHintOptions.InternalAggregateOptions.AGG_TYPE);
}
return false;
}
Expand All @@ -111,17 +112,18 @@ public void onMatch(RelOptRuleCall call) {
ImmutableList<RelHint> oldHints = oldAggRel.getHints();

Aggregate newAgg;
if (!oldAggRel.getGroupSet().isEmpty() && PinotHintStrategyTable.containsHintOption(oldHints,
if (!oldAggRel.getGroupSet().isEmpty() && PinotHintStrategyTable.isHintOptionTrue(oldHints,
PinotHintOptions.AGGREGATE_HINT_OPTIONS, PinotHintOptions.AggregateOptions.IS_PARTITIONED_BY_GROUP_BY_KEYS)) {
// ------------------------------------------------------------------------
// If the "is_partitioned_by_group_by_keys" aggregate hint option is set, just add additional hints indicating
// this is a single stage aggregation.
ImmutableList<RelHint> newLeafAggHints =
new ImmutableList.Builder<RelHint>().addAll(oldHints).add(createAggHint(AggType.DIRECT)).build();
List<RelHint> newHints = PinotHintStrategyTable.replaceHintOptions(oldAggRel.getHints(),
PinotHintOptions.INTERNAL_AGG_OPTIONS, PinotHintOptions.InternalAggregateOptions.AGG_TYPE,
AggType.DIRECT.name());
newAgg =
new LogicalAggregate(oldAggRel.getCluster(), oldAggRel.getTraitSet(), newLeafAggHints, oldAggRel.getInput(),
new LogicalAggregate(oldAggRel.getCluster(), oldAggRel.getTraitSet(), newHints, oldAggRel.getInput(),
oldAggRel.getGroupSet(), oldAggRel.getGroupSets(), oldAggRel.getAggCallList());
} else if (!oldAggRel.getGroupSet().isEmpty() && PinotHintStrategyTable.containsHintOption(oldHints,
} else if (!oldAggRel.getGroupSet().isEmpty() && PinotHintStrategyTable.isHintOptionTrue(oldHints,
PinotHintOptions.AGGREGATE_HINT_OPTIONS,
PinotHintOptions.AggregateOptions.SKIP_LEAF_STAGE_GROUP_BY_AGGREGATION)) {
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -161,9 +163,9 @@ private RelNode createPlanWithLeafExchangeFinalAggregate(RelOptRuleCall call) {
*/
private RelNode createPlanWithExchangeDirectAggregation(RelOptRuleCall call) {
Aggregate oldAggRel = call.rel(0);
ImmutableList<RelHint> oldHints = oldAggRel.getHints();
ImmutableList<RelHint> newHints =
new ImmutableList.Builder<RelHint>().addAll(oldHints).add(createAggHint(AggType.DIRECT)).build();
List<RelHint> newHints = PinotHintStrategyTable.replaceHintOptions(oldAggRel.getHints(),
PinotHintOptions.INTERNAL_AGG_OPTIONS, PinotHintOptions.InternalAggregateOptions.AGG_TYPE,
AggType.DIRECT.name());

// create project when there's none below the aggregate to reduce exchange overhead
RelNode childRel = ((HepRelVertex) oldAggRel.getInput()).getCurrentRel();
Expand All @@ -183,7 +185,7 @@ private RelNode createPlanWithExchangeDirectAggregation(RelOptRuleCall call) {
* The following is copied from {@link AggregateExtractProjectRule#onMatch(RelOptRuleCall)}
* with modification to insert an exchange in between the Aggregate and Project
*/
private RelNode convertAggForExchangeDirectAggregate(RelOptRuleCall call, ImmutableList<RelHint> newHints) {
private RelNode convertAggForExchangeDirectAggregate(RelOptRuleCall call, List<RelHint> newHints) {
final Aggregate aggregate = call.rel(0);
final RelNode input = aggregate.getInput();
// Compute which input fields are used.
Expand Down Expand Up @@ -241,9 +243,8 @@ private Aggregate convertAggForLeafInput(Aggregate oldAggRel) {
newCalls.add(buildAggregateCall(oldAggRel.getInput(), oldCall, oldCall.getArgList(), oldAggRel.getGroupCount(),
AggType.LEAF));
}
ImmutableList<RelHint> oldHints = oldAggRel.getHints();
ImmutableList<RelHint> newHints =
new ImmutableList.Builder<RelHint>().addAll(oldHints).add(createAggHint(AggType.LEAF)).build();
List<RelHint> newHints = PinotHintStrategyTable.replaceHintOptions(oldAggRel.getHints(),
PinotHintOptions.INTERNAL_AGG_OPTIONS, PinotHintOptions.InternalAggregateOptions.AGG_TYPE, AggType.LEAF.name());
return new LogicalAggregate(oldAggRel.getCluster(), oldAggRel.getTraitSet(), newHints, oldAggRel.getInput(),
oldAggRel.getGroupSet(), oldAggRel.getGroupSets(), newCalls);
}
Expand Down Expand Up @@ -278,8 +279,8 @@ private RelNode convertAggFromIntermediateInput(RelOptRuleCall ruleCall, Aggrega

// create new aggregate relation.
ImmutableList<RelHint> orgHints = oldAggRel.getHints();
ImmutableList<RelHint> newAggHint =
new ImmutableList.Builder<RelHint>().addAll(orgHints).add(createAggHint(aggType)).build();
List<RelHint> newAggHint = PinotHintStrategyTable.replaceHintOptions(orgHints,
PinotHintOptions.INTERNAL_AGG_OPTIONS, PinotHintOptions.InternalAggregateOptions.AGG_TYPE, aggType.name());
ImmutableBitSet groupSet = ImmutableBitSet.range(nGroups);
relBuilder.aggregate(relBuilder.groupKey(groupSet, ImmutableList.of(groupSet)), newCalls);
relBuilder.hints(newAggHint);
Expand Down Expand Up @@ -336,10 +337,4 @@ private static String getFunctionNameFromAggregateCall(AggregateCall aggregateCa
return aggregateCall.getAggregation().getName().equalsIgnoreCase("COUNT") && aggregateCall.isDistinct()
? "distinctCount" : aggregateCall.getAggregation().getName();
}

/**
 * Creates the internal aggregate hint whose {@code AGG_TYPE} option is the name of the given aggregation type.
 */
private static RelHint createAggHint(AggType aggType) {
  RelHint.Builder hintBuilder = RelHint.builder(PinotHintOptions.INTERNAL_AGG_OPTIONS)
      .hintOption(PinotHintOptions.InternalAggregateOptions.AGG_TYPE, aggType.name());
  return hintBuilder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.calcite.rel.rules;

import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.Pair;
import org.apache.pinot.query.planner.logical.LiteralHintUtils;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.spi.data.FieldSpec;


/**
 * Special rule that attaches the literal operands of aggregate calls to the aggregate as a hint option.
 *
 * <p>The rule matches a {@link LogicalAggregate} whose input is a {@link LogicalProject}. It stores the encoded
 * literals under the {@code AGG_CALL_SIGNATURE} option of the internal aggregate hint and replaces the aggregate
 * with a copy carrying the updated hints. Once that option is present the rule no longer matches.
 */
public class PinotAggregateLiteralAttachmentRule extends RelOptRule {
  public static final PinotAggregateLiteralAttachmentRule INSTANCE =
      new PinotAggregateLiteralAttachmentRule(PinotRuleUtils.PINOT_REL_FACTORY);

  public PinotAggregateLiteralAttachmentRule(RelBuilderFactory factory) {
    super(operand(LogicalAggregate.class, some(operand(LogicalProject.class, any()))), factory, null);
  }

  /**
   * Matches only aggregates whose internal aggregate hint does not yet contain the call-signature option.
   */
  @Override
  public boolean matches(RelOptRuleCall call) {
    if (call.rels.length < 1 || !(call.rel(0) instanceof Aggregate)) {
      return false;
    }
    Aggregate candidate = call.rel(0);
    ImmutableList<RelHint> currentHints = candidate.getHints();
    boolean alreadyAttached = PinotHintStrategyTable.containsHintOption(currentHints,
        PinotHintOptions.INTERNAL_AGG_OPTIONS, PinotHintOptions.InternalAggregateOptions.AGG_CALL_SIGNATURE);
    return !alreadyAttached;
  }

  /**
   * Replaces the matched aggregate with an otherwise identical one whose hints carry the encoded literal map.
   */
  @Override
  public void onMatch(RelOptRuleCall call) {
    Aggregate oldAgg = call.rel(0);
    Map<Pair<Integer, Integer>, RexExpression.Literal> literalsByPosition = extractLiterals(call);
    List<RelHint> hintsWithSignature = PinotHintStrategyTable.replaceHintOptions(oldAgg.getHints(),
        PinotHintOptions.INTERNAL_AGG_OPTIONS, PinotHintOptions.InternalAggregateOptions.AGG_CALL_SIGNATURE,
        LiteralHintUtils.literalMapToHintString(literalsByPosition));
    // TODO: validate against AggregationFunctionType to see if expected literal positions are properly attached
    LogicalAggregate newAgg = new LogicalAggregate(oldAgg.getCluster(), oldAgg.getTraitSet(), hintsWithSignature,
        oldAgg.getInput(), oldAgg.getGroupSet(), oldAgg.getGroupSets(), oldAgg.getAggCallList());
    call.transformTo(newAgg);
  }

  /**
   * Collects the literal operands of every aggregate call that has more than one argument.
   *
   * <p>Keys are Pair[aggCallIdx, argIdx]. For each such call an extra entry with argIdx -1 records the number of
   * arguments. An argument is recorded only when the project expression it refers to is a {@link RexLiteral}.
   */
  private static Map<Pair<Integer, Integer>, RexExpression.Literal> extractLiterals(RelOptRuleCall call) {
    Aggregate aggregate = call.rel(0);
    Project project = call.rel(1);
    List<RexNode> projections = project.getProjects();
    List<AggregateCall> aggCalls = aggregate.getAggCallList();
    final Map<Pair<Integer, Integer>, RexExpression.Literal> literals = new HashMap<>();
    for (int aggIdx = 0; aggIdx < aggCalls.size(); aggIdx++) {
      AggregateCall aggCall = aggCalls.get(aggIdx);
      int operandCount = aggCall.getArgList().size();
      if (operandCount <= 1) {
        // Calls with at most one argument carry no literal signature.
        continue;
      }
      // use -1 argIdx to indicate size of the agg operands.
      literals.put(new Pair<>(aggIdx, -1), new RexExpression.Literal(FieldSpec.DataType.INT, operandCount));
      // put the literals in to the map.
      for (int argIdx = 0; argIdx < operandCount; argIdx++) {
        RexNode operand = projections.get(aggCall.getArgList().get(argIdx));
        if (operand instanceof RexLiteral) {
          literals.put(new Pair<>(aggIdx, argIdx), LiteralHintUtils.rexLiteralToLiteral((RexLiteral) operand));
        }
      }
    }
    return literals;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onMatch(RelOptRuleCall call) {
JoinInfo joinInfo = join.analyzeCondition();

boolean isColocatedJoin =
PinotHintStrategyTable.containsHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintStrategyTable.isHintOptionTrue(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
if (isColocatedJoin) {
// join exchange are colocated, we should directly pass through via join key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void onMatch(RelOptRuleCall call) {
PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex
? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());

boolean isColocatedJoin = PinotHintStrategyTable.containsHintOption(join.getHints(),
boolean isColocatedJoin = PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
PinotLogicalExchange dynamicBroadcastExchange = isColocatedJoin
? PinotLogicalExchange.create(right.getInput(), RelDistributions.SINGLETON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ private PinotQueryRuleSets() {
PruneEmptyRules.UNION_INSTANCE
);

// Pinot specific rules that should be run after all other rules
// Pinot specific rules that should be run BEFORE all other rules
public static final Collection<RelOptRule> PINOT_PRE_RULES = ImmutableList.of(
PinotAggregateLiteralAttachmentRule.INSTANCE
);


// Pinot specific rules that should be run AFTER all other rules
public static final Collection<RelOptRule> PINOT_POST_RULES = ImmutableList.of(
// Evaluate the Literal filter nodes
CoreRules.FILTER_REDUCE_EXPRESSIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,28 @@ public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, Worke
// Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's
// best to be explicit.
hepProgramBuilder.addMatchOrder(HepMatchOrder.DEPTH_FIRST);
// First run the basic rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:

// ----
// Run Pinot specific pre-rules
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PINOT_PRE_RULES);

// ----
// Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:
// the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal.
for (RelOptRule relOptRule : PinotQueryRuleSets.BASIC_RULES) {
hepProgramBuilder.addRuleInstance(relOptRule);
}

// ----
// Pushdown filters using a single HepInstruction.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);

// ----
// Prune duplicate/unnecessary nodes using a single HepInstruction.
// TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PRUNE_RULES);

// ----
// Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule.
for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
hepProgramBuilder.addRuleInstance(relOptRule);
Expand Down
Loading