Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,30 @@ public static Map<String, Set<FieldConfig.IndexType>> getSkipIndexes(Map<String,
return skipIndexes;
}

@Nullable
public static Set<String> getSkipPlannerRules(Map<String, String> queryOptions) {
// Example config: skipPlannerRules='FilterIntoJoinRule,FilterAggregateTransposeRule'
// Example config: skipPlannerRules='FilterIntoJoin,FilterAggregateTranspose'
String skipIndexesStr = queryOptions.get(QueryOptionKey.SKIP_PLANNER_RULES);
if (skipIndexesStr == null) {
return null;
return Set.of();
}

String[] skippedRules = StringUtils.split(skipIndexesStr, ',');

return new HashSet<>(List.of(skippedRules));
}

public static Set<String> getUsePlannerRules(Map<String, String> queryOptions) {
// Example config: usePlannerRules='SortJoinTranspose, AggregateJoinTransposeExtended'
String usedIndexesStr = queryOptions.get(QueryOptionKey.USE_PLANNER_RULES);
if (usedIndexesStr == null) {
return Set.of();
}

String[] usedRules = StringUtils.split(usedIndexesStr, ',');

return new HashSet<>(List.of(usedRules));
}

@Nullable
public static Boolean isUseFixedReplica(Map<String, String> queryOptions) {
String useFixedReplica = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.USE_FIXED_REPLICA);
Expand Down Expand Up @@ -345,6 +356,7 @@ public static Integer getNumGroupsWarningLimit(Map<String, String> queryOptions)
String numGroupsWarningLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT);
return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT, numGroupsWarningLimit);
}

@Nullable
public static Integer getMaxInitialResultHolderCapacity(Map<String, String> queryOptions) {
String maxInitialResultHolderCapacity = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
import org.apache.calcite.rel.rules.PruneEmptyRules;
import org.apache.calcite.rel.rules.SemiJoinRule;
import org.apache.calcite.rel.rules.SortJoinCopyRule;
import org.apache.calcite.rel.rules.SortJoinTransposeRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotFilterIntoJoinRule;
Expand All @@ -48,6 +50,8 @@

/**
* Default rule sets for Pinot query
* Defaultly disabled rules are defined in
* {@link org.apache.pinot.spi.utils.CommonConstants.Broker#DEFAULT_DISABLED_RULES}
*/
public class PinotQueryRuleSets {
private PinotQueryRuleSets() {
Expand Down Expand Up @@ -98,7 +102,12 @@ private PinotQueryRuleSets() {
.instanceWithDescription(PlannerRuleNames.EVALUATE_LITERAL_FILTER),

// sort join rules
// TODO: evaluate the SORT_JOIN_TRANSPOSE and SORT_JOIN_COPY rules
// push sort through join for left/right outer join only, disabled by default
SortJoinTransposeRule.Config.DEFAULT
.withDescription(PlannerRuleNames.SORT_JOIN_TRANSPOSE).toRule(),
// copy sort below join without offset and limit, disabled by default
SortJoinCopyRule.Config.DEFAULT
.withDescription(PlannerRuleNames.SORT_JOIN_COPY).toRule(),

// join rules
JoinPushExpressionsRule.Config.DEFAULT
Expand All @@ -125,6 +134,9 @@ private PinotQueryRuleSets() {
// push aggregate through join
AggregateJoinTransposeRule.Config.DEFAULT
.withDescription(PlannerRuleNames.AGGREGATE_JOIN_TRANSPOSE).toRule(),
// push aggregate functions through join, disabled by default
AggregateJoinTransposeRule.Config.EXTENDED
.withDescription(PlannerRuleNames.AGGREGATE_JOIN_TRANSPOSE_EXTENDED).toRule(),
// aggregate union rule
AggregateUnionAggregateRule.Config.DEFAULT
.withDescription(PlannerRuleNames.AGGREGATE_UNION_AGGREGATE).toRule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule;
Expand Down Expand Up @@ -151,8 +150,8 @@ public QueryEnvironment(Config config) {
.defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
_catalogReader = new PinotCatalogReader(
rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG, config.isCaseSensitive());
// default optProgram with no skip rule options
_optProgram = getOptProgram(null);
// default optProgram with no skip rule options and no use rule options
_optProgram = getOptProgram(Set.of(), Set.of());
}

public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) {
Expand All @@ -173,9 +172,10 @@ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
HepProgram optProgram = _optProgram;
if (MapUtils.isNotEmpty(options)) {
Set<String> skipRuleSet = QueryOptionsUtils.getSkipPlannerRules(options);
if (CollectionUtils.isNotEmpty(skipRuleSet)) {
Set<String> useRuleSet = QueryOptionsUtils.getUsePlannerRules(options);
if (!skipRuleSet.isEmpty() || !useRuleSet.isEmpty()) {
// dynamically create optProgram according to rule options
optProgram = getOptProgram(skipRuleSet);
optProgram = getOptProgram(skipRuleSet, useRuleSet);
}
}
boolean usePhysicalOptimizer = QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
Expand Down Expand Up @@ -493,7 +493,7 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex
* @param skipRuleSet parsed skipped rule name set from query options
* @return HepProgram that performs logical transformations
*/
private static HepProgram getOptProgram(@Nullable Set<String> skipRuleSet) {
private static HepProgram getOptProgram(Set<String> skipRuleSet, Set<String> useRuleSet) {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
// Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's
// best to be explicit.
Expand All @@ -502,27 +502,17 @@ private static HepProgram getOptProgram(@Nullable Set<String> skipRuleSet) {
// ----
// Rules are disabled if its corresponding value is set to false in ruleFlags
// construct filtered BASIC_RULES, FILTER_PUSHDOWN_RULES, PROJECT_PUSHDOWN_RULES, PRUNE_RULES
List<RelOptRule> basicRules;
List<RelOptRule> filterPushdownRules;
List<RelOptRule> projectPushdownRules;
List<RelOptRule> pruneRules;
if (skipRuleSet == null) {
basicRules = PinotQueryRuleSets.BASIC_RULES;
filterPushdownRules = PinotQueryRuleSets.FILTER_PUSHDOWN_RULES;
projectPushdownRules = PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES;
pruneRules = PinotQueryRuleSets.PRUNE_RULES;
} else {
basicRules = filterRuleList(PinotQueryRuleSets.BASIC_RULES, skipRuleSet);
filterPushdownRules = filterRuleList(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES, skipRuleSet);
projectPushdownRules = filterRuleList(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES, skipRuleSet);
pruneRules = filterRuleList(PinotQueryRuleSets.PRUNE_RULES, skipRuleSet);
}

List<RelOptRule> basicRules = filterRuleList(PinotQueryRuleSets.BASIC_RULES, skipRuleSet, useRuleSet);
List<RelOptRule> filterPushdownRules =
filterRuleList(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES, skipRuleSet, useRuleSet);
List<RelOptRule> projectPushdownRules =
filterRuleList(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES, skipRuleSet, useRuleSet);
List<RelOptRule> pruneRules = filterRuleList(PinotQueryRuleSets.PRUNE_RULES, skipRuleSet, useRuleSet);

// Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:
// the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal.
for (RelOptRule relOptRule : basicRules) {
hepProgramBuilder.addRuleInstance(relOptRule);
hepProgramBuilder.addRuleInstance(relOptRule);
}

// ----
Expand All @@ -542,11 +532,6 @@ private static HepProgram getOptProgram(@Nullable Set<String> skipRuleSet) {
return hepProgramBuilder.build();
}

// util func to check no rules are skipped
private static boolean noRulesSkipped(Set<String> set) {
return set.isEmpty();
}

/**
* Filter static RuleSet according to query options
* The filtering is done via checking query option with
Expand All @@ -556,11 +541,12 @@ private static boolean noRulesSkipped(Set<String> set) {
* @param skipRuleSet skip rule set from options
* @return filtered list of rules
*/
private static List<RelOptRule> filterRuleList(List<RelOptRule> rules, Set<String> skipRuleSet) {
private static List<RelOptRule> filterRuleList(List<RelOptRule> rules, Set<String> skipRuleSet,
Set<String> useRuleSet) {
List<RelOptRule> filteredRules = new ArrayList<>();
for (RelOptRule relOptRule : rules) {
String ruleName = relOptRule.toString();
if (isRuleSkipped(ruleName, skipRuleSet)) {
if (isRuleSkipped(ruleName, skipRuleSet, useRuleSet)) {
continue;
}
filteredRules.add(relOptRule);
Expand All @@ -569,14 +555,21 @@ private static List<RelOptRule> filterRuleList(List<RelOptRule> rules, Set<Strin
}

/**
* Whether a rule is skipped, rules not skipped by default
* Returns whether a rule is skipped.
* A rule is disabled if it is in both skipRuleSet and useRuleSet
*
* @param ruleName description of the rule
* @param skipRuleSet query skipSet
* @return false if corresponding key is not in skipMap or the value is "false", else true
*/
private static boolean isRuleSkipped(String ruleName, Set<String> skipRuleSet) {
// can put rule-specific default behavior here
return skipRuleSet.contains(ruleName);
private static boolean isRuleSkipped(String ruleName, Set<String> skipRuleSet, Set<String> useRuleSet) {
if (skipRuleSet.contains(ruleName)) {
return true;
}
if (CommonConstants.Broker.DEFAULT_DISABLED_RULES.contains(ruleName)) {
return !useRuleSet.contains(ruleName);
}
return false;
}

private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, Config config,
Expand Down Expand Up @@ -683,7 +676,6 @@ default boolean defaultUseLeafServerForIntermediateStage() {
return CommonConstants.Broker.DEFAULT_USE_LEAF_SERVER_FOR_INTERMEDIATE_STAGE;
}


@Value.Default
default boolean defaultEnableGroupTrim() {
return CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM;
Expand Down
Loading
Loading