Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public static class JoinHintOptions {

public static class TableHintOptions {
// Name of the column the table is partitioned on. NOTE(review): exact consumer of this
// hint is outside this diff — presumably used to detect pre-partitioned table scans.
public static final String PARTITION_KEY = "partition_key";
// Partition function name (e.g. a hash function); compared between sender/receiver stages
// to decide whether a direct worker-to-worker exchange is possible — TODO confirm at call site.
public static final String PARTITION_FUNCTION = "partition_function";
// Number of partitions of the table — assumed; verify against the hint's reader.
public static final String PARTITION_SIZE = "partition_size";
// Fan-out factor from each sender worker to receiver workers when partitions are
// sub-divided across more receivers than senders.
public static final String PARTITION_PARALLELISM = "partition_parallelism";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public SubPlan makePlan(QueryPlan queryPlan) {

// Sub plan root needs to send final results back to the Broker
// TODO: Should be SINGLETON (currently SINGLETON has to be local, so use BROADCAST_DISTRIBUTED instead)
PlanNode subPlanRootSenderNode =
MailboxSendNode subPlanRootSenderNode =
new MailboxSendNode(subPlanRoot.getPlanFragmentId(), subPlanRoot.getDataSchema(), 0,
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public PlanNode visitExchange(ExchangeNode node, Context context) {
MailboxSendNode mailboxSendNode =
new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId,
distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
node.isPartitioned());
node.isPrePartitioned());
mailboxSendNode.addInput(nextPlanFragmentRoot);
_planFragmentMap.put(senderPlanFragmentId,
new PlanFragment(senderPlanFragmentId, mailboxSendNode, new ArrayList<>()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private static PlanNode convertLogicalExchange(Exchange node, int currentStageId
}
}
RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
boolean isPartitioned = inputDistributionTrait != null
boolean isPrePartitioned = inputDistributionTrait != null
&& inputDistributionTrait.getType() == RelDistribution.Type.HASH_DISTRIBUTED
&& inputDistributionTrait == node.getDistribution();
List<RelFieldCollation> fieldCollations = (collation == null) ? null : collation.getFieldCollations();
Expand All @@ -136,7 +136,7 @@ private static PlanNode convertLogicalExchange(Exchange node, int currentStageId
Set<String> tableNames = getTableNamesFromRelRoot(node);

return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), exchangeType, tableNames,
node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver, isPartitioned);
node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver, isPrePartitioned);
}

private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,28 @@
/**
* The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
*
* <p>It contains information aboute:
* <p>It contains information
* <ul>
* <li>the tables it is supposed to scan</li>
* <li>the underlying segments a stage requires to execute upon.</li>
* <li>the server instances on which this stage should be executed</li>
* <li>extracted from {@link org.apache.pinot.query.planner.physical.DispatchablePlanVisitor}</li>
* <li>extracted from {@link org.apache.pinot.query.planner.physical.PinotDispatchPlanner}</li>
* </ul>
*/
public class DispatchablePlanMetadata implements Serializable {
// These 2 fields are extracted from TableScanNode

// --------------------------------------------------------------------------
// Fields extracted with {@link DispatchablePlanVisitor}
// --------------------------------------------------------------------------
// info from TableNode
private final List<String> _scannedTables;
private Map<String, String> _tableOptions;
// info from MailboxSendNode - whether a stage is pre-partitioned in the same way the sending exchange desires
private boolean _isPrePartitioned;
// info from PlanNode that requires singleton (e.g. SortNode/AggregateNode)
private boolean _requiresSingletonInstance;

// --------------------------------------------------------------------------
// Fields extracted with {@link PinotDispatchPlanner}
// --------------------------------------------------------------------------
// used for assigning server/worker nodes.
private Map<Integer, QueryServerInstance> _workerIdToServerInstanceMap;

Expand All @@ -65,11 +75,8 @@ public class DispatchablePlanMetadata implements Serializable {
// time boundary info
private TimeBoundaryInfo _timeBoundaryInfo;

// whether a stage requires singleton instance to execute, e.g. stage contains global reduce (sort/agg) operator.
private boolean _requiresSingletonInstance;

// whether a stage is partitioned by the same way the sending exchange is desired
private boolean _isPartitioned;
// physical partition info
private String _partitionFunction;
private int _partitionParallelism;

public DispatchablePlanMetadata() {
Expand Down Expand Up @@ -136,12 +143,20 @@ public void setRequireSingleton(boolean newRequireInstance) {
_requiresSingletonInstance = _requiresSingletonInstance || newRequireInstance;
}

public boolean isPartitioned() {
return _isPartitioned;
public boolean isPrePartitioned() {
return _isPrePartitioned;
}

public void setPrePartitioned(boolean isPrePartitioned) {
_isPrePartitioned = isPrePartitioned;
}

public String getPartitionFunction() {
return _partitionFunction;
}

public void setPartitioned(boolean isPartitioned) {
_isPartitioned = isPartitioned;
public void setPartitionFunction(String partitionFunction) {
_partitionFunction = partitionFunction;
}

public int getPartitionParallelism() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public Void visitMailboxReceive(MailboxReceiveNode node, DispatchablePlanContext
public Void visitMailboxSend(MailboxSendNode node, DispatchablePlanContext context) {
node.getInputs().get(0).visit(this, context);
DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
dispatchablePlanMetadata.setPartitioned(node.isPartitioned());
dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned());
context.getDispatchablePlanStageRootMap().put(node.getPlanFragmentId(), node);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
} else if (senderMetadata.isPartitioned() && senderMetadata.getScannedTables().size() > 0
&& (numReceivers / numSenders > 0)) {
// For partitioned table scan, send the data to the worker with the same worker id (not necessary the same
// instance). When partition parallelism is configured, send the data to the corresponding workers.
// NOTE: Do not use partitionParallelism from the metadata because it might be configured only in the first
// child. Re-compute it based on the number of receivers.
} else if (senderMetadata.isPrePartitioned() && isDirectExchangeCompatible(senderMetadata, receiverMetadata)) {
// - direct exchange possible:
// 1. send the data to the worker with the same worker id (not necessarily the same instance), 1-to-1 mapping
// 2. When partition parallelism is configured, fanout based on partition parallelism from each sender
// workerID to sequentially increment receiver workerIDs
int partitionParallelism = numReceivers / numSenders;
if (partitionParallelism == 1) {
// 1-to-1 mapping
for (int workerId = 0; workerId < numSenders; workerId++) {
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Expand All @@ -89,6 +89,7 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
.put(senderFragmentId, receiverMailboxMetadata);
}
} else {
// 1-to-<partition_parallelism> mapping
int receiverWorkerId = 0;
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
VirtualServerAddress senderAddress =
Expand Down Expand Up @@ -144,4 +145,23 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
}
return null;
}

/**
 * Returns whether the sender and receiver stages can use a direct (worker-to-worker) exchange,
 * i.e. data can be routed by worker id without re-shuffling.
 *
 * <p>Both stages must be partitioned with the same partition function (case-insensitive match),
 * and the worker counts must line up:
 * <ul>
 *   <li>leaf-to-intermediate (sender scans a table, receiver does not): each sender worker fans
 *       out to {@code partitionParallelism} receiver workers, so
 *       {@code numSenders * partitionParallelism == numReceivers}</li>
 *   <li>dynamic-broadcast or intermediate-to-intermediate: strict 1-to-1 worker mapping,
 *       {@code numSenders == numReceivers}</li>
 * </ul>
 *
 * @param sender dispatch metadata of the sending stage
 * @param receiver dispatch metadata of the receiving stage
 * @return {@code true} if a direct exchange is possible
 */
private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, DispatchablePlanMetadata receiver) {
  // Both branches require matching partition functions; check it once up front.
  // equalsIgnoreCase(null) is false, so a receiver without a partition function never matches.
  String senderPartitionFunction = sender.getPartitionFunction();
  if (senderPartitionFunction == null || !senderPartitionFunction.equalsIgnoreCase(
      receiver.getPartitionFunction())) {
    return false;
  }

  Map<Integer, QueryServerInstance> senderServerMap = sender.getWorkerIdToServerInstanceMap();
  Map<Integer, QueryServerInstance> receiverServerMap = receiver.getWorkerIdToServerInstanceMap();
  int numSenders = senderServerMap.size();
  int numReceivers = receiverServerMap.size();

  if (!sender.getScannedTables().isEmpty() && receiver.getScannedTables().isEmpty()) {
    // leaf-to-intermediate condition: senders fan out by partition parallelism
    return numSenders * sender.getPartitionParallelism() == numReceivers;
  }
  // dynamic-broadcast condition || intermediate-to-intermediate: 1-to-1 worker mapping
  return numSenders == numReceivers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,12 @@ public DispatchableSubPlan createDispatchableSubPlan(SubPlan subPlan) {
// 3. add worker assignment after the dispatchable plan context is fulfilled after the visit.
context.getWorkerManager().assignWorkers(rootFragment, context);
// 4. compute the mailbox assignment for each stage.
// TODO: refactor this to be a pluggable interface.
rootNode.visit(MailboxAssignmentVisitor.INSTANCE, context);
// 5. Run physical optimizations
runPhysicalOptimizers(rootNode, context, _tableCache);
// 6. Run validations
runValidations(rootFragment, context);
// 7. convert it into query plan.
// TODO: refactor this to be a pluggable interface.
return finalizeDispatchableSubPlan(rootFragment, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ExchangeNode extends AbstractPlanNode {
private boolean _isSortOnReceiver = false;

@ProtoProperties
private boolean _isPartitioned = false;
private boolean _isPrePartitioned = false;

@ProtoProperties
private List<RelFieldCollation> _collations;
Expand All @@ -66,14 +66,14 @@ public ExchangeNode(int planFragmentId) {

public ExchangeNode(int currentStageId, DataSchema dataSchema, PinotRelExchangeType exchangeType,
Set<String> tableNames, RelDistribution distribution, List<RelFieldCollation> collations, boolean isSortOnSender,
boolean isSortOnReceiver, boolean isPartitioned) {
boolean isSortOnReceiver, boolean isPrePartitioned) {
super(currentStageId, dataSchema);
_exchangeType = exchangeType;
_keys = distribution.getKeys();
_distributionType = distribution.getType();
_isSortOnSender = isSortOnSender;
_isSortOnReceiver = isSortOnReceiver;
_isPartitioned = isPartitioned;
_isPrePartitioned = isPrePartitioned;
_collations = collations;
_tableNames = tableNames;
}
Expand Down Expand Up @@ -108,8 +108,8 @@ public boolean isSortOnReceiver() {
return _isSortOnReceiver;
}

public boolean isPartitioned() {
return _isPartitioned;
public boolean isPrePartitioned() {
return _isPrePartitioned;
}

public List<RelFieldCollation> getCollations() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MailboxReceiveNode extends AbstractPlanNode {

// this is only available during planning and should not be relied
// on in any post-serialization code
private transient PlanNode _sender;
private transient MailboxSendNode _sender;

public MailboxReceiveNode(int planFragmentId) {
super(planFragmentId);
Expand All @@ -65,7 +65,7 @@ public MailboxReceiveNode(int planFragmentId) {
public MailboxReceiveNode(int planFragmentId, DataSchema dataSchema, int senderStageId,
RelDistribution.Type distributionType, PinotRelExchangeType exchangeType,
@Nullable List<Integer> distributionKeys, @Nullable List<RelFieldCollation> fieldCollations,
boolean isSortOnSender, boolean isSortOnReceiver, PlanNode sender) {
boolean isSortOnSender, boolean isSortOnReceiver, MailboxSendNode sender) {
super(planFragmentId, dataSchema);
_senderStageId = senderStageId;
_distributionType = distributionType;
Expand Down Expand Up @@ -147,7 +147,7 @@ public boolean isSortOnReceiver() {
return _isSortOnReceiver;
}

public PlanNode getSender() {
public MailboxSendNode getSender() {
return _sender;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class MailboxSendNode extends AbstractPlanNode {
@ProtoProperties
private boolean _isSortOnSender;
@ProtoProperties
private boolean _isPartitioned;
private boolean _isPrePartitioned;

public MailboxSendNode(int planFragmentId) {
super(planFragmentId);
Expand All @@ -57,7 +57,7 @@ public MailboxSendNode(int planFragmentId) {
public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverStageId,
RelDistribution.Type distributionType, PinotRelExchangeType exchangeType,
@Nullable List<Integer> distributionKeys, @Nullable List<RelFieldCollation> fieldCollations,
boolean isSortOnSender, boolean isPartitioned) {
boolean isSortOnSender, boolean isPrePartitioned) {
super(planFragmentId, dataSchema);
_receiverStageId = receiverStageId;
_distributionType = distributionType;
Expand All @@ -77,7 +77,7 @@ public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverSt
_collationDirections = Collections.emptyList();
}
_isSortOnSender = isSortOnSender;
_isPartitioned = isPartitioned;
_isPrePartitioned = isPrePartitioned;
}

public int getReceiverStageId() {
Expand Down Expand Up @@ -120,8 +120,8 @@ public boolean isSortOnSender() {
return _isSortOnSender;
}

public boolean isPartitioned() {
return _isPartitioned;
public boolean isPrePartitioned() {
return _isPrePartitioned;
}

@Override
Expand All @@ -130,7 +130,7 @@ public String explain() {
sb.append("MAIL_SEND(");
sb.append(_distributionType);
sb.append(')');
if (isPartitioned()) {
if (isPrePartitioned()) {
sb.append("[PARTITIONED]");
}
if (isSortOnSender()) {
Expand Down
Loading