@@ -83,5 +83,6 @@ public static class JoinHintOptions {
public static class TableHintOptions {
public static final String PARTITION_KEY = "partition_key";
public static final String PARTITION_SIZE = "partition_size";
public static final String PARTITION_PARALLELISM = "partition_parallelism";
}
}
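The new constant is consumed from the SQL tableOptions hint alongside partition_key and partition_size (see the worker-assignment changes below). A minimal standalone sketch, not part of this diff, of resolving the hint value with a default of 1 when it is absent:

import java.util.Map;

public class PartitionParallelismHintSketch {
  public static void main(String[] args) {
    // Mirrors the options produced by a hint like:
    // /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */
    Map<String, String> tableOptions =
        Map.of("partition_key", "num", "partition_size", "4", "partition_parallelism", "2");
    String value = tableOptions.get("partition_parallelism");
    int partitionParallelism = value != null ? Integer.parseInt(value) : 1;
    System.out.println(partitionParallelism); // prints 2
  }
}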
@@ -70,6 +70,7 @@ public class DispatchablePlanMetadata implements Serializable {

// whether a stage is partitioned table scan
private boolean _isPartitionedTableScan;
private int _partitionParallelism;

public DispatchablePlanMetadata() {
_scannedTables = new ArrayList<>();
@@ -143,6 +144,14 @@ public void setPartitionedTableScan(boolean isPartitionedTableScan) {
_isPartitionedTableScan = isPartitionedTableScan;
}

public int getPartitionParallelism() {
return _partitionParallelism;
}

public void setPartitionParallelism(int partitionParallelism) {
_partitionParallelism = partitionParallelism;
}

public Map<String, Set<String>> getTableToUnavailableSegmentsMap() {
return _tableToUnavailableSegmentsMap;
}
@@ -69,23 +69,49 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
}
} else if (senderMetadata.isPartitionedTableScan()) {
// For partitioned table scan, send the data to the worker with the same worker id (not necessarily the same
// instance)
// TODO: Support further split the single partition into multiple workers
Preconditions.checkState(numSenders == numReceivers,
"Got different number of workers for partitioned table scan, sender: %s, receiver: %s", numSenders,
numReceivers);
for (int workerId = 0; workerId < numSenders; workerId++) {
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
Collections.emptyMap());
MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
Collections.emptyMap());
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(receiverFragmentId, serderMailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(senderFragmentId, receiverMailboxMetadata);
// instance). When partition parallelism is configured, send the data to the corresponding workers.
// NOTE: Do not use partitionParallelism from the metadata because it might be configured only in the first
// child. Re-compute it based on the number of receivers.
int partitionParallelism = numReceivers / numSenders;
if (partitionParallelism == 1) {
for (int workerId = 0; workerId < numSenders; workerId++) {
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
Collections.emptyMap());
MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
Collections.emptyMap());
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(receiverFragmentId, serderMailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(senderFragmentId, receiverMailboxMetadata);
}
} else {
int receiverWorkerId = 0;
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
VirtualServerAddress senderAddress =
new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
.put(receiverFragmentId, senderMailboxMetadata);
for (int i = 0; i < partitionParallelism; i++) {
VirtualServerAddress receiverAddress =
new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
receiverWorkerId);
senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);

MailboxMetadata receiverMailboxMetadata =
receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);

receiverWorkerId++;
}
}
}
} else {
// For other exchange types, send the data to all the instances in the receiver fragment
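A minimal standalone sketch, not part of this diff, of the sender-to-receiver fan-out implemented above: partitionParallelism is recomputed as numReceivers / numSenders, and sender worker s is wired to receiver workers s * partitionParallelism through s * partitionParallelism + partitionParallelism - 1, with one mailbox per pair:

public class PartitionFanOutSketch {
  public static void main(String[] args) {
    int numSenders = 2;
    int numReceivers = 4;
    // Re-computed from the worker counts, as in the visitor above.
    int partitionParallelism = numReceivers / numSenders;
    int receiverWorkerId = 0;
    for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
      for (int i = 0; i < partitionParallelism; i++) {
        // In the real code each pair gets a mailbox id from MailboxIdUtils.toPlanMailboxId(...).
        System.out.println("sender worker " + senderWorkerId + " -> receiver worker " + receiverWorkerId++);
      }
    }
    // Prints: 0 -> 0, 0 -> 1, 1 -> 2, 1 -> 3
  }
}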
@@ -102,21 +102,22 @@ private void assignWorkersToLeafFragment(PlanFragment fragment, DispatchablePlan

DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
Map<String, String> tableOptions = metadata.getTableOptions();
String partitionKey = null;
int numPartitions = 0;
if (tableOptions != null) {
partitionKey = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
String partitionSize = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
if (partitionSize != null) {
numPartitions = Integer.parseInt(partitionSize);
}
}
String partitionKey =
tableOptions != null ? tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY) : null;
if (partitionKey == null) {
assignWorkersToNonPartitionedLeafFragment(metadata, context);
} else {
Preconditions.checkState(numPartitions > 0, "'%s' must be provided for partition key: %s",
String numPartitionsStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
Preconditions.checkState(numPartitionsStr != null, "'%s' must be provided for partition key: %s",
PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, numPartitions);
int numPartitions = Integer.parseInt(numPartitionsStr);
Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
String partitionParallelismStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM);
int partitionParallelism = partitionParallelismStr != null ? Integer.parseInt(partitionParallelismStr) : 1;
Preconditions.checkState(partitionParallelism > 0, "'%s' must be positive, got: %s",
PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM, partitionParallelism);
assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, numPartitions, partitionParallelism);
}
}

@@ -207,7 +208,7 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
}

private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
DispatchablePlanContext context, String partitionKey, int numPartitions) {
DispatchablePlanContext context, String partitionKey, int numPartitions, int partitionParallelism) {
String tableName = metadata.getScannedTables().get(0);
ColocatedTableInfo colocatedTableInfo = getColocatedTableInfo(tableName, partitionKey, numPartitions);

@@ -238,6 +239,7 @@ private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata met
metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
metadata.setPartitionedTableScan(true);
metadata.setPartitionParallelism(partitionParallelism);
}

private void assignWorkersToIntermediateFragment(PlanFragment fragment, DispatchablePlanContext context) {
@@ -249,12 +251,29 @@ private void assignWorkersToIntermediateFragment(PlanFragment fragment, Dispatch
Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId());

// If the first child is partitioned table scan, use the same worker assignment to avoid shuffling data
// TODO: Introduce a hint to control this
if (children.size() > 0) {
// If the first child is partitioned table scan, use the same worker assignment to avoid shuffling data. When
// partition parallelism is configured, create multiple intermediate stage workers on the same instance for each
// worker in the first child.
if (!children.isEmpty()) {
DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId());
if (firstChildMetadata.isPartitionedTableScan()) {
metadata.setWorkerIdToServerInstanceMap(firstChildMetadata.getWorkerIdToServerInstanceMap());
int partitionParallelism = firstChildMetadata.getPartitionParallelism();
Map<Integer, QueryServerInstance> childWorkerIdToServerInstanceMap =
firstChildMetadata.getWorkerIdToServerInstanceMap();
if (partitionParallelism == 1) {
metadata.setWorkerIdToServerInstanceMap(childWorkerIdToServerInstanceMap);
} else {
int numChildWorkers = childWorkerIdToServerInstanceMap.size();
Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
int workerId = 0;
for (int i = 0; i < numChildWorkers; i++) {
QueryServerInstance serverInstance = childWorkerIdToServerInstanceMap.get(i);
for (int j = 0; j < partitionParallelism; j++) {
workerIdToServerInstanceMap.put(workerId++, serverInstance);
}
}
metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
}
return;
}
}
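A minimal standalone sketch, not part of this diff, of the worker-id expansion above: each worker of the partitioned first child is duplicated partitionParallelism times on the same server instance, so N child workers become N * partitionParallelism intermediate-stage workers (server instances are plain strings here for illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class WorkerExpansionSketch {
  public static void main(String[] args) {
    Map<Integer, String> childWorkerIdToServer = Map.of(0, "server-A", 1, "server-B");
    int partitionParallelism = 3;
    Map<Integer, String> workerIdToServer = new HashMap<>();
    int workerId = 0;
    for (int i = 0; i < childWorkerIdToServer.size(); i++) {
      String server = childWorkerIdToServer.get(i);
      for (int j = 0; j < partitionParallelism; j++) {
        workerIdToServer.put(workerId++, server);
      }
    }
    // {0=server-A, 1=server-A, 2=server-A, 3=server-B, 4=server-B, 5=server-B}
    System.out.println(new TreeMap<>(workerIdToServer));
  }
}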
@@ -308,13 +327,13 @@ private void assignWorkersToIntermediateFragment(PlanFragment fragment, Dispatch
throw new IllegalStateException(
"No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
}
Map<String, String> options = context.getPlannerContext().getOptions();
int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
if (metadata.isRequiresSingletonInstance()) {
// require singleton should return a single global worker ID with 0;
metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
} else {
Map<String, String> options = context.getPlannerContext().getOptions();
int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
int workerId = 0;
for (ServerInstance serverInstance : serverInstances) {
12 changes: 12 additions & 0 deletions pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -65,10 +65,22 @@
"description": "Group by partition column",
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
},
{
"description": "Group by partition column with partition parallelism",
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num"
},
{
"description": "Colocated JOIN with partition column",
"sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num"
},
{
"description": "Colocated JOIN with partition column with partition parallelism",
"sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ ON {tbl1}.num = {tbl2}.num"
},
{
"description": "Colocated JOIN with partition column with partition parallelism in first table",
"sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num"
},
{
"description": "Colocated JOIN with partition column and group by partition column",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"