Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final boolean _explainAskingServerDefault;
private final MultiStageQueryThrottler _queryThrottler;
private final ExecutorService _queryCompileExecutor;
private final Set<String> _defaultDisabledPlannerRules;
protected final long _extraPassiveTimeoutMs;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId,
Expand Down Expand Up @@ -169,6 +170,10 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
Executors.newFixedThreadPool(
Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
new NamedThreadFactory("multi-stage-query-compile-executor")));
_defaultDisabledPlannerRules =
_config.containsKey(CommonConstants.Broker.CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES) ? Set.copyOf(
_config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES, List.of()))
: CommonConstants.Broker.DEFAULT_DISABLED_RULES;
}

@Override
Expand Down Expand Up @@ -442,6 +447,7 @@ private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders
.defaultLiteModeLeafStageFanOutAdjustedLimit(defaultLiteModeFanoutAdjustedLimit)
.defaultLiteModeEnableJoins(defaultLiteModeEnableJoins)
.defaultHashFunction(defaultHashFunction)
.defaultDisabledPlannerRules(_defaultDisabledPlannerRules)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ default void explainLogical(@Language("sql") String query, String expected) {
default void explainLogical(@Language("sql") String query, String expected, Map<String, String> queryOptions) {
try {
String extraOptions = queryOptions.entrySet().stream()
.map(entry -> "SET " + entry.getKey() + "=" + entry.getValue() + ";\n")
.map(entry -> "SET " + entry.getKey() + "='" + entry.getValue() + "';\n")
.collect(Collectors.joining());
JsonNode jsonNode = postQuery(extraOptions + "explain plan without implementation for " + query);
if (!jsonNode.get("exceptions").isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.File;
import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -64,8 +65,12 @@ public void setUp()
}

protected void overrideBrokerConf(PinotConfiguration brokerConf) {
String property = CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN;
brokerConf.setProperty(property, "true");
brokerConf.setProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN, "true"
);
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES,
List.of(CommonConstants.Broker.PlannerRuleNames.AGGREGATE_FUNCTION_REWRITE,
CommonConstants.Broker.PlannerRuleNames.AGGREGATE_REDUCE_FUNCTIONS));
}

@BeforeMethod
Expand Down Expand Up @@ -166,6 +171,38 @@ public void simpleQueryLogical() {
+ " PinotLogicalTableScan(table=[[default, mytable]])\n");
}

@Test
public void testDefaultDisabledRuleOverride() {
// PinotAggregateFunctionRewriteRule and PinotAggregateReduceFunctionsRule are disabled using broker configs
explainLogical("SELECT SUM(AirlineID) FROM mytable",
"Execution Plan\n"
+ "PinotLogicalAggregate(group=[{}], agg#0=[SUM($0)], aggType=[FINAL])\n"
+ " PinotLogicalExchange(distribution=[hash])\n"
+ " PinotLogicalAggregate(group=[{}], agg#0=[SUM($6)], aggType=[LEAF])\n"
+ " PinotLogicalTableScan(table=[[default, mytable]])\n");

// Enable PinotAggregateFunctionRewriteRule through query option, ensure it overrides the broker config
explainLogical("SELECT SUM(AirlineID) FROM mytable",
"Execution Plan\n"
+ "PinotLogicalAggregate(group=[{}], agg#0=[SUMLONG($0)], aggType=[FINAL])\n"
+ " PinotLogicalExchange(distribution=[hash])\n"
+ " PinotLogicalAggregate(group=[{}], agg#0=[SUMLONG($6)], aggType=[LEAF])\n"
+ " PinotLogicalTableScan(table=[[default, mytable]])\n",
Map.of("usePlannerRules", CommonConstants.Broker.PlannerRuleNames.AGGREGATE_FUNCTION_REWRITE));

// Enable PinotAggregateFunctionRewriteRule and PinotAggregateReduceFunctionsRule through query option, ensure they
// override the broker config
explainLogical("SELECT SUM(AirlineID) FROM mytable",
"Execution Plan\n"
+ "LogicalProject(EXPR$0=[CASE(=($1, 0), null:BIGINT, $0)])\n"
+ " PinotLogicalAggregate(group=[{}], agg#0=[SUMLONG($0)], agg#1=[COUNT($1)], aggType=[FINAL])\n"
+ " PinotLogicalExchange(distribution=[hash])\n"
+ " PinotLogicalAggregate(group=[{}], agg#0=[SUMLONG($6)], agg#1=[COUNT()], aggType=[LEAF])\n"
+ " PinotLogicalTableScan(table=[[default, mytable]])\n",
Map.of("usePlannerRules", CommonConstants.Broker.PlannerRuleNames.AGGREGATE_FUNCTION_REWRITE + ","
+ CommonConstants.Broker.PlannerRuleNames.AGGREGATE_REDUCE_FUNCTIONS));
}

@AfterClass
public void tearDown()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ private static AggregateCall maybeRewriteAggCall(AggregateCall call, RelNode inp
}
break;
}
case SUM: {
case SUM:
case SUM0: {
if (operandType == SqlTypeName.INTEGER) {
newAgg = new PinotSqlAggFunction("SUMINT", SqlKind.OTHER_FUNCTION, ReturnTypes.explicit(call.getType()),
aggFunction.getOperandTypeChecker(), SqlFunctionCategory.USER_DEFINED_FUNCTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class QueryEnvironment {
private final HepProgram _optProgram;
private final Config _envConfig;
private final PinotCatalog _catalog;
private final Set<String> _defaultDisabledPlannerRules;

public QueryEnvironment(Config config) {
_envConfig = config;
Expand All @@ -157,8 +158,9 @@ public QueryEnvironment(Config config) {
.build();
_catalogReader = new PinotCatalogReader(
rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG, config.isCaseSensitive());
_defaultDisabledPlannerRules = _envConfig.defaultDisabledPlannerRules();
// default optProgram with no skip rule options and no use rule options
_optProgram = getOptProgram(Set.of(), Set.of());
_optProgram = getOptProgram(Set.of(), Set.of(), _defaultDisabledPlannerRules);
}

public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) {
Expand Down Expand Up @@ -195,7 +197,7 @@ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
Set<String> skipRuleSet = QueryOptionsUtils.getSkipPlannerRules(options);
if (!skipRuleSet.isEmpty() || !useRuleSet.isEmpty()) {
// dynamically create optProgram according to rule options
optProgram = getOptProgram(skipRuleSet, useRuleSet);
optProgram = getOptProgram(skipRuleSet, useRuleSet, _defaultDisabledPlannerRules);
}
}
boolean usePhysicalOptimizer = QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions(),
Expand Down Expand Up @@ -516,9 +518,12 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex
* - In the third phase, the logical plan is prune with PRUNE_RULES.
*
* @param skipRuleSet parsed skipped rule name set from query options
* @param useRuleSet parsed use rule set from query options
* @param defaultDisabledRuleSet parsed default disabled rule set from broker config
* @return HepProgram that performs logical transformations
*/
private static HepProgram getOptProgram(Set<String> skipRuleSet, Set<String> useRuleSet) {
private static HepProgram getOptProgram(Set<String> skipRuleSet, Set<String> useRuleSet,
Set<String> defaultDisabledRuleSet) {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
// Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's
// best to be explicit.
Expand All @@ -527,12 +532,14 @@ private static HepProgram getOptProgram(Set<String> skipRuleSet, Set<String> use
// ----
// Rules are disabled if its corresponding value is set to false in ruleFlags
// construct filtered BASIC_RULES, FILTER_PUSHDOWN_RULES, PROJECT_PUSHDOWN_RULES, PRUNE_RULES
List<RelOptRule> basicRules = filterRuleList(PinotQueryRuleSets.BASIC_RULES, skipRuleSet, useRuleSet);
List<RelOptRule> basicRules =
filterRuleList(PinotQueryRuleSets.BASIC_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet);
List<RelOptRule> filterPushdownRules =
filterRuleList(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES, skipRuleSet, useRuleSet);
filterRuleList(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet);
List<RelOptRule> projectPushdownRules =
filterRuleList(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES, skipRuleSet, useRuleSet);
List<RelOptRule> pruneRules = filterRuleList(PinotQueryRuleSets.PRUNE_RULES, skipRuleSet, useRuleSet);
filterRuleList(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet);
List<RelOptRule> pruneRules =
filterRuleList(PinotQueryRuleSets.PRUNE_RULES, skipRuleSet, useRuleSet, defaultDisabledRuleSet);

// Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:
// the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal.
Expand Down Expand Up @@ -565,14 +572,16 @@ private static HepProgram getOptProgram(Set<String> skipRuleSet, Set<String> use
*
* @param rules static list of rules
* @param skipRuleSet skip rule set from options
* @param useRuleSet use rule set from options
* @param defaultDisabledRuleSet default disabled rule set from config
* @return filtered list of rules
*/
private static List<RelOptRule> filterRuleList(List<RelOptRule> rules, Set<String> skipRuleSet,
Set<String> useRuleSet) {
Set<String> useRuleSet, Set<String> defaultDisabledRuleSet) {
List<RelOptRule> filteredRules = new ArrayList<>();
for (RelOptRule relOptRule : rules) {
String ruleName = relOptRule.toString();
if (isRuleSkipped(ruleName, skipRuleSet, useRuleSet)) {
if (isRuleSkipped(ruleName, skipRuleSet, useRuleSet, defaultDisabledRuleSet)) {
continue;
}
filteredRules.add(relOptRule);
Expand All @@ -581,18 +590,22 @@ private static List<RelOptRule> filterRuleList(List<RelOptRule> rules, Set<Strin
}

/**
* Returns whether a rule is skipped.
* A rule is disabled if it is in both skipRuleSet and useRuleSet
* Returns whether a rule should be skipped.
* A rule is disabled if it is in both skipRuleSet and useRuleSet.
*
* @param ruleName description of the rule
* @param skipRuleSet query skipSet
* @return false if corresponding key is not in skipMap or the value is "false", else true
* @param skipRuleSet rules to be skipped, this takes precedence over {@code useRuleSet}.
* @param useRuleSet rules to be used.
* @param defaultDisabledRuleSet rules disabled by default. Any rule in this set will still be applied if it's also
* present in {@code useRuleSet}.
* @return true if rule should be skipped
*/
private static boolean isRuleSkipped(String ruleName, Set<String> skipRuleSet, Set<String> useRuleSet) {
private static boolean isRuleSkipped(String ruleName, Set<String> skipRuleSet, Set<String> useRuleSet,
Set<String> defaultDisabledRuleSet) {
if (skipRuleSet.contains(ruleName)) {
return true;
}
if (CommonConstants.Broker.DEFAULT_DISABLED_RULES.contains(ruleName)) {
if (defaultDisabledRuleSet.contains(ruleName)) {
return !useRuleSet.contains(ruleName);
}
return false;
Expand All @@ -613,7 +626,8 @@ private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager,
hepProgramBuilder.addRuleInstance(relOptRule);
}
}
if (!isRuleSkipped(CommonConstants.Broker.PlannerRuleNames.JOIN_TO_ENRICHED_JOIN, Set.of(), useRuleSet)) {
if (!isRuleSkipped(CommonConstants.Broker.PlannerRuleNames.JOIN_TO_ENRICHED_JOIN, Set.of(), useRuleSet,
config.defaultDisabledPlannerRules())) {
// push filter and project above join to enrichedJoin, does not work with physical optimizer
hepProgramBuilder.addRuleCollection(PinotEnrichedJoinRule.PINOT_ENRICHED_JOIN_RULES);
}
Expand Down Expand Up @@ -804,13 +818,21 @@ default String defaultHashFunction() {
return CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION;
}

/**
* Whether to enable joins when using MSE Lite mode.
*/
@Value.Default
default boolean defaultLiteModeEnableJoins() {
return CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS;
}
/**
* Whether to enable joins when using MSE Lite mode.
*/
@Value.Default
default boolean defaultLiteModeEnableJoins() {
return CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS;
}

/**
* Returns the list of planner rules that are disabled by default.
*/
@Value.Default
default Set<String> defaultDisabledPlannerRules() {
return CommonConstants.Broker.DEFAULT_DISABLED_RULES;
}

/**
* Returns the worker manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,8 @@ public static class PlannerRuleNames {
PlannerRuleNames.JOIN_PUSH_TRANSITIVE_PREDICATES
);

public static final String CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES = "pinot.broker.mse.planner.disabled.rules";

public static class FailureDetector {
public enum Type {
// Do not detect any failure
Expand Down
Loading