Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ private QueryEnvironment.CompiledQuery compileQuery(long requestId, String query
Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();

try {
ImmutableQueryEnvironment.Config queryEnvConf = getQueryEnvConf(httpHeaders, queryOptions);
ImmutableQueryEnvironment.Config queryEnvConf = getQueryEnvConf(httpHeaders, queryOptions, requestId);
QueryEnvironment queryEnv = new QueryEnvironment(queryEnvConf);
return callAsync(requestId, query, () -> queryEnv.compile(query, sqlNodeAndOptions), queryTimer);
} catch (WebApplicationException e) {
Expand Down Expand Up @@ -301,7 +301,8 @@ private void checkAuthorization(RequesterIdentity requesterIdentity, RequestCont
}
}

private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders, Map<String, String> queryOptions) {
private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders, Map<String, String> queryOptions,
long requestId) {
String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT,
CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT);
Expand All @@ -313,6 +314,7 @@ private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders
CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN,
CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN);
return QueryEnvironment.configBuilder()
.requestId(requestId)
.database(database)
.tableCache(_tableCache)
.workerManager(_workerManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.query;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule;
import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule;
import org.apache.pinot.calcite.rel.rules.PinotQueryRuleSets;
Expand All @@ -63,6 +65,7 @@
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.context.RuleTimingPlannerListener;
import org.apache.pinot.query.planner.PlannerUtils;
Expand All @@ -75,6 +78,8 @@
import org.apache.pinot.query.planner.logical.TransformationTracker;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.PinotDispatchPlanner;
import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
import org.apache.pinot.query.planner.physical.v2.RelToPRelConverter;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.type.TypeFactory;
Expand Down Expand Up @@ -138,6 +143,7 @@ public QueryEnvironment(Config config) {

public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) {
this(configBuilder()
.requestId(-1L)
.database(database)
.tableCache(tableCache)
.workerManager(workerManager)
Expand All @@ -149,16 +155,24 @@ public QueryEnvironment(String database, TableCache tableCache, @Nullable Worker
*/
private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
HepProgram traitProgram = getTraitProgram(workerManager, _envConfig);
boolean usePhysicalOptimizer = PhysicalPlannerContext.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
HepProgram traitProgram = getTraitProgram(workerManager, _envConfig, usePhysicalOptimizer);
SqlExplainFormat format = SqlExplainFormat.DOT;
if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) {
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
if (explain.getFormat() != null) {
format = explain.getFormat();
}
}
PhysicalPlannerContext physicalPlannerContext = null;
if (usePhysicalOptimizer && _envConfig.getWorkerManager() != null) {
workerManager = _envConfig.getWorkerManager();
physicalPlannerContext = new PhysicalPlannerContext(workerManager.getRoutingManager(),
workerManager.getHostName(), workerManager.getPort(), _envConfig.getRequestId(),
workerManager.getInstanceId());
}
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram,
sqlNodeAndOptions.getOptions(), _envConfig, format);
sqlNodeAndOptions.getOptions(), _envConfig, format, physicalPlannerContext);
}

/// @deprecated Use [#compile] and then [plan][CompiledQuery#planQuery(long)] the returned query instead
Expand Down Expand Up @@ -309,6 +323,11 @@ private RelRoot compileQuery(SqlNode sqlNode, PlannerContext plannerContext) {
SqlNode validated = validate(sqlNode, plannerContext);
RelRoot relation = toRelation(validated, plannerContext);
RelNode optimized = optimize(relation, plannerContext);
if (plannerContext.isUsePhysicalOptimizer()) {
Preconditions.checkNotNull(plannerContext.getPhysicalPlannerContext(), "Physical planner context is null");
optimized = RelToPRelConverter.toPRelNode(optimized, plannerContext.getPhysicalPlannerContext(),
_envConfig.getTableCache()).unwrap();
}
return relation.withRel(optimized);
}

Expand Down Expand Up @@ -415,16 +434,25 @@ private RelNode optimize(RelRoot relRoot, PlannerContext plannerContext) {
}
}

private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId) {
return toDispatchableSubPlan(relRoot, plannerContext, requestId, null);
private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext) {
return toDispatchableSubPlan(relRoot, plannerContext, null);
}

private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId,
private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext,
@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) {
long requestId = _envConfig.getRequestId();
if (plannerContext.isUsePhysicalOptimizer()) {
Pair<SubPlan, PlanFragmentAndMailboxAssignment.Result> plan = PinotLogicalQueryPlanner.makePlanV2(relRoot,
plannerContext.getPhysicalPlannerContext());
PinotDispatchPlanner pinotDispatchPlanner = new PinotDispatchPlanner(plannerContext,
_envConfig.getWorkerManager(), requestId, _envConfig.getTableCache());
return pinotDispatchPlanner.createDispatchableSubPlanV2(plan.getLeft(), plan.getRight());
}
SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker,
_envConfig.getTableCache(), useSpools(plannerContext.getOptions()));
PinotDispatchPlanner pinotDispatchPlanner =
new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), requestId, _envConfig.getTableCache());
new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), _envConfig.getRequestId(),
_envConfig.getTableCache());
return pinotDispatchPlanner.createDispatchableSubPlan(plan);
}

Expand Down Expand Up @@ -462,26 +490,35 @@ private static HepProgram getOptProgram() {
return hepProgramBuilder.build();
}

private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, Config config) {
private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, Config config,
boolean usePhysicalOptimizer) {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();

// Set the match order as BOTTOM_UP.
hepProgramBuilder.addMatchOrder(HepMatchOrder.BOTTOM_UP);

// ----
// Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule.
for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
if (isEligibleQueryPostRule(relOptRule, config)) {
hepProgramBuilder.addRuleInstance(relOptRule);
if (!usePhysicalOptimizer) {
for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
if (isEligibleQueryPostRule(relOptRule, config)) {
hepProgramBuilder.addRuleInstance(relOptRule);
}
}
} else {
for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES_V2) {
if (isEligibleQueryPostRule(relOptRule, config)) {
hepProgramBuilder.addRuleInstance(relOptRule);
}
}
}

// apply RelDistribution trait to all nodes
if (workerManager != null) {
hepProgramBuilder.addRuleInstance(PinotImplicitTableHintRule.withWorkerManager(workerManager));
if (!usePhysicalOptimizer) {
// apply RelDistribution trait to all nodes
if (workerManager != null) {
hepProgramBuilder.addRuleInstance(PinotImplicitTableHintRule.withWorkerManager(workerManager));
}
hepProgramBuilder.addRuleInstance(PinotRelDistributionTraitRule.INSTANCE);
}
hepProgramBuilder.addRuleInstance(PinotRelDistributionTraitRule.INSTANCE);

return hepProgramBuilder.build();
}

Expand All @@ -507,6 +544,8 @@ public boolean useSpools(Map<String, String> options) {

@Value.Immutable
public interface Config {
long getRequestId();

String getDatabase();

/**
Expand Down Expand Up @@ -609,7 +648,7 @@ public QueryEnvironment.QueryPlannerResult explain(long requestId,
SqlExplainFormat format = _plannerContext.getSqlExplainFormat();
if (explain instanceof SqlPhysicalExplain) {
// get the physical plan for query.
DispatchableSubPlan dispatchableSubPlan = toDispatchableSubPlan(_relRoot, _plannerContext, requestId);
DispatchableSubPlan dispatchableSubPlan = toDispatchableSubPlan(_relRoot, _plannerContext);
return getQueryPlannerResult(_plannerContext, dispatchableSubPlan,
PhysicalExplainPlanVisitor.explain(dispatchableSubPlan), dispatchableSubPlan.getTableNames());
} else {
Expand All @@ -629,7 +668,7 @@ public QueryEnvironment.QueryPlannerResult explain(long requestId,
new TransformationTracker.ByIdentity.Builder<>();
// Transform RelNodes into DispatchableSubPlan
DispatchableSubPlan dispatchableSubPlan =
toDispatchableSubPlan(_relRoot, _plannerContext, requestId, nodeTracker);
toDispatchableSubPlan(_relRoot, _plannerContext, nodeTracker);

AskingServerStageExplainer serversExplainer = new AskingServerStageExplainer(
onServerExplainer, explainPlanVerbose, RelBuilder.create(_config));
Expand All @@ -652,7 +691,7 @@ public QueryPlannerResult planQuery(long requestId) {
// TODO: current code only assume one SubPlan per query, but we should support multiple SubPlans per query.
// Each SubPlan should be able to run independently from Broker then set the results into the dependent
// SubPlan for further processing.
DispatchableSubPlan dispatchableSubPlan = toDispatchableSubPlan(_relRoot, _plannerContext, requestId);
DispatchableSubPlan dispatchableSubPlan = toDispatchableSubPlan(_relRoot, _plannerContext);
return getQueryPlannerResult(_plannerContext, dispatchableSubPlan, null, dispatchableSubPlan.getTableNames());
} catch (QueryException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;


/**
Expand Down Expand Up @@ -105,4 +106,11 @@ public long getRequestId() {
public String getInstanceId() {
return _instanceId;
}

public static boolean isUsePhysicalOptimizer(@Nullable Map<String, String> queryOptions) {
if (queryOptions == null) {
return false;
}
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER, "false"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.hep.HepProgram;
Expand Down Expand Up @@ -53,10 +54,12 @@ public class PlannerContext implements AutoCloseable {
private final Map<String, String> _options;
private final Map<String, String> _plannerOutput;
private final SqlExplainFormat _sqlExplainFormat;
@Nullable
private final PhysicalPlannerContext _physicalPlannerContext;

public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReader, RelDataTypeFactory typeFactory,
HepProgram optProgram, HepProgram traitProgram, Map<String, String> options, QueryEnvironment.Config envConfig,
SqlExplainFormat sqlExplainFormat) {
SqlExplainFormat sqlExplainFormat, @Nullable PhysicalPlannerContext physicalPlannerContext) {
_planner = new PlannerImpl(config);
_validator = new Validator(config.getOperatorTable(), catalogReader, typeFactory);
_relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT, config.getTraitDefs());
Expand All @@ -65,6 +68,7 @@ public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReade
_options = options;
_plannerOutput = new HashMap<>();
_sqlExplainFormat = sqlExplainFormat;
_physicalPlannerContext = physicalPlannerContext;
}

public PlannerImpl getPlanner() {
Expand Down Expand Up @@ -99,4 +103,13 @@ public Map<String, String> getPlannerOutput() {
public SqlExplainFormat getSqlExplainFormat() {
return _sqlExplainFormat;
}

@Nullable
public PhysicalPlannerContext getPhysicalPlannerContext() {
return _physicalPlannerContext;
}

public boolean isUsePhysicalOptimizer() {
return _physicalPlannerContext != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.SubPlanMetadata;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
import org.apache.pinot.query.planner.plannode.BasePlanNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
Expand Down Expand Up @@ -89,6 +93,18 @@ public static SubPlan makePlan(RelRoot relRoot,
// return subPlanMap.get(0);
}

public static Pair<SubPlan, PlanFragmentAndMailboxAssignment.Result> makePlanV2(RelRoot relRoot,
PhysicalPlannerContext physicalPlannerContext) {
PRelNode pRelNode = (PRelNode) relRoot.rel;
PlanFragmentAndMailboxAssignment planFragmentAndMailboxAssignment = new PlanFragmentAndMailboxAssignment();
PlanFragmentAndMailboxAssignment.Result result =
planFragmentAndMailboxAssignment.compute(pRelNode, physicalPlannerContext);
PlanFragment rootFragment = result._planFragmentMap.get(0);
SubPlan subPlan = new SubPlan(rootFragment,
new SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel), relRoot.fields), List.of());
return Pair.of(subPlan, result);
}

private static PlanFragment planNodeToPlanFragment(
PlanNode node, @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker, boolean useSpools) {
PlanFragmenter fragmenter = new PlanFragmenter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.physical.colocated.GreedyShuffleRewriteVisitor;
import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.validation.ArrayToMvValidationVisitor;
import org.apache.pinot.query.routing.WorkerManager;
Expand Down Expand Up @@ -74,6 +75,20 @@ public DispatchableSubPlan createDispatchableSubPlan(SubPlan subPlan) {
return finalizeDispatchableSubPlan(rootFragment, context);
}

public DispatchableSubPlan createDispatchableSubPlanV2(SubPlan subPlan,
PlanFragmentAndMailboxAssignment.Result result) {
// perform physical plan conversion and assign workers to each stage.
DispatchablePlanContext context = new DispatchablePlanContext(_workerManager, _requestId, _plannerContext,
subPlan.getSubPlanMetadata().getFields(), subPlan.getSubPlanMetadata().getTableNames());
PlanFragment rootFragment = subPlan.getSubPlanRoot();
context.getDispatchablePlanMetadataMap().putAll(result._fragmentMetadataMap);
for (var entry : result._planFragmentMap.entrySet()) {
context.getDispatchablePlanStageRootMap().put(entry.getKey(), entry.getValue().getFragmentRoot());
}
runValidations(rootFragment, context);
return finalizeDispatchableSubPlan(rootFragment, context);
}

/**
* Run validations on the plan. Since there is only one validator right now, don't try to over-engineer it.
*/
Expand Down
Loading
Loading