Skip to content

Commit 81713b7

Browse files
committed
Merge branch 'master' of https://github.com/apache/spark into SPARK-29126-cogroup-udf-usage-guide
2 parents da4f00b + 077fb99 commit 81713b7

File tree

399 files changed

+15374
-4707
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

399 files changed

+15374
-4707
lines changed

LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,7 @@ Eclipse Distribution License (EDL) 1.0
544544
--------------------------------------
545545

546546
org.glassfish.jaxb:jaxb-runtime
547+
jakarta.activation:jakarta.activation-api
547548
jakarta.xml.bind:jakarta.xml.bind-api
548549
com.sun.istack:istack-commons-runtime
549550

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,12 @@ protected void handleMessage(
102102
FetchShuffleBlocks msg = (FetchShuffleBlocks) msgObj;
103103
checkAuth(client, msg.appId);
104104
numBlockIds = 0;
105-
for (int[] ids: msg.reduceIds) {
106-
numBlockIds += ids.length;
105+
if (msg.batchFetchEnabled) {
106+
numBlockIds = msg.mapIds.length;
107+
} else {
108+
for (int[] ids: msg.reduceIds) {
109+
numBlockIds += ids.length;
110+
}
107111
}
108112
streamId = streamManager.registerStream(client.getClientId(),
109113
new ShuffleManagedBufferIterator(msg), client.getChannel());
@@ -323,13 +327,15 @@ private class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {
323327
private final int shuffleId;
324328
private final long[] mapIds;
325329
private final int[][] reduceIds;
330+
private final boolean batchFetchEnabled;
326331

327332
ShuffleManagedBufferIterator(FetchShuffleBlocks msg) {
328333
appId = msg.appId;
329334
execId = msg.execId;
330335
shuffleId = msg.shuffleId;
331336
mapIds = msg.mapIds;
332337
reduceIds = msg.reduceIds;
338+
batchFetchEnabled = msg.batchFetchEnabled;
333339
}
334340

335341
@Override
@@ -343,12 +349,20 @@ public boolean hasNext() {
343349

344350
@Override
345351
public ManagedBuffer next() {
346-
final ManagedBuffer block = blockManager.getBlockData(
347-
appId, execId, shuffleId, mapIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
348-
if (reduceIdx < reduceIds[mapIdx].length - 1) {
349-
reduceIdx += 1;
352+
ManagedBuffer block;
353+
if (!batchFetchEnabled) {
354+
block = blockManager.getBlockData(
355+
appId, execId, shuffleId, mapIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
356+
if (reduceIdx < reduceIds[mapIdx].length - 1) {
357+
reduceIdx += 1;
358+
} else {
359+
reduceIdx = 0;
360+
mapIdx += 1;
361+
}
350362
} else {
351-
reduceIdx = 0;
363+
assert(reduceIds[mapIdx].length == 2);
364+
block = blockManager.getContinuousBlocksData(appId, execId, shuffleId, mapIds[mapIdx],
365+
reduceIds[mapIdx][0], reduceIds[mapIdx][1]);
352366
mapIdx += 1;
353367
}
354368
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
5353
private final SecretKeyHolder secretKeyHolder;
5454
private final long registrationTimeoutMs;
5555

56-
protected TransportClientFactory clientFactory;
56+
protected volatile TransportClientFactory clientFactory;
5757
protected String appId;
5858

5959
/**
@@ -102,9 +102,14 @@ public void fetchBlocks(
102102
try {
103103
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
104104
(blockIds1, listener1) -> {
105-
TransportClient client = clientFactory.createClient(host, port);
106-
new OneForOneBlockFetcher(client, appId, execId,
107-
blockIds1, listener1, conf, downloadFileManager).start();
105+
// Unless this client is closed.
106+
if (clientFactory != null) {
107+
TransportClient client = clientFactory.createClient(host, port);
108+
new OneForOneBlockFetcher(client, appId, execId,
109+
blockIds1, listener1, conf, downloadFileManager).start();
110+
} else {
111+
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
112+
}
108113
};
109114

110115
int maxRetries = conf.maxIORetries();

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,21 +165,34 @@ public void registerExecutor(
165165
}
166166

167167
/**
168-
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
169-
* about how the hash and sort based shuffles store their data.
168+
* Obtains a FileSegmentManagedBuffer from a single block (shuffleId, mapId, reduceId).
170169
*/
171170
public ManagedBuffer getBlockData(
172171
String appId,
173172
String execId,
174173
int shuffleId,
175174
long mapId,
176175
int reduceId) {
176+
return getContinuousBlocksData(appId, execId, shuffleId, mapId, reduceId, reduceId + 1);
177+
}
178+
179+
/**
180+
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, [startReduceId, endReduceId)).
181+
* We make assumptions about how the hash and sort based shuffles store their data.
182+
*/
183+
public ManagedBuffer getContinuousBlocksData(
184+
String appId,
185+
String execId,
186+
int shuffleId,
187+
long mapId,
188+
int startReduceId,
189+
int endReduceId) {
177190
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
178191
if (executor == null) {
179192
throw new RuntimeException(
180193
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
181194
}
182-
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
195+
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, startReduceId, endReduceId);
183196
}
184197

185198
public ManagedBuffer getRddBlockData(
@@ -296,13 +309,14 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
296309
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
297310
*/
298311
private ManagedBuffer getSortBasedShuffleBlockData(
299-
ExecutorShuffleInfo executor, int shuffleId, long mapId, int reduceId) {
312+
ExecutorShuffleInfo executor, int shuffleId, long mapId, int startReduceId, int endReduceId) {
300313
File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
301314
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
302315

303316
try {
304317
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
305-
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
318+
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(
319+
startReduceId, endReduceId);
306320
return new FileSegmentManagedBuffer(
307321
conf,
308322
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import com.google.common.primitives.Ints;
2727
import com.google.common.primitives.Longs;
28-
import org.apache.commons.lang3.tuple.ImmutableTriple;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130

@@ -113,39 +112,47 @@ private boolean isShuffleBlocks(String[] blockIds) {
113112
*/
114113
private FetchShuffleBlocks createFetchShuffleBlocksMsg(
115114
String appId, String execId, String[] blockIds) {
116-
int shuffleId = splitBlockId(blockIds[0]).left;
115+
String[] firstBlock = splitBlockId(blockIds[0]);
116+
int shuffleId = Integer.parseInt(firstBlock[1]);
117+
boolean batchFetchEnabled = firstBlock.length == 5;
118+
117119
HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
118120
for (String blockId : blockIds) {
119-
ImmutableTriple<Integer, Long, Integer> blockIdParts = splitBlockId(blockId);
120-
if (blockIdParts.left != shuffleId) {
121+
String[] blockIdParts = splitBlockId(blockId);
122+
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
121123
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
122124
", got:" + blockId);
123125
}
124-
long mapId = blockIdParts.middle;
126+
long mapId = Long.parseLong(blockIdParts[2]);
125127
if (!mapIdToReduceIds.containsKey(mapId)) {
126128
mapIdToReduceIds.put(mapId, new ArrayList<>());
127129
}
128-
mapIdToReduceIds.get(mapId).add(blockIdParts.right);
130+
mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[3]));
131+
if (batchFetchEnabled) {
132+
// When we read continuous shuffle blocks in batch, we will reuse reduceIds in
133+
// FetchShuffleBlocks to store the start and end reduce id for range
134+
// [startReduceId, endReduceId).
135+
assert(blockIdParts.length == 5);
136+
mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
137+
}
129138
}
130139
long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
131140
int[][] reduceIdArr = new int[mapIds.length][];
132141
for (int i = 0; i < mapIds.length; i++) {
133142
reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
134143
}
135-
return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIdArr);
144+
return new FetchShuffleBlocks(
145+
appId, execId, shuffleId, mapIds, reduceIdArr, batchFetchEnabled);
136146
}
137147

138-
/** Split the shuffleBlockId and return shuffleId, mapId and reduceId. */
139-
private ImmutableTriple<Integer, Long, Integer> splitBlockId(String blockId) {
148+
/** Split the shuffleBlockId and return shuffleId, mapId and reduceIds. */
149+
private String[] splitBlockId(String blockId) {
140150
String[] blockIdParts = blockId.split("_");
141-
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
151+
if (blockIdParts.length < 4 || blockIdParts.length > 5 || !blockIdParts[0].equals("shuffle")) {
142152
throw new IllegalArgumentException(
143153
"Unexpected shuffle block id format: " + blockId);
144154
}
145-
return new ImmutableTriple<>(
146-
Integer.parseInt(blockIdParts[1]),
147-
Long.parseLong(blockIdParts[2]),
148-
Integer.parseInt(blockIdParts[3]));
155+
return blockIdParts;
149156
}
150157

151158
/** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,15 @@ public int getSize() {
5454
* Get index offset for a particular reducer.
5555
*/
5656
public ShuffleIndexRecord getIndex(int reduceId) {
57-
long offset = offsets.get(reduceId);
58-
long nextOffset = offsets.get(reduceId + 1);
57+
return getIndex(reduceId, reduceId + 1);
58+
}
59+
60+
/**
61+
* Get index offset for the reducer range of [startReduceId, endReduceId).
62+
*/
63+
public ShuffleIndexRecord getIndex(int startReduceId, int endReduceId) {
64+
long offset = offsets.get(startReduceId);
65+
long nextOffset = offsets.get(endReduceId);
5966
return new ShuffleIndexRecord(offset, nextOffset - offset);
6067
}
6168
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,32 @@ public class FetchShuffleBlocks extends BlockTransferMessage {
3535
// The length of mapIds must equal to reduceIds.size(), for the i-th mapId in mapIds,
3636
// it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map id.
3737
public final long[] mapIds;
38+
// When batchFetchEnabled=true, reduceIds[i] contains 2 elements: startReduceId (inclusive) and
39+
// endReduceId (exclusive) for the mapper mapIds[i].
40+
// When batchFetchEnabled=false, reduceIds[i] contains all the reduce IDs that mapper mapIds[i]
41+
// needs to fetch.
3842
public final int[][] reduceIds;
43+
public final boolean batchFetchEnabled;
3944

4045
public FetchShuffleBlocks(
4146
String appId,
4247
String execId,
4348
int shuffleId,
4449
long[] mapIds,
45-
int[][] reduceIds) {
50+
int[][] reduceIds,
51+
boolean batchFetchEnabled) {
4652
this.appId = appId;
4753
this.execId = execId;
4854
this.shuffleId = shuffleId;
4955
this.mapIds = mapIds;
5056
this.reduceIds = reduceIds;
5157
assert(mapIds.length == reduceIds.length);
58+
this.batchFetchEnabled = batchFetchEnabled;
59+
if (batchFetchEnabled) {
60+
for (int[] ids: reduceIds) {
61+
assert(ids.length == 2);
62+
}
63+
}
5264
}
5365

5466
@Override
@@ -62,6 +74,7 @@ public String toString() {
6274
.add("shuffleId", shuffleId)
6375
.add("mapIds", Arrays.toString(mapIds))
6476
.add("reduceIds", Arrays.deepToString(reduceIds))
77+
.add("batchFetchEnabled", batchFetchEnabled)
6578
.toString();
6679
}
6780

@@ -73,6 +86,7 @@ public boolean equals(Object o) {
7386
FetchShuffleBlocks that = (FetchShuffleBlocks) o;
7487

7588
if (shuffleId != that.shuffleId) return false;
89+
if (batchFetchEnabled != that.batchFetchEnabled) return false;
7690
if (!appId.equals(that.appId)) return false;
7791
if (!execId.equals(that.execId)) return false;
7892
if (!Arrays.equals(mapIds, that.mapIds)) return false;
@@ -86,6 +100,7 @@ public int hashCode() {
86100
result = 31 * result + shuffleId;
87101
result = 31 * result + Arrays.hashCode(mapIds);
88102
result = 31 * result + Arrays.deepHashCode(reduceIds);
103+
result = 31 * result + (batchFetchEnabled ? 1 : 0);
89104
return result;
90105
}
91106

@@ -100,7 +115,8 @@ public int encodedLength() {
100115
+ 4 /* encoded length of shuffleId */
101116
+ Encoders.LongArrays.encodedLength(mapIds)
102117
+ 4 /* encoded length of reduceIds.size() */
103-
+ encodedLengthOfReduceIds;
118+
+ encodedLengthOfReduceIds
119+
+ 1; /* encoded length of batchFetchEnabled */
104120
}
105121

106122
@Override
@@ -113,6 +129,7 @@ public void encode(ByteBuf buf) {
113129
for (int[] ids: reduceIds) {
114130
Encoders.IntArrays.encode(buf, ids);
115131
}
132+
buf.writeBoolean(batchFetchEnabled);
116133
}
117134

118135
public static FetchShuffleBlocks decode(ByteBuf buf) {
@@ -125,6 +142,7 @@ public static FetchShuffleBlocks decode(ByteBuf buf) {
125142
for (int i = 0; i < reduceIdsSize; i++) {
126143
reduceIds[i] = Encoders.IntArrays.decode(buf);
127144
}
128-
return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIds);
145+
boolean batchFetchEnabled = buf.readBoolean();
146+
return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIds, batchFetchEnabled);
129147
}
130148
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ public void serializeOpenShuffleBlocks() {
3030
checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
3131
checkSerializeDeserialize(new FetchShuffleBlocks(
3232
"app-1", "exec-2", 0, new long[] {0, 1},
33-
new int[][] {{ 0, 1 }, { 0, 1, 2 }}));
33+
new int[][] {{ 0, 1 }, { 0, 1, 2 }}, false));
34+
checkSerializeDeserialize(new FetchShuffleBlocks(
35+
"app-1", "exec-2", 0, new long[] {0, 1},
36+
new int[][] {{ 0, 1 }, { 0, 2 }}, true));
3437
checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
3538
new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
3639
checkSerializeDeserialize(new UploadBlock("app-1", "exec-2", "block-3", new byte[] { 1, 2 },

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,30 @@ public void testFetchShuffleBlocks() {
101101
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(blockMarkers[1]);
102102

103103
FetchShuffleBlocks fetchShuffleBlocks = new FetchShuffleBlocks(
104-
"app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 1 }});
104+
"app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 1 }}, false);
105105
checkOpenBlocksReceive(fetchShuffleBlocks, blockMarkers);
106106

107107
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
108108
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
109109
verifyOpenBlockLatencyMetrics();
110110
}
111111

112+
@Test
113+
public void testFetchShuffleBlocksInBatch() {
114+
ManagedBuffer[] batchBlockMarkers = {
115+
new NioManagedBuffer(ByteBuffer.wrap(new byte[10]))
116+
};
117+
when(blockResolver.getContinuousBlocksData(
118+
"app0", "exec1", 0, 0, 0, 1)).thenReturn(batchBlockMarkers[0]);
119+
120+
FetchShuffleBlocks fetchShuffleBlocks = new FetchShuffleBlocks(
121+
"app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 1 }}, true);
122+
checkOpenBlocksReceive(fetchShuffleBlocks, batchBlockMarkers);
123+
124+
verify(blockResolver, times(1)).getContinuousBlocksData("app0", "exec1", 0, 0, 0, 1);
125+
verifyOpenBlockLatencyMetrics();
126+
}
127+
112128
@Test
113129
public void testOpenDiskPersistedRDDBlocks() {
114130
when(blockResolver.getRddBlockData("app0", "exec1", 0, 0)).thenReturn(blockMarkers[0]);
@@ -154,16 +170,17 @@ private void checkOpenBlocksReceive(BlockTransferMessage msg, ManagedBuffer[] bl
154170

155171
StreamHandle handle =
156172
(StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue());
157-
assertEquals(2, handle.numChunks);
173+
assertEquals(blockMarkers.length, handle.numChunks);
158174

159175
@SuppressWarnings("unchecked")
160176
ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>)
161177
(ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
162178
verify(streamManager, times(1)).registerStream(anyString(), stream.capture(),
163179
any());
164180
Iterator<ManagedBuffer> buffers = stream.getValue();
165-
assertEquals(blockMarkers[0], buffers.next());
166-
assertEquals(blockMarkers[1], buffers.next());
181+
for (ManagedBuffer blockMarker : blockMarkers) {
182+
assertEquals(blockMarker, buffers.next());
183+
}
167184
assertFalse(buffers.hasNext());
168185
}
169186

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@ public void testSortShuffleBlocks() throws IOException {
111111
CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
112112
assertEquals(sortBlock1, block1);
113113
}
114+
115+
try (InputStream blocksStream = resolver.getContinuousBlocksData(
116+
"app0", "exec0", 0, 0, 0, 2).createInputStream()) {
117+
String blocks =
118+
CharStreams.toString(new InputStreamReader(blocksStream, StandardCharsets.UTF_8));
119+
assertEquals(sortBlock0 + sortBlock1, blocks);
120+
}
114121
}
115122

116123
@Test

0 commit comments

Comments
 (0)