Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -253,6 +254,14 @@ private static List<MailboxInfo> createMailboxInfo(Map<Integer, QueryServerInsta
for (var entry : workersByUniqueHostPort.entrySet()) {
result.add(new MailboxInfo(entry.getKey().getHostname(), entry.getKey().getQueryMailboxPort(), entry.getValue()));
}
// IMP: Return mailbox info sorted by workerIds. This is because SendingMailbox are created in this order, and
// record assignment for hash exchange follows modulo arithmetic. e.g. if we have sending mailbox in order:
// [worker-1, worker-0], then records with modulo 0 hash would end up in worker-1.
// Note that the workerIds list will be >1 in length only when there's a parallelism change. It's important to
// also know that MailboxSendOperator will iterate over this List<MailboxInfo> in order, and within each iteration
// iterate over all the workerIds of that MailboxInfo. The result List<SendingMailbox> is used for modulo
// arithmetic for any partitioning exchange strategy.
result.sort(Comparator.comparingInt(info -> info.getWorkerIds().get(0)));
return result;
}

Expand Down