Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions pinot-common/src/main/proto/mailbox.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,6 @@
// under the License.
//

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol we should do license check better

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";

package org.apache.pinot.common.proto;
Expand Down
2 changes: 1 addition & 1 deletion pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ message ListField {
// MapField wraps a map whose keys are strings and whose values are MemberVariableField messages.
message MapField {
// Map entries keyed by string; each value is a MemberVariableField.
map<string, MemberVariableField> content = 1;
}
}
18 changes: 0 additions & 18 deletions pinot-common/src/main/proto/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,6 @@
// under the License.
//

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";

package org.apache.pinot.common.proto;
Expand Down
33 changes: 7 additions & 26 deletions pinot-common/src/main/proto/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,10 @@
// under the License.
//

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";

package org.apache.pinot.common.proto;

import "plan.proto";

service PinotQueryWorker {
// Dispatch a QueryRequest to a PinotQueryWorker
rpc Submit(QueryRequest) returns (QueryResponse);
Expand All @@ -59,7 +39,7 @@ message CancelResponse {
// QueryRequest is the dispatched content for all query stages to a physical worker.
// NOTE(review): this view lists two declarations that both use the name `metadata` and field
// number 2; presumably the map<string, string> form is the removed line and the bytes form is
// its replacement — confirm against the merged file.
message QueryRequest {
// Stage plans carried by this request.
repeated StagePlan stagePlan = 1;
map<string, string> metadata = 2;
// NOTE(review): a map field and a bytes field are both length-delimited on the wire, but a
// serialized Properties payload is not the same byte layout as a map entry, so senders and
// receivers built from different versions of this file would presumably misread field 2 —
// confirm that mixed-version operation is not required.
bytes metadata = 2; // Serialized Properties
}

// QueryResponse is the worker's response to a dispatched request; it carries only the dispatch status, not actual data.
Expand All @@ -70,15 +50,13 @@ message QueryResponse {

// StagePlan describes one stage: its id, its root node and its stage-level metadata.
// NOTE(review): this view lists two declarations for field number 2 (`stageRoot` and
// `rootNode`); presumably the StageNode-typed line is the removed one and the bytes line is its
// replacement — confirm against the merged file.
message StagePlan {
// Identifier of the stage this plan belongs to.
int32 stageId = 1;
StageNode stageRoot = 2;
// NOTE(review): the rename changes generated accessor names even if the wire format stays
// readable; confirm all callers were updated.
bytes rootNode = 2; // Serialized StageNode
// Metadata attached to this stage (see StageMetadata).
StageMetadata stageMetadata = 3;
}

// StageMetadata carries the per-worker metadata list and the custom properties of a stage.
// NOTE(review): this view lists two declarations for field number 2 plus fields 3 and 4;
// presumably the map<string, string> line and fields 3 and 4 are removed lines and the bytes
// line is the replacement — confirm against the merged file.
message StageMetadata {
// One WorkerMetadata entry per worker of the stage.
repeated WorkerMetadata workerMetadata = 1;
map<string, string> customProperty = 2;
// NOTE(review): if fields 3 and 4 are indeed deleted, no `reserved 3, 4;` statement is visible
// here to stop those numbers from being reused later — confirm whether one is needed.
string serverAddress = 3;
repeated int32 workerIds = 4;
bytes customProperty = 2; // Serialized Properties
}

message WorkerMetadata {
Expand All @@ -90,5 +68,8 @@ message WorkerMetadata {
// MailboxMetadata lists mailbox ids together with virtual addresses.
message MailboxMetadata {
// Mailbox id strings.
repeated string mailboxId = 1;
// Virtual address strings; presumably index-aligned with mailboxId (entry i of one list
// belongs to entry i of the other) — TODO confirm against the code that populates them.
repeated string virtualAddress = 2;
// NOTE(review): presumably a removed line in this diff view; if so, no `reserved 3;` statement
// is visible here to stop the number from being reused — confirm whether one is needed.
map<string, string> customProperty = 3;
}

// Properties wraps a single string-to-string map. Fields elsewhere in this file that are
// annotated "Serialized Properties" presumably carry this message in serialized form — confirm
// against the code that reads and writes them.
message Properties {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to ser/de the map

// The wrapped key/value pairs.
map<string, string> property = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public PhysicalExplainPlanVisitor(DispatchableSubPlan dispatchableSubPlan) {
/**
* Explains the query plan.
*
* @see DispatchableSubPlan#explain()
* @param dispatchableSubPlan the queryPlan to explain
* @return a String representation of the query plan tree
*/
Expand Down Expand Up @@ -216,9 +215,8 @@ private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {

int receiverStageId = node.getReceiverStageId();
List<VirtualServerAddress> serverAddressList =
_dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId())
.getWorkerMetadataList().get(context._workerId)
.getMailBoxInfosMap().get(receiverStageId).getVirtualAddressList();
_dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId()).getWorkerMetadataList()
.get(context._workerId).getMailboxMetadataMap().get(receiverStageId).getVirtualAddresses();
List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList);
context._builder.append("->");
String receivers = serverInstanceToWorkerIdList.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,12 @@ public List<DispatchablePlanFragment> constructDispatchablePlanFragmentList(Plan
int workerId = serverEntry.getKey();
QueryServerInstance queryServerInstance = serverEntry.getValue();
serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
WorkerMetadata.Builder workerMetadataBuilder = new WorkerMetadata.Builder().setVirtualServerAddress(
new VirtualServerAddress(queryServerInstance, workerId));
WorkerMetadata workerMetadata = new WorkerMetadata(new VirtualServerAddress(queryServerInstance, workerId),
workerIdToMailboxesMap.get(workerId));
if (workerIdToSegmentsMap != null) {
workerMetadataBuilder.addTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
}
workerMetadataBuilder.putAllMailBoxInfosMap(workerIdToMailboxesMap.get(workerId));
workerMetadataArray[workerId] = workerMetadataBuilder.build();
workerMetadataArray[workerId] = workerMetadata;
}

// set the stageMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pinot.query.planner.physical;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
Expand Down Expand Up @@ -63,7 +65,7 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
workerId, senderServer, receiverServer);
MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
Collections.singletonList(new VirtualServerAddress(senderServer, workerId)), Collections.emptyMap());
Collections.singletonList(new VirtualServerAddress(senderServer, workerId)));
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
Expand All @@ -78,11 +80,9 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
for (int workerId = 0; workerId < numSenders; workerId++) {
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
Collections.emptyMap());
Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)));
MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
Collections.emptyMap());
Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)));
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(receiverFragmentId, serderMailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
Expand All @@ -94,22 +94,23 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
VirtualServerAddress senderAddress =
new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
List<String> receivingMailboxIds = new ArrayList<>(partitionParallelism);
List<VirtualServerAddress> receivingAddresses = new ArrayList<>(partitionParallelism);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
.put(receiverFragmentId, senderMailboxMetadata);
for (int i = 0; i < partitionParallelism; i++) {
VirtualServerAddress receiverAddress =
new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
receiverWorkerId);
senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
receivingMailboxIds.add(mailboxId);
receivingAddresses.add(
new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));

MailboxMetadata receiverMailboxMetadata =
receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
receiverMailboxMetadata.getMailboxIds().add(mailboxId);
receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);

receiverWorkerId++;
}
Expand All @@ -123,22 +124,22 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
VirtualServerAddress senderAddress =
new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
List<String> receivingMailboxIds = new ArrayList<>(numReceivers);
List<VirtualServerAddress> receivingAddresses = new ArrayList<>(numReceivers);
MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
.put(receiverFragmentId, senderMailboxMetadata);
for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
VirtualServerAddress receiverAddress =
new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
String mailboxId =
MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId);
senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
receivingMailboxIds.add(mailboxId);
receivingAddresses.add(new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));

MailboxMetadata receiverMailboxMetadata =
receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
receiverMailboxMetadata.getMailboxIds().add(mailboxId);
receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
}
}
}
Expand All @@ -154,14 +155,12 @@ private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, Disp
int numReceivers = receiverServerMap.size();
if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
// leaf-to-intermediate condition
return numSenders * sender.getPartitionParallelism() == numReceivers
&& sender.getPartitionFunction() != null
return numSenders * sender.getPartitionParallelism() == numReceivers && sender.getPartitionFunction() != null
&& sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
} else {
// dynamic-broadcast condition || intermediate-to-intermediate
return numSenders == numReceivers
&& sender.getPartitionFunction() != null
&& sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
return numSenders == numReceivers && sender.getPartitionFunction() != null && sender.getPartitionFunction()
.equalsIgnoreCase(receiver.getPartitionFunction());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,35 @@
*/
package org.apache.pinot.query.planner.physical;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.pinot.query.routing.MailboxMetadata;


/**
 * Static helpers that build mailbox id strings by joining numeric ids with {@link #SEPARATOR}.
 *
 * NOTE(review): this diff view shows two SEPARATOR declarations and two return statements in
 * toPlanMailboxId; presumably the first of each pair is the removed line and the second its
 * replacement — confirm against the merged file.
 */
public class MailboxIdUtils {
// Private constructor: the class only exposes static members.
private MailboxIdUtils() {
}

// Character placed between the components of a mailbox id.
private static final char SEPARATOR = '|';
public static final char SEPARATOR = '|';

/**
 * Builds a plan mailbox id of the form
 * senderStageId|senderWorkerId|receiverStageId|receiverWorkerId.
 *
 * @param senderStageId stage id of the sender
 * @param senderWorkerId worker id of the sender
 * @param receiverStageId stage id of the receiver
 * @param receiverWorkerId worker id of the receiver
 * @return the four ids joined by SEPARATOR, in the order listed above
 */
public static String toPlanMailboxId(int senderStageId, int senderWorkerId, int receiverStageId,
int receiverWorkerId) {
// Integer.toString on the first operand makes the whole expression a String concatenation,
// so the char SEPARATOR is appended as a character rather than added numerically.
return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR
+ receiverStageId + SEPARATOR + receiverWorkerId;
return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR + receiverStageId + SEPARATOR
+ receiverWorkerId;
}

/**
 * Prefixes a plan mailbox id with a request id.
 *
 * @param requestId request id to prepend
 * @param planMailboxId plan mailbox id, e.g. as produced by toPlanMailboxId
 * @return requestId, SEPARATOR and planMailboxId concatenated in that order
 */
public static String toMailboxId(long requestId, String planMailboxId) {
return Long.toString(requestId) + SEPARATOR + planMailboxId;
}

/**
 * Converts every mailbox id held by the given metadata with toMailboxId(requestId, id).
 *
 * @param requestId request id to prepend to each id
 * @param mailboxMetadata metadata whose getMailboxIds() values are converted
 * @return a new list with one converted id per input id, in the order the ids are streamed
 */
public static List<String> toMailboxIds(long requestId, MailboxMetadata mailboxMetadata) {
return mailboxMetadata.getMailboxIds().stream().map(v -> toMailboxId(requestId, v)).collect(Collectors.toList());
}

/**
 * Convenience overload that builds the plan mailbox id from its four components and then
 * prefixes it with the request id. Annotated as visible for testing.
 *
 * @param requestId request id to prepend
 * @param senderStageId stage id of the sender
 * @param senderWorkerId worker id of the sender
 * @param receiverStageId stage id of the receiver
 * @param receiverWorkerId worker id of the receiver
 * @return requestId, SEPARATOR and the plan mailbox id concatenated in that order
 */
@VisibleForTesting
public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
int receiverWorkerId) {
return toMailboxId(requestId, toPlanMailboxId(senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
}
}
Loading