Skip to content

Commit 390a63d

Browse files
gianm and cryptoe
authored and committed
MSQ: Allow for worker gaps. (apache#17277)
In a Dart query, all Historicals are given worker IDs, but not all of them are going to actually be started or receive work orders. This can create gaps in the set of workers. For example, workers 1 and 3 could have work assigned while workers 0 and 2 do not. This patch updates ControllerStageTracker and WorkerInputs to handle such gaps, by using the set of actual worker numbers, rather than 0..workerCount, in various places. (cherry picked from commit 06bbdb3)
1 parent f43964a commit 390a63d

File tree

10 files changed

+439
-59
lines changed

10 files changed

+439
-59
lines changed

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartition.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ public static ReadablePartition striped(final int stageNumber, final int numWork
5959
return new ReadablePartition(stageNumber, workerNumbers, partitionNumber);
6060
}
6161

62+
/**
63+
* Returns an output partition that is striped across a set of {@code workerNumbers}.
64+
*/
65+
public static ReadablePartition striped(
66+
final int stageNumber,
67+
final IntSortedSet workerNumbers,
68+
final int partitionNumber
69+
)
70+
{
71+
return new ReadablePartition(stageNumber, workerNumbers, partitionNumber);
72+
}
73+
6274
/**
6375
* Returns an output partition that has been collected onto a single worker.
6476
*/

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartitions.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
2525
import it.unimi.dsi.fastutil.ints.Int2IntSortedMap;
2626
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
27+
import it.unimi.dsi.fastutil.ints.IntSortedSet;
2728

2829
import java.util.Collections;
2930
import java.util.List;
@@ -39,6 +40,7 @@
3940
@JsonSubTypes(value = {
4041
@JsonSubTypes.Type(name = "collected", value = CollectedReadablePartitions.class),
4142
@JsonSubTypes.Type(name = "striped", value = StripedReadablePartitions.class),
43+
@JsonSubTypes.Type(name = "sparseStriped", value = SparseStripedReadablePartitions.class),
4244
@JsonSubTypes.Type(name = "combined", value = CombinedReadablePartitions.class)
4345
})
4446
public interface ReadablePartitions extends Iterable<ReadablePartition>
@@ -59,7 +61,7 @@ static ReadablePartitions empty()
5961
/**
6062
* Combines various sets of partitions into a single set.
6163
*/
62-
static CombinedReadablePartitions combine(List<ReadablePartitions> readablePartitions)
64+
static ReadablePartitions combine(List<ReadablePartitions> readablePartitions)
6365
{
6466
return new CombinedReadablePartitions(readablePartitions);
6567
}
@@ -68,7 +70,7 @@ static CombinedReadablePartitions combine(List<ReadablePartitions> readableParti
6870
* Returns a set of {@code numPartitions} partitions striped across {@code numWorkers} workers: each worker contains
6971
* a "stripe" of each partition.
7072
*/
71-
static StripedReadablePartitions striped(
73+
static ReadablePartitions striped(
7274
final int stageNumber,
7375
final int numWorkers,
7476
final int numPartitions
@@ -82,11 +84,36 @@ static StripedReadablePartitions striped(
8284
return new StripedReadablePartitions(stageNumber, numWorkers, partitionNumbers);
8385
}
8486

87+
/**
88+
* Returns a set of {@code numPartitions} partitions striped across {@code workers}: each worker contains
89+
* a "stripe" of each partition.
90+
*/
91+
static ReadablePartitions striped(
92+
final int stageNumber,
93+
final IntSortedSet workers,
94+
final int numPartitions
95+
)
96+
{
97+
final IntAVLTreeSet partitionNumbers = new IntAVLTreeSet();
98+
for (int i = 0; i < numPartitions; i++) {
99+
partitionNumbers.add(i);
100+
}
101+
102+
if (workers.lastInt() == workers.size() - 1) {
103+
// Dense worker set. Use StripedReadablePartitions for compactness (send a single number rather than the
104+
// entire worker set) and for backwards compatibility (older workers cannot understand
105+
// SparseStripedReadablePartitions).
106+
return new StripedReadablePartitions(stageNumber, workers.size(), partitionNumbers);
107+
} else {
108+
return new SparseStripedReadablePartitions(stageNumber, workers, partitionNumbers);
109+
}
110+
}
111+
85112
/**
86113
* Returns a set of partitions that have been collected onto specific workers: each partition is on exactly
87114
* one worker.
88115
*/
89-
static CollectedReadablePartitions collected(
116+
static ReadablePartitions collected(
90117
final int stageNumber,
91118
final Map<Integer, Integer> partitionToWorkerMap
92119
)
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.msq.input.stage;
21+
22+
import com.fasterxml.jackson.annotation.JsonCreator;
23+
import com.fasterxml.jackson.annotation.JsonProperty;
24+
import com.google.common.collect.Iterators;
25+
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
26+
import it.unimi.dsi.fastutil.ints.IntSortedSet;
27+
import org.apache.druid.msq.input.SlicerUtils;
28+
29+
import java.util.ArrayList;
30+
import java.util.Iterator;
31+
import java.util.List;
32+
import java.util.Objects;
33+
import java.util.Set;
34+
35+
/**
36+
* Set of partitions striped across a sparse set of {@code workers}. Each worker contains a "stripe" of each partition.
37+
*
38+
* @see StripedReadablePartitions dense version, where workers from [0..N) are all used.
39+
*/
40+
public class SparseStripedReadablePartitions implements ReadablePartitions
41+
{
42+
private final int stageNumber;
43+
private final IntSortedSet workers;
44+
private final IntSortedSet partitionNumbers;
45+
46+
/**
47+
* Constructor. Most callers should use {@link ReadablePartitions#striped(int, int, int)} instead, which takes
48+
* a partition count rather than a set of partition numbers.
49+
*/
50+
public SparseStripedReadablePartitions(
51+
final int stageNumber,
52+
final IntSortedSet workers,
53+
final IntSortedSet partitionNumbers
54+
)
55+
{
56+
this.stageNumber = stageNumber;
57+
this.workers = workers;
58+
this.partitionNumbers = partitionNumbers;
59+
}
60+
61+
@JsonCreator
62+
private SparseStripedReadablePartitions(
63+
@JsonProperty("stageNumber") final int stageNumber,
64+
@JsonProperty("workers") final Set<Integer> workers,
65+
@JsonProperty("partitionNumbers") final Set<Integer> partitionNumbers
66+
)
67+
{
68+
this(stageNumber, new IntAVLTreeSet(workers), new IntAVLTreeSet(partitionNumbers));
69+
}
70+
71+
@Override
72+
public Iterator<ReadablePartition> iterator()
73+
{
74+
return Iterators.transform(
75+
partitionNumbers.iterator(),
76+
partitionNumber -> ReadablePartition.striped(stageNumber, workers, partitionNumber)
77+
);
78+
}
79+
80+
@Override
81+
public List<ReadablePartitions> split(final int maxNumSplits)
82+
{
83+
final List<ReadablePartitions> retVal = new ArrayList<>();
84+
85+
for (List<Integer> entries : SlicerUtils.makeSlicesStatic(partitionNumbers.iterator(), maxNumSplits)) {
86+
if (!entries.isEmpty()) {
87+
retVal.add(new SparseStripedReadablePartitions(stageNumber, workers, new IntAVLTreeSet(entries)));
88+
}
89+
}
90+
91+
return retVal;
92+
}
93+
94+
@JsonProperty
95+
int getStageNumber()
96+
{
97+
return stageNumber;
98+
}
99+
100+
@JsonProperty
101+
IntSortedSet getWorkers()
102+
{
103+
return workers;
104+
}
105+
106+
@JsonProperty
107+
IntSortedSet getPartitionNumbers()
108+
{
109+
return partitionNumbers;
110+
}
111+
112+
@Override
113+
public boolean equals(Object o)
114+
{
115+
if (this == o) {
116+
return true;
117+
}
118+
if (o == null || getClass() != o.getClass()) {
119+
return false;
120+
}
121+
SparseStripedReadablePartitions that = (SparseStripedReadablePartitions) o;
122+
return stageNumber == that.stageNumber
123+
&& Objects.equals(workers, that.workers)
124+
&& Objects.equals(partitionNumbers, that.partitionNumbers);
125+
}
126+
127+
@Override
128+
public int hashCode()
129+
{
130+
return Objects.hash(stageNumber, workers, partitionNumbers);
131+
}
132+
133+
@Override
134+
public String toString()
135+
{
136+
return "StripedReadablePartitions{" +
137+
"stageNumber=" + stageNumber +
138+
", workers=" + workers +
139+
", partitionNumbers=" + partitionNumbers +
140+
'}';
141+
}
142+
}

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ void addPartialKeyInformationForWorker(
403403
throw new ISE("Stage does not gather result key statistics");
404404
}
405405

406-
if (workerNumber < 0 || workerNumber >= workerCount) {
406+
if (!workerInputs.workers().contains(workerNumber)) {
407407
throw new IAE("Invalid workerNumber [%s]", workerNumber);
408408
}
409409

@@ -522,7 +522,7 @@ void mergeClusterByStatisticsCollectorForTimeChunk(
522522
throw new ISE("Stage does not gather result key statistics");
523523
}
524524

525-
if (workerNumber < 0 || workerNumber >= workerCount) {
525+
if (!workerInputs.workers().contains(workerNumber)) {
526526
throw new IAE("Invalid workerNumber [%s]", workerNumber);
527527
}
528528

@@ -656,7 +656,7 @@ void mergeClusterByStatisticsCollectorForAllTimeChunks(
656656
throw new ISE("Stage does not gather result key statistics");
657657
}
658658

659-
if (workerNumber < 0 || workerNumber >= workerCount) {
659+
if (!workerInputs.workers().contains(workerNumber)) {
660660
throw new IAE("Invalid workerNumber [%s]", workerNumber);
661661
}
662662

@@ -763,7 +763,7 @@ void setClusterByPartitionBoundaries(ClusterByPartitions clusterByPartitions)
763763
this.resultPartitionBoundaries = clusterByPartitions;
764764
this.resultPartitions = ReadablePartitions.striped(
765765
stageDef.getStageNumber(),
766-
workerCount,
766+
workerInputs.workers(),
767767
clusterByPartitions.size()
768768
);
769769

@@ -788,7 +788,7 @@ void setDoneReadingInputForWorker(final int workerNumber)
788788
throw DruidException.defensive("Cannot setDoneReadingInput for stage[%s], it is not sorting", stageDef.getId());
789789
}
790790

791-
if (workerNumber < 0 || workerNumber >= workerCount) {
791+
if (!workerInputs.workers().contains(workerNumber)) {
792792
throw new IAE("Invalid workerNumber[%s] for stage[%s]", workerNumber, stageDef.getId());
793793
}
794794

@@ -830,7 +830,7 @@ void setDoneReadingInputForWorker(final int workerNumber)
830830
@SuppressWarnings("unchecked")
831831
boolean setResultsCompleteForWorker(final int workerNumber, final Object resultObject)
832832
{
833-
if (workerNumber < 0 || workerNumber >= workerCount) {
833+
if (!workerInputs.workers().contains(workerNumber)) {
834834
throw new IAE("Invalid workerNumber [%s]", workerNumber);
835835
}
836836

@@ -947,14 +947,18 @@ private void generateResultPartitionsAndBoundariesWithoutKeyStatistics()
947947
resultPartitionBoundaries = maybeResultPartitionBoundaries.valueOrThrow();
948948
resultPartitions = ReadablePartitions.striped(
949949
stageNumber,
950-
workerCount,
950+
workerInputs.workers(),
951951
resultPartitionBoundaries.size()
952952
);
953-
} else if (shuffleSpec.kind() == ShuffleKind.MIX) {
954-
resultPartitionBoundaries = ClusterByPartitions.oneUniversalPartition();
955-
resultPartitions = ReadablePartitions.striped(stageNumber, workerCount, shuffleSpec.partitionCount());
956953
} else {
957-
resultPartitions = ReadablePartitions.striped(stageNumber, workerCount, shuffleSpec.partitionCount());
954+
if (shuffleSpec.kind() == ShuffleKind.MIX) {
955+
resultPartitionBoundaries = ClusterByPartitions.oneUniversalPartition();
956+
}
957+
resultPartitions = ReadablePartitions.striped(
958+
stageNumber,
959+
workerInputs.workers(),
960+
shuffleSpec.partitionCount()
961+
);
958962
}
959963
} else {
960964
// No reshuffling: retain partitioning from nonbroadcast inputs.

0 commit comments

Comments
 (0)