Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,21 @@ private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders
boolean defaultEnableDynamicFilteringSemiJoin = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN,
CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN);
boolean defaultUsePhysicalOptimizer = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_USE_PHYSICAL_OPTIMIZER,
CommonConstants.Broker.DEFAULT_USE_PHYSICAL_OPTIMIZER);
boolean defaultUseLiteMode = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_USE_LITE_MODE,
CommonConstants.Broker.DEFAULT_USE_LITE_MODE);
boolean defaultRunInBroker = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_RUN_IN_BROKER,
CommonConstants.Broker.DEFAULT_RUN_IN_BROKER);
boolean defaultUseBrokerPruning = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_USE_BROKER_PRUNING,
CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING);
int defaultLiteModeServerStageLimit = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_LITE_MODE_LEAF_STAGE_LIMIT,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT);
boolean caseSensitive = !_config.getProperty(
CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY,
CommonConstants.Helix.DEFAULT_ENABLE_CASE_INSENSITIVE
Expand All @@ -389,6 +404,11 @@ private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders
.defaultUseLeafServerForIntermediateStage(defaultUseLeafServerForIntermediateStage)
.defaultEnableGroupTrim(defaultEnableGroupTrim)
.defaultEnableDynamicFilteringSemiJoin(defaultEnableDynamicFilteringSemiJoin)
.defaultUsePhysicalOptimizer(defaultUsePhysicalOptimizer)
.defaultUseLiteMode(defaultUseLiteMode)
.defaultRunInBroker(defaultRunInBroker)
.defaultUseBrokerPruning(defaultUseBrokerPruning)
.defaultLiteModeServerStageLimit(defaultLiteModeServerStageLimit)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,20 +431,29 @@ public static boolean isUseLeafServerForIntermediateStage(Map<String, String> qu
return option != null ? Boolean.parseBoolean(option) : defaultValue;
}

public static boolean isUsePhysicalOptimizer(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER, "false"));
public static boolean isUsePhysicalOptimizer(Map<String, String> queryOptions, boolean defaultValue) {
String option = queryOptions.get(QueryOptionKey.USE_PHYSICAL_OPTIMIZER);
return option != null ? Boolean.parseBoolean(option) : defaultValue;
}

public static boolean isUseLiteMode(Map<String, String> queryOptions, boolean defaultValue) {
String option = queryOptions.get(QueryOptionKey.USE_LITE_MODE);
return option != null ? Boolean.parseBoolean(option) : defaultValue;
}

public static boolean isUseLiteMode(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_LITE_MODE, "false"));
public static boolean isUseBrokerPruning(Map<String, String> queryOptions, boolean defaultValue) {
String option = queryOptions.get(QueryOptionKey.USE_BROKER_PRUNING);
return option != null ? Boolean.parseBoolean(option) : defaultValue;
}

public static boolean isUseBrokerPruning(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING, "true"));
public static boolean isRunInBroker(Map<String, String> queryOptions, boolean defaultValue) {
String option = queryOptions.get(QueryOptionKey.RUN_IN_BROKER);
return option != null ? Boolean.parseBoolean(option) : defaultValue;
}

public static boolean isRunInBroker(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.RUN_IN_BROKER, "true"));
public static int getLiteModeServerStageLimit(Map<String, String> queryOptions, int defaultValue) {
String option = queryOptions.get(QueryOptionKey.LITE_MODE_SERVER_STAGE_LIMIT);
return option != null ? checkedParseIntPositive(QueryOptionKey.LITE_MODE_SERVER_STAGE_LIMIT, option) : defaultValue;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
optProgram = getOptProgram(skipRuleSet);
}
}
boolean usePhysicalOptimizer = QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
boolean usePhysicalOptimizer = QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions(),
_envConfig.defaultUsePhysicalOptimizer());
HepProgram traitProgram = getTraitProgram(workerManager, _envConfig, usePhysicalOptimizer);
SqlExplainFormat format = SqlExplainFormat.DOT;
if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) {
Expand All @@ -192,7 +193,9 @@ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
workerManager = _envConfig.getWorkerManager();
physicalPlannerContext = new PhysicalPlannerContext(workerManager.getRoutingManager(),
workerManager.getHostName(), workerManager.getPort(), _envConfig.getRequestId(),
workerManager.getInstanceId(), sqlNodeAndOptions.getOptions());
workerManager.getInstanceId(), sqlNodeAndOptions.getOptions(),
_envConfig.defaultUseLiteMode(), _envConfig.defaultRunInBroker(), _envConfig.defaultUseBrokerPruning(),
_envConfig.defaultLiteModeServerStageLimit());
}
return new PlannerContext(_config, _catalogReader, _typeFactory, optProgram, traitProgram,
sqlNodeAndOptions.getOptions(), _envConfig, format, physicalPlannerContext);
Expand Down Expand Up @@ -694,6 +697,66 @@ default boolean defaultEnableDynamicFilteringSemiJoin() {
return CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN;
}

/**
* Whether to use physical optimizer by default.
*
* This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration.
* This default value can be always overridden at query level by the query option
* {@link CommonConstants.Broker.Request.QueryOptionKey#USE_PHYSICAL_OPTIMIZER}.
*/
@Value.Default
default boolean defaultUsePhysicalOptimizer() {
return CommonConstants.Broker.DEFAULT_USE_PHYSICAL_OPTIMIZER;
}

/**
* Whether to use lite mode by default.
*
* This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration.
* This default value can be always overridden at query level by the query option
* {@link CommonConstants.Broker.Request.QueryOptionKey#USE_LITE_MODE}.
*/
@Value.Default
default boolean defaultUseLiteMode() {
return CommonConstants.Broker.DEFAULT_USE_LITE_MODE;
}

/**
* Whether to run in broker by default.
*
* This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration.
* This default value can be always overridden at query level by the query option
* {@link CommonConstants.Broker.Request.QueryOptionKey#RUN_IN_BROKER}.
*/
@Value.Default
default boolean defaultRunInBroker() {
return CommonConstants.Broker.DEFAULT_RUN_IN_BROKER;
}

/**
* Whether to use broker pruning by default.
*
* This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration.
* This default value can be always overridden at query level by the query option
* {@link CommonConstants.Broker.Request.QueryOptionKey#USE_BROKER_PRUNING}.
*/
@Value.Default
default boolean defaultUseBrokerPruning() {
return CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING;
}

/**
* Default server stage limit for lite mode queries.
*
* This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration.
* This default value can be always overridden at query level by the query option
* {@link CommonConstants.Broker.Request.QueryOptionKey#LITE_MODE_SERVER_STAGE_LIMIT}.
*/
@Value.Default
default int defaultLiteModeServerStageLimit() {
return CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
}

/**
* Returns the worker manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.spi.utils.CommonConstants;


/**
Expand Down Expand Up @@ -58,6 +59,9 @@ public Integer get() {
private final String _instanceId;
private final Map<String, String> _queryOptions;
private final boolean _useLiteMode;
private final boolean _runInBroker;
private final boolean _useBrokerPruning;
private final int _liteModeServerStageLimit;

/**
* Used by controller when it needs to extract table names from the query.
Expand All @@ -70,18 +74,26 @@ public PhysicalPlannerContext() {
_requestId = 0;
_instanceId = "";
_queryOptions = Map.of();
_useLiteMode = false;
_useLiteMode = CommonConstants.Broker.DEFAULT_USE_LITE_MODE;
_runInBroker = CommonConstants.Broker.DEFAULT_RUN_IN_BROKER;
_useBrokerPruning = CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING;
_liteModeServerStageLimit = CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
}

public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId,
String instanceId, Map<String, String> queryOptions) {
String instanceId, Map<String, String> queryOptions, boolean defaultUseLiteMode, boolean defaultRunInBroker,
boolean defaultUseBrokerPruning, int defaultLiteModeServerStageLimit) {
_routingManager = routingManager;
_hostName = hostName;
_port = port;
_requestId = requestId;
_instanceId = instanceId;
_queryOptions = queryOptions == null ? Map.of() : queryOptions;
_useLiteMode = QueryOptionsUtils.isUseLiteMode(_queryOptions);
_useLiteMode = QueryOptionsUtils.isUseLiteMode(_queryOptions, defaultUseLiteMode);
_runInBroker = QueryOptionsUtils.isRunInBroker(_queryOptions, defaultRunInBroker);
_useBrokerPruning = QueryOptionsUtils.isUseBrokerPruning(_queryOptions, defaultUseBrokerPruning);
_liteModeServerStageLimit = QueryOptionsUtils.getLiteModeServerStageLimit(_queryOptions,
defaultLiteModeServerStageLimit);
_instanceIdToQueryServerInstance.put(instanceId, getBrokerQueryServerInstance());
}

Expand Down Expand Up @@ -122,6 +134,18 @@ public boolean isUseLiteMode() {
return _useLiteMode;
}

public boolean isRunInBroker() {
return _runInBroker;
}

public boolean isUseBrokerPruning() {
return _useBrokerPruning;
}

public int getLiteModeServerStageLimit() {
return _liteModeServerStageLimit;
}

private QueryServerInstance getBrokerQueryServerInstance() {
return new QueryServerInstance(_instanceId, _hostName, _port, _port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public PRelNode onMatch(PRelOptRuleCall call) {
leafStageRoot = leafStageRoot == null ? call._currentNode : leafStageRoot;
String tableName = getActualTableName((TableScan) call._currentNode.unwrap());
PinotQuery pinotQuery = LeafStageToPinotQuery.createPinotQueryForRouting(tableName, leafStageRoot.unwrap(),
!QueryOptionsUtils.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
!_physicalPlannerContext.isUseBrokerPruning());
return assignTableScan((PhysicalTableScan) call._currentNode, _physicalPlannerContext.getRequestId(),
pinotQuery);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
public class LiteModeSortInsertRule extends PRelOptRule {
private static final TypeFactory TYPE_FACTORY = new TypeFactory();
private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
// TODO: This should be configurable at broker and via SET statements.
private static final int DEFAULT_SERVER_STAGE_LIMIT = 100_000;
private final PhysicalPlannerContext _context;

public LiteModeSortInsertRule(PhysicalPlannerContext context) {
Expand All @@ -65,27 +63,28 @@ public boolean matches(PRelOptRuleCall call) {

@Override
public PRelNode onMatch(PRelOptRuleCall call) {
RexNode newFetch = REX_BUILDER.makeLiteral(DEFAULT_SERVER_STAGE_LIMIT, TYPE_FACTORY.createSqlType(
int serverStageLimit = _context.getLiteModeServerStageLimit();
RexNode newFetch = REX_BUILDER.makeLiteral(serverStageLimit, TYPE_FACTORY.createSqlType(
SqlTypeName.INTEGER));
if (call._currentNode instanceof PhysicalSort) {
// When current node is a Sort, if it has a fetch already, verify it is less than the hard limit. Otherwise,
// set the configured hard limit within the same Sort.
PhysicalSort sort = (PhysicalSort) call._currentNode;
if (sort.fetch != null) {
int currentFetch = RexExpressionUtils.getValueAsInt(sort.fetch);
Preconditions.checkState(currentFetch <= DEFAULT_SERVER_STAGE_LIMIT,
Preconditions.checkState(currentFetch <= serverStageLimit,
"Attempted to stream %s records from server which exceed limit %s", currentFetch,
DEFAULT_SERVER_STAGE_LIMIT);
serverStageLimit);
return sort;
}
return sort.withFetch(newFetch);
}
if (call._currentNode instanceof PhysicalAggregate) {
// When current node is aggregate, add the limit to the Aggregate itself and skip adding the Sort.
PhysicalAggregate aggregate = (PhysicalAggregate) call._currentNode;
Preconditions.checkState(aggregate.getLimit() <= DEFAULT_SERVER_STAGE_LIMIT,
"Group trim limit={} exceeds server stage limit={}", aggregate.getLimit(), DEFAULT_SERVER_STAGE_LIMIT);
int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : DEFAULT_SERVER_STAGE_LIMIT;
Preconditions.checkState(aggregate.getLimit() <= serverStageLimit,
"Group trim limit={} exceeds server stage limit={}", aggregate.getLimit(), serverStageLimit);
int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : serverStageLimit;
return aggregate.withLimit(limit);
}
PRelNode input = call._currentNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.core.Sort;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
Expand All @@ -57,7 +56,7 @@ public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {

public LiteModeWorkerAssignmentRule(PhysicalPlannerContext context) {
_context = context;
_runInBroker = QueryOptionsUtils.isRunInBroker(context.getQueryOptions());
_runInBroker = context.isRunInBroker();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,42 @@ public static class Broker {
"pinot.broker.enable.dynamic.filtering.semijoin";
public static final boolean DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN = true;

/**
* Whether to use physical optimizer by default.
* This value can always be overridden by {@link Request.QueryOptionKey#USE_PHYSICAL_OPTIMIZER} query option
*/
public static final String CONFIG_OF_USE_PHYSICAL_OPTIMIZER = "pinot.broker.multistage.use.physical.optimizer";
public static final boolean DEFAULT_USE_PHYSICAL_OPTIMIZER = false;

/**
* Whether to use lite mode by default.
* This value can always be overridden by {@link Request.QueryOptionKey#USE_LITE_MODE} query option
*/
public static final String CONFIG_OF_USE_LITE_MODE = "pinot.broker.multistage.use.lite.mode";
public static final boolean DEFAULT_USE_LITE_MODE = false;

/**
* Whether to run in broker by default.
* This value can always be overridden by {@link Request.QueryOptionKey#RUN_IN_BROKER} query option
*/
public static final String CONFIG_OF_RUN_IN_BROKER = "pinot.broker.multistage.run.in.broker";
public static final boolean DEFAULT_RUN_IN_BROKER = true;

/**
* Whether to use broker pruning by default.
* This value can always be overridden by {@link Request.QueryOptionKey#USE_BROKER_PRUNING} query option
*/
public static final String CONFIG_OF_USE_BROKER_PRUNING = "pinot.broker.multistage.use.broker.pruning";
public static final boolean DEFAULT_USE_BROKER_PRUNING = true;

/**
* Default server stage limit for lite mode queries.
* This value can always be overridden by {@link Request.QueryOptionKey#LITE_MODE_SERVER_STAGE_LIMIT} query option
*/
public static final String CONFIG_OF_LITE_MODE_LEAF_STAGE_LIMIT =
"pinot.broker.multistage.lite.mode.leaf.stage.limit";
public static final int DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT = 100_000;

// When the server instance's pool field is null or the pool contains multi distinguished group value, the broker
// would set the pool to -1 in the routing table for that server.
public static final int FALLBACK_POOL_ID = -1;
Expand Down Expand Up @@ -682,6 +718,8 @@ public static class QueryOptionKey {
// that was supposed to be in partition-1.
public static final String INFER_REALTIME_SEGMENT_PARTITION = "inferRealtimeSegmentPartition";
public static final String USE_LITE_MODE = "useLiteMode";
// Server stage limit for lite mode queries.
public static final String LITE_MODE_SERVER_STAGE_LIMIT = "liteModeServerStageLimit";
// Used by the MSE Engine to determine whether to use the broker pruning logic. Only supported by the
// new MSE query optimizer.
// TODO(mse-physical): Consider removing this query option and making this the default, since there's already
Expand Down
Loading