Skip to content

Commit 08f4a92

Browse files
authored
R52 revert ae (apache#179)
* Revert "release r49 (apache#162)" This reverts commit 62da28f. * Revert "release r48 (apache#161)" This reverts commit 1441531. * revert ae release r52 * Revert "release r48 (apache#161)" This reverts commit 1441531. Co-authored-by: 7mming7 <7mming7@gmail.com>
1 parent 908daf7 commit 08f4a92

File tree

109 files changed

+807
-3840
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+807
-3840
lines changed

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>org.apache.spark</groupId>
2323
<artifactId>spark-parent_2.11</artifactId>
24-
<version>2.4.1-kylin-4.1.x-r51</version>
24+
<version>2.4.1-kylin-4.1.x-r52</version>
2525
<relativePath>../pom.xml</relativePath>
2626
</parent>
2727

common/kvstore/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<groupId>org.apache.spark</groupId>
2424
<artifactId>spark-parent_2.11</artifactId>
25-
<version>2.4.1-kylin-4.1.x-r51</version>
25+
<version>2.4.1-kylin-4.1.x-r52</version>
2626
<relativePath>../../pom.xml</relativePath>
2727
</parent>
2828

common/network-common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<groupId>org.apache.spark</groupId>
2424
<artifactId>spark-parent_2.11</artifactId>
25-
<version>2.4.1-kylin-4.1.x-r51</version>
25+
<version>2.4.1-kylin-4.1.x-r52</version>
2626
<relativePath>../../pom.xml</relativePath>
2727
</parent>
2828

common/network-shuffle/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<groupId>org.apache.spark</groupId>
2424
<artifactId>spark-parent_2.11</artifactId>
25-
<version>2.4.1-kylin-4.1.x-r51</version>
25+
<version>2.4.1-kylin-4.1.x-r52</version>
2626
<relativePath>../../pom.xml</relativePath>
2727
</parent>
2828

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -197,60 +197,48 @@ public Map<String, Metric> getMetrics() {
197197
}
198198
}
199199

200-
private boolean isShuffleBlock(String[] blockIdParts) {
201-
// length == 4: ShuffleBlockId
202-
// length == 5: ContinuousShuffleBlockId
203-
return (blockIdParts.length == 4 || blockIdParts.length == 5) &&
204-
blockIdParts[0].equals("shuffle");
205-
}
206-
207200
private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
208201

209202
private int index = 0;
210203
private final String appId;
211204
private final String execId;
212205
private final int shuffleId;
213-
// An array containing mapId, reduceId and numBlocks tuple
214-
private final int[] shuffleBlockIds;
206+
// An array containing mapId and reduceId pairs.
207+
private final int[] mapIdAndReduceIds;
215208

216209
ManagedBufferIterator(String appId, String execId, String[] blockIds) {
217210
this.appId = appId;
218211
this.execId = execId;
219212
String[] blockId0Parts = blockIds[0].split("_");
220-
if (!isShuffleBlock(blockId0Parts)) {
213+
if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
221214
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
222215
}
223216
this.shuffleId = Integer.parseInt(blockId0Parts[1]);
224-
shuffleBlockIds = new int[3 * blockIds.length];
217+
mapIdAndReduceIds = new int[2 * blockIds.length];
225218
for (int i = 0; i < blockIds.length; i++) {
226219
String[] blockIdParts = blockIds[i].split("_");
227-
if (!isShuffleBlock(blockIdParts)) {
220+
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
228221
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
229222
}
230223
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
231224
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
232-
", got:" + blockIds[i]);
233-
}
234-
shuffleBlockIds[3 * i] = Integer.parseInt(blockIdParts[2]);
235-
shuffleBlockIds[3 * i + 1] = Integer.parseInt(blockIdParts[3]);
236-
if (blockIdParts.length == 4) {
237-
shuffleBlockIds[3 * i + 2] = 1;
238-
} else {
239-
shuffleBlockIds[3 * i + 2] = Integer.parseInt(blockIdParts[4]);
225+
", got:" + blockIds[i]);
240226
}
227+
mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
228+
mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
241229
}
242230
}
243231

244232
@Override
245233
public boolean hasNext() {
246-
return index < shuffleBlockIds.length;
234+
return index < mapIdAndReduceIds.length;
247235
}
248236

249237
@Override
250238
public ManagedBuffer next() {
251239
final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
252-
shuffleBlockIds[index], shuffleBlockIds[index + 1], shuffleBlockIds[index + 2]);
253-
index += 3;
240+
mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
241+
index += 2;
254242
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
255243
return block;
256244
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -162,22 +162,21 @@ public void registerExecutor(
162162
}
163163

164164
/**
165-
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks).
166-
* We make assumptions about how the hash and sort based shuffles store their data.
165+
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
166+
* about how the hash and sort based shuffles store their data.
167167
*/
168168
public ManagedBuffer getBlockData(
169-
String appId,
170-
String execId,
171-
int shuffleId,
172-
int mapId,
173-
int reduceId,
174-
int numBlocks) {
169+
String appId,
170+
String execId,
171+
int shuffleId,
172+
int mapId,
173+
int reduceId) {
175174
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
176175
if (executor == null) {
177176
throw new RuntimeException(
178-
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
177+
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
179178
}
180-
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
179+
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
181180
}
182181

183182
/**
@@ -281,19 +280,19 @@ public boolean accept(File dir, String name) {
281280
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
282281
*/
283282
private ManagedBuffer getSortBasedShuffleBlockData(
284-
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int numBlocks) {
283+
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
285284
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
286-
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
285+
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
287286

288287
try {
289288
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
290-
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId, numBlocks);
289+
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
291290
return new FileSegmentManagedBuffer(
292-
conf,
293-
getFile(executor.localDirs, executor.subDirsPerLocalDir,
294-
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
295-
shuffleIndexRecord.getOffset(),
296-
shuffleIndexRecord.getLength());
291+
conf,
292+
getFile(executor.localDirs, executor.subDirsPerLocalDir,
293+
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
294+
shuffleIndexRecord.getOffset(),
295+
shuffleIndexRecord.getLength());
297296
} catch (ExecutionException e) {
298297
throw new RuntimeException("Failed to open file: " + indexFile, e);
299298
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ public int getSize() {
5959
/**
6060
* Get index offset for a particular reducer.
6161
*/
62-
public ShuffleIndexRecord getIndex(int reduceId, int numBlocks) {
62+
public ShuffleIndexRecord getIndex(int reduceId) {
6363
long offset = offsets.get(reduceId);
64-
long nextOffset = offsets.get(reduceId + numBlocks);
64+
long nextOffset = offsets.get(reduceId + 1);
6565
return new ShuffleIndexRecord(offset, nextOffset - offset);
6666
}
6767
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public void testOpenShuffleBlocks() {
8383

8484
ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
8585
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
86-
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0, 1)).thenReturn(block0Marker);
87-
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1, 1)).thenReturn(block1Marker);
86+
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
87+
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
8888
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
8989
new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
9090
.toByteBuffer();
@@ -106,8 +106,8 @@ public void testOpenShuffleBlocks() {
106106
assertEquals(block0Marker, buffers.next());
107107
assertEquals(block1Marker, buffers.next());
108108
assertFalse(buffers.hasNext());
109-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0, 1);
110-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1, 1);
109+
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
110+
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
111111

112112
// Verify open block request latency metrics
113113
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void testBadRequests() throws IOException {
6666
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
6767
// Unregistered executor
6868
try {
69-
resolver.getBlockData("app0", "exec1", 1, 1, 0, 1);
69+
resolver.getBlockData("app0", "exec1", 1, 1, 0);
7070
fail("Should have failed");
7171
} catch (RuntimeException e) {
7272
assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
@@ -75,7 +75,7 @@ public void testBadRequests() throws IOException {
7575
// Invalid shuffle manager
7676
try {
7777
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
78-
resolver.getBlockData("app0", "exec2", 1, 1, 0, 1);
78+
resolver.getBlockData("app0", "exec2", 1, 1, 0);
7979
fail("Should have failed");
8080
} catch (UnsupportedOperationException e) {
8181
// pass
@@ -85,7 +85,7 @@ public void testBadRequests() throws IOException {
8585
resolver.registerExecutor("app0", "exec3",
8686
dataContext.createExecutorInfo(SORT_MANAGER));
8787
try {
88-
resolver.getBlockData("app0", "exec3", 1, 1, 0, 1);
88+
resolver.getBlockData("app0", "exec3", 1, 1, 0);
8989
fail("Should have failed");
9090
} catch (Exception e) {
9191
// pass
@@ -99,25 +99,18 @@ public void testSortShuffleBlocks() throws IOException {
9999
dataContext.createExecutorInfo(SORT_MANAGER));
100100

101101
InputStream block0Stream =
102-
resolver.getBlockData("app0", "exec0", 0, 0, 0, 1).createInputStream();
102+
resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
103103
String block0 = CharStreams.toString(
104104
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
105105
block0Stream.close();
106106
assertEquals(sortBlock0, block0);
107107

108108
InputStream block1Stream =
109-
resolver.getBlockData("app0", "exec0", 0, 0, 1, 1).createInputStream();
109+
resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
110110
String block1 = CharStreams.toString(
111111
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
112112
block1Stream.close();
113113
assertEquals(sortBlock1, block1);
114-
115-
InputStream block01Stream =
116-
resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
117-
String block01 = CharStreams.toString(
118-
new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
119-
block01Stream.close();
120-
assertEquals(sortBlock0 + sortBlock1, block01);
121114
}
122115

123116
@Test

common/network-yarn/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<groupId>org.apache.spark</groupId>
2424
<artifactId>spark-parent_2.11</artifactId>
25-
<version>2.4.1-kylin-4.1.x-r51</version>
25+
<version>2.4.1-kylin-4.1.x-r52</version>
2626
<relativePath>../../pom.xml</relativePath>
2727
</parent>
2828

0 commit comments

Comments
 (0)