Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,22 @@ public static boolean isInferRealtimeSegmentPartition(Map<String, String> queryO
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.INFER_REALTIME_SEGMENT_PARTITION, "false"));
}

/**
 * Returns whether the query should be planned with the physical (v2) optimizer.
 * Defaults to {@code false} when the option is absent.
 */
public static boolean isUsePhysicalOptimizer(Map<String, String> queryOptions) {
  // Boolean.parseBoolean(null) is false, so a missing option correctly disables the optimizer.
  String optionValue = queryOptions.get(QueryOptionKey.USE_PHYSICAL_OPTIMIZER);
  return Boolean.parseBoolean(optionValue);
}

/**
 * Returns whether lite mode is enabled for this query.
 * Defaults to {@code false} when the option is absent.
 */
public static boolean isUseLiteMode(Map<String, String> queryOptions) {
  // Boolean.parseBoolean(null) is false, so an unset option yields false without a default.
  String optionValue = queryOptions.get(QueryOptionKey.USE_LITE_MODE);
  return Boolean.parseBoolean(optionValue);
}

/**
 * Returns whether broker-side segment pruning is enabled for this query.
 * Defaults to {@code false} when the option is absent.
 */
public static boolean isUseBrokerPruning(Map<String, String> queryOptions) {
  // Boolean.parseBoolean(null) is false, so an unset option yields false without a default.
  String optionValue = queryOptions.get(QueryOptionKey.USE_BROKER_PRUNING);
  return Boolean.parseBoolean(optionValue);
}

/**
 * Returns whether non-leaf stage operators should run inside the broker (lite mode only).
 * Defaults to {@code false} when the option is absent.
 */
public static boolean isRunInBroker(Map<String, String> queryOptions) {
  // Boolean.parseBoolean(null) is false, so an unset option yields false without a default.
  String optionValue = queryOptions.get(QueryOptionKey.RUN_IN_BROKER);
  return Boolean.parseBoolean(optionValue);
}

@Nullable
private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) {
if (optionValue == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public QueryEnvironment(String database, TableCache tableCache, @Nullable Worker
*/
private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
boolean usePhysicalOptimizer = PhysicalPlannerContext.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
boolean usePhysicalOptimizer = QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
HepProgram traitProgram = getTraitProgram(workerManager, _envConfig, usePhysicalOptimizer);
SqlExplainFormat format = SqlExplainFormat.DOT;
if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;


/**
Expand Down Expand Up @@ -81,7 +81,7 @@ public PhysicalPlannerContext(RoutingManager routingManager, String hostName, in
_requestId = requestId;
_instanceId = instanceId;
_queryOptions = queryOptions == null ? Map.of() : queryOptions;
_useLiteMode = PhysicalPlannerContext.useLiteMode(queryOptions);
_useLiteMode = QueryOptionsUtils.isUseLiteMode(_queryOptions);
_instanceIdToQueryServerInstance.put(instanceId, getBrokerQueryServerInstance());
}

Expand Down Expand Up @@ -125,25 +125,4 @@ public boolean isUseLiteMode() {
// Wraps this broker process as a QueryServerInstance so it can be registered as a worker.
// NOTE(review): _port is passed for both port arguments — presumably the broker's single port
// serves both roles here; confirm against QueryServerInstance's constructor contract.
private QueryServerInstance getBrokerQueryServerInstance() {
return new QueryServerInstance(_instanceId, _hostName, _port, _port);
}

/**
 * Returns whether the physical optimizer is enabled in the given query options.
 * A {@code null} options map is treated the same as an unset option, i.e. disabled.
 */
public static boolean isUsePhysicalOptimizer(@Nullable Map<String, String> queryOptions) {
  // Short-circuit on null so the map lookup is only attempted when options exist.
  return queryOptions != null
      && Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER, "false"));
}

/**
 * Returns whether broker pruning is enabled in the given query options.
 * A {@code null} options map is treated the same as an unset option, i.e. disabled.
 */
public static boolean isUseBrokerPruning(@Nullable Map<String, String> queryOptions) {
  // Short-circuit on null so the map lookup is only attempted when options exist.
  return queryOptions != null
      && Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING, "false"));
}

/**
 * Returns whether lite mode is enabled in the given query options.
 * A {@code null} options map is treated the same as an unset option, i.e. disabled.
 */
private static boolean useLiteMode(@Nullable Map<String, String> queryOptions) {
  // Short-circuit on null so the map lookup is only attempted when options exist.
  return queryOptions != null
      && Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_LITE_MODE, "false"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public PRelNode onMatch(PRelOptRuleCall call) {
leafStageRoot = leafStageRoot == null ? call._currentNode : leafStageRoot;
String tableName = getActualTableName((TableScan) call._currentNode.unwrap());
PinotQuery pinotQuery = LeafStageToPinotQuery.createPinotQueryForRouting(tableName, leafStageRoot.unwrap(),
!PhysicalPlannerContext.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
!QueryOptionsUtils.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
return assignTableScan((PhysicalTableScan) call._currentNode, _physicalPlannerContext.getRequestId(),
pinotQuery);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;


Expand All @@ -46,16 +50,23 @@
public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
private static final Random RANDOM = new Random();
private final PhysicalPlannerContext _context;
private final boolean _runInBroker;

// Captures the planner context and resolves the run-in-broker query option once at
// construction time, so execute() does not re-parse the option per node.
public LiteModeWorkerAssignmentRule(PhysicalPlannerContext context) {
_context = context;
_runInBroker = QueryOptionsUtils.isRunInBroker(context.getQueryOptions());
}

@Override
public PRelNode execute(PRelNode currentNode) {
Set<String> workerSet = new HashSet<>();
accumulateWorkers(currentNode, workerSet);
List<String> workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
List<String> workers;
if (_runInBroker) {
workers = List.of(String.format("0@%s", _context.getInstanceId()));
} else {
accumulateWorkers(currentNode, workerSet);
workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
}
PinotDataDistribution pdd = new PinotDataDistribution(RelDistribution.Type.SINGLETON, workers, workers.hashCode(),
null, null);
return addExchangeAndWorkers(currentNode, null, pdd);
Expand All @@ -67,13 +78,24 @@ public PRelNode addExchangeAndWorkers(PRelNode currentNode, @Nullable PRelNode p
return currentNode;
}
return new PhysicalExchange(nodeId(), currentNode, pdd, Collections.emptyList(),
ExchangeStrategy.SINGLETON_EXCHANGE, null, PinotExecStrategyTrait.getDefaultExecStrategy());
ExchangeStrategy.SINGLETON_EXCHANGE, currentNode.unwrap().getTraitSet().getCollation(),
PinotExecStrategyTrait.getDefaultExecStrategy());
}
List<PRelNode> newInputs = new ArrayList<>();
for (PRelNode input : currentNode.getPRelInputs()) {
newInputs.add(addExchangeAndWorkers(input, currentNode, pdd));
}
return currentNode.with(newInputs, pdd);
currentNode = currentNode.with(newInputs, pdd);
if (!currentNode.areTraitsSatisfied()) {
RelCollation collation = currentNode.unwrap().getTraitSet().getCollation();
Preconditions.checkState(collation != null && !collation.getFieldCollations().isEmpty(),
"Expected non-null collation since traits are not satisfied");
PinotDataDistribution sortedPDD = new PinotDataDistribution(
RelDistribution.Type.SINGLETON, pdd.getWorkers(), pdd.getWorkerHash(), null, collation);
return new PhysicalSort(currentNode.unwrap().getCluster(), RelTraitSet.createEmpty(), List.of(), collation,
null, null, currentNode, nodeId(), sortedPDD, false);
}
return currentNode;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,11 +535,12 @@
"\n PhysicalProject(rnk=[$3], col1=[$0])",
"\n PhysicalFilter(condition=[=($3, 1)])",
"\n PhysicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n PhysicalSort(sort0=[$2], dir0=[ASC])",
"\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
Expand Down Expand Up @@ -604,6 +605,44 @@
}
]
},
"physical_opt_run_in_broker": {
"queries": [
{
"description": "(run-in-broker) Pagination on group-by results",
"sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; SET runInBroker=true; EXPLAIN PLAN FOR WITH tmp AS (SELECT col1, col2, col3, COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col1, col2, col3 ORDER BY col2) SELECT * FROM tmp LIMIT 100,400",
"output": [
"Execution Plan",
"\nPhysicalSort(offset=[100], fetch=[400])",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that unlike the other plans, the top-most plan node here is not an exchange. The reason is that the exchange in the sub-tree sends data directly to the broker, so no additional exchange is required above it.

If there's a collation trait to be met, that'll be met by adding a sort.

"\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "(run-in-broker) Query with single semi join and aggregation",
"sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; SET runInBroker=true; EXPLAIN PLAN FOR SELECT COUNT(*), col2 FROM a WHERE col1 = 'foo' AND col2 IN (SELECT col1 FROM b) GROUP BY col2",
"output": [
"Execution Plan",
"\nPhysicalProject(EXPR$0=[$1], col2=[$0])",
"\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()], aggType=[DIRECT])",
"\n PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[100000])",
"\n PhysicalProject(col2=[$1])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[100000])",
"\n PhysicalProject(col1=[$0])",
"\n PhysicalTableScan(table=[[default, b]])",
"\n"
]
}
]
},
"physical_opt_broker_pruning": {
"queries": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.serde.PlanNodeDeserializer;
import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
Expand All @@ -79,10 +78,11 @@
import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain;
import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor;
import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient;
Expand Down Expand Up @@ -579,20 +579,13 @@ public static QueryResult runReducer(long requestId,
long timeoutMs,
Map<String, String> queryOptions,
MailboxService mailboxService) {

long startTimeMs = System.currentTimeMillis();
long deadlineMs = startTimeMs + timeoutMs;
// NOTE: Reduce stage is always stage 0
DispatchablePlanFragment stagePlan = subPlan.getQueryStageMap().get(0);
PlanFragment planFragment = stagePlan.getPlanFragment();
PlanNode rootNode = planFragment.getFragmentRoot();

Preconditions.checkState(rootNode instanceof MailboxReceiveNode,
"Expecting mailbox receive node as root of reduce stage, got: %s", rootNode.getClass().getSimpleName());

MailboxReceiveNode receiveNode = (MailboxReceiveNode) rootNode;
List<WorkerMetadata> workerMetadata = stagePlan.getWorkerMetadataList();

Preconditions.checkState(workerMetadata.size() == 1,
"Expecting single worker for reduce stage, got: %s", workerMetadata.size());

Expand All @@ -603,7 +596,7 @@ public static QueryResult runReducer(long requestId,
workerMetadata.get(0), null, parentContext, true);

PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
DataSchema sourceSchema = receiveNode.getDataSchema();
DataSchema sourceSchema = rootNode.getDataSchema();
int numColumns = resultFields.size();
String[] columnNames = new String[numColumns];
ColumnDataType[] columnTypes = new ColumnDataType[numColumns];
Expand All @@ -617,8 +610,9 @@ public static QueryResult runReducer(long requestId,
ArrayList<Object[]> resultRows = new ArrayList<>();
MseBlock block;
MultiStageQueryStats queryStats;
try (MailboxReceiveOperator receiveOperator = new MailboxReceiveOperator(executionContext, receiveNode)) {
block = receiveOperator.nextBlock();
try (OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> { })) {
MultiStageOperator rootOperator = opChain.getRoot();
block = rootOperator.nextBlock();
while (block.isData()) {
DataBlock dataBlock = ((MseBlock.Data) block).asSerialized().getDataBlock();
int numRows = dataBlock.getNumberOfRows();
Expand All @@ -637,9 +631,9 @@ public static QueryResult runReducer(long requestId,
resultRows.add(row);
}
}
block = receiveOperator.nextBlock();
block = rootOperator.nextBlock();
}
queryStats = receiveOperator.calculateStats();
queryStats = rootOperator.calculateStats();
}
// TODO: Improve the error handling, e.g. return partial response
if (block.isError()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,9 @@ public static class QueryOptionKey {
// TODO(mse-physical): Consider removing this query option and making this the default, since there's already
// a table config to enable broker pruning (it is disabled by default).
public static final String USE_BROKER_PRUNING = "useBrokerPruning";
// When lite mode is enabled, setting this flag runs all non-leaf stage operators within the
// broker itself. That way, MSE queries model the scatter-gather pattern used by the V1 engine.
public static final String RUN_IN_BROKER = "runInBroker";
}

public static class QueryOptionValue {
Expand Down
Loading