Skip to content

Commit cf368e5

Browse files
authored
revert ae (apache#140)
1 parent ec20412 commit cf368e5

File tree

69 files changed

+767
-3792
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+767
-3792
lines changed

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -197,60 +197,48 @@ public Map<String, Metric> getMetrics() {
197197
}
198198
}
199199

200-
private boolean isShuffleBlock(String[] blockIdParts) {
201-
// length == 4: ShuffleBlockId
202-
// length == 5: ContinuousShuffleBlockId
203-
return (blockIdParts.length == 4 || blockIdParts.length == 5) &&
204-
blockIdParts[0].equals("shuffle");
205-
}
206-
207200
private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
208201

209202
private int index = 0;
210203
private final String appId;
211204
private final String execId;
212205
private final int shuffleId;
213-
// An array containing mapId, reduceId and numBlocks tuple
214-
private final int[] shuffleBlockIds;
206+
// An array containing mapId and reduceId pairs.
207+
private final int[] mapIdAndReduceIds;
215208

216209
ManagedBufferIterator(String appId, String execId, String[] blockIds) {
217210
this.appId = appId;
218211
this.execId = execId;
219212
String[] blockId0Parts = blockIds[0].split("_");
220-
if (!isShuffleBlock(blockId0Parts)) {
213+
if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
221214
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
222215
}
223216
this.shuffleId = Integer.parseInt(blockId0Parts[1]);
224-
shuffleBlockIds = new int[3 * blockIds.length];
217+
mapIdAndReduceIds = new int[2 * blockIds.length];
225218
for (int i = 0; i < blockIds.length; i++) {
226219
String[] blockIdParts = blockIds[i].split("_");
227-
if (!isShuffleBlock(blockIdParts)) {
220+
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
228221
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
229222
}
230223
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
231224
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
232-
", got:" + blockIds[i]);
233-
}
234-
shuffleBlockIds[3 * i] = Integer.parseInt(blockIdParts[2]);
235-
shuffleBlockIds[3 * i + 1] = Integer.parseInt(blockIdParts[3]);
236-
if (blockIdParts.length == 4) {
237-
shuffleBlockIds[3 * i + 2] = 1;
238-
} else {
239-
shuffleBlockIds[3 * i + 2] = Integer.parseInt(blockIdParts[4]);
225+
", got:" + blockIds[i]);
240226
}
227+
mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
228+
mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
241229
}
242230
}
243231

244232
@Override
245233
public boolean hasNext() {
246-
return index < shuffleBlockIds.length;
234+
return index < mapIdAndReduceIds.length;
247235
}
248236

249237
@Override
250238
public ManagedBuffer next() {
251239
final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
252-
shuffleBlockIds[index], shuffleBlockIds[index + 1], shuffleBlockIds[index + 2]);
253-
index += 3;
240+
mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
241+
index += 2;
254242
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
255243
return block;
256244
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -162,22 +162,21 @@ public void registerExecutor(
162162
}
163163

164164
/**
165-
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
165+
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
166166
* about how the hash and sort based shuffles store their data.
167167
*/
168168
public ManagedBuffer getBlockData(
169-
String appId,
170-
String execId,
171-
int shuffleId,
172-
int mapId,
173-
int reduceId,
174-
int numBlocks) {
169+
String appId,
170+
String execId,
171+
int shuffleId,
172+
int mapId,
173+
int reduceId) {
175174
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
176175
if (executor == null) {
177176
throw new RuntimeException(
178-
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
177+
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
179178
}
180-
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
179+
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
181180
}
182181

183182
/**
@@ -281,19 +280,19 @@ public boolean accept(File dir, String name) {
281280
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
282281
*/
283282
private ManagedBuffer getSortBasedShuffleBlockData(
284-
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int numBlocks) {
283+
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
285284
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
286-
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
285+
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
287286

288287
try {
289288
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
290-
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId, numBlocks);
289+
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
291290
return new FileSegmentManagedBuffer(
292-
conf,
293-
getFile(executor.localDirs, executor.subDirsPerLocalDir,
294-
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
295-
shuffleIndexRecord.getOffset(),
296-
shuffleIndexRecord.getLength());
291+
conf,
292+
getFile(executor.localDirs, executor.subDirsPerLocalDir,
293+
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
294+
shuffleIndexRecord.getOffset(),
295+
shuffleIndexRecord.getLength());
297296
} catch (ExecutionException e) {
298297
throw new RuntimeException("Failed to open file: " + indexFile, e);
299298
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ public int getSize() {
5959
/**
6060
* Get index offset for a particular reducer.
6161
*/
62-
public ShuffleIndexRecord getIndex(int reduceId, int numBlocks) {
62+
public ShuffleIndexRecord getIndex(int reduceId) {
6363
long offset = offsets.get(reduceId);
64-
long nextOffset = offsets.get(reduceId + numBlocks);
64+
long nextOffset = offsets.get(reduceId + 1);
6565
return new ShuffleIndexRecord(offset, nextOffset - offset);
6666
}
6767
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public void testOpenShuffleBlocks() {
8383

8484
ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
8585
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
86-
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0, 1)).thenReturn(block0Marker);
87-
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1, 1)).thenReturn(block1Marker);
86+
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
87+
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
8888
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
8989
new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
9090
.toByteBuffer();
@@ -106,8 +106,8 @@ public void testOpenShuffleBlocks() {
106106
assertEquals(block0Marker, buffers.next());
107107
assertEquals(block1Marker, buffers.next());
108108
assertFalse(buffers.hasNext());
109-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0, 1);
110-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1, 1);
109+
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
110+
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
111111

112112
// Verify open block request latency metrics
113113
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void testBadRequests() throws IOException {
6666
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
6767
// Unregistered executor
6868
try {
69-
resolver.getBlockData("app0", "exec1", 1, 1, 0, 1);
69+
resolver.getBlockData("app0", "exec1", 1, 1, 0);
7070
fail("Should have failed");
7171
} catch (RuntimeException e) {
7272
assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
@@ -75,7 +75,7 @@ public void testBadRequests() throws IOException {
7575
// Invalid shuffle manager
7676
try {
7777
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
78-
resolver.getBlockData("app0", "exec2", 1, 1, 0, 1);
78+
resolver.getBlockData("app0", "exec2", 1, 1, 0);
7979
fail("Should have failed");
8080
} catch (UnsupportedOperationException e) {
8181
// pass
@@ -85,7 +85,7 @@ public void testBadRequests() throws IOException {
8585
resolver.registerExecutor("app0", "exec3",
8686
dataContext.createExecutorInfo(SORT_MANAGER));
8787
try {
88-
resolver.getBlockData("app0", "exec3", 1, 1, 0, 1);
88+
resolver.getBlockData("app0", "exec3", 1, 1, 0);
8989
fail("Should have failed");
9090
} catch (Exception e) {
9191
// pass
@@ -99,25 +99,18 @@ public void testSortShuffleBlocks() throws IOException {
9999
dataContext.createExecutorInfo(SORT_MANAGER));
100100

101101
InputStream block0Stream =
102-
resolver.getBlockData("app0", "exec0", 0, 0, 0, 1).createInputStream();
102+
resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
103103
String block0 = CharStreams.toString(
104104
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
105105
block0Stream.close();
106106
assertEquals(sortBlock0, block0);
107107

108108
InputStream block1Stream =
109-
resolver.getBlockData("app0", "exec0", 0, 0, 1, 1).createInputStream();
109+
resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
110110
String block1 = CharStreams.toString(
111111
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
112112
block1Stream.close();
113113
assertEquals(sortBlock1, block1);
114-
115-
InputStream block01Stream =
116-
resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
117-
String block01 = CharStreams.toString(
118-
new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
119-
block01Stream.close();
120-
assertEquals(sortBlock0 + sortBlock1, block01);
121114
}
122115

123116
@Test

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
125125
if (!records.hasNext()) {
126126
partitionLengths = new long[numPartitions];
127127
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
128-
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, new long[numPartitions]);
128+
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
129129
return;
130130
}
131131
final SerializerInstance serInstance = serializer.newInstance();
@@ -159,18 +159,15 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
159159

160160
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
161161
File tmp = Utils.tempFileWith(output);
162-
MapInfo mapInfo;
163162
try {
164-
mapInfo = writePartitionedFile(tmp);
165-
partitionLengths = mapInfo.lengths;
163+
partitionLengths = writePartitionedFile(tmp);
166164
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
167165
} finally {
168166
if (tmp.exists() && !tmp.delete()) {
169167
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
170168
}
171169
}
172-
mapStatus = MapStatus$.MODULE$.apply(
173-
blockManager.shuffleServerId(), mapInfo.lengths, mapInfo.records);
170+
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
174171
}
175172

176173
@VisibleForTesting
@@ -183,13 +180,12 @@ long[] getPartitionLengths() {
183180
*
184181
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
185182
*/
186-
private MapInfo writePartitionedFile(File outputFile) throws IOException {
183+
private long[] writePartitionedFile(File outputFile) throws IOException {
187184
// Track location of the partition starts in the output file
188185
final long[] lengths = new long[numPartitions];
189-
final long[] records = new long[numPartitions];
190186
if (partitionWriters == null) {
191187
// We were passed an empty iterator
192-
return new MapInfo(lengths, records);
188+
return lengths;
193189
}
194190

195191
final FileOutputStream out = new FileOutputStream(outputFile, true);
@@ -198,7 +194,6 @@ private MapInfo writePartitionedFile(File outputFile) throws IOException {
198194
try {
199195
for (int i = 0; i < numPartitions; i++) {
200196
final File file = partitionWriterSegments[i].file();
201-
records[i] = partitionWriterSegments[i].record();
202197
if (file.exists()) {
203198
final FileInputStream in = new FileInputStream(file);
204199
boolean copyThrewException = true;
@@ -219,7 +214,7 @@ private MapInfo writePartitionedFile(File outputFile) throws IOException {
219214
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
220215
}
221216
partitionWriters = null;
222-
return new MapInfo(lengths, records);
217+
return lengths;
223218
}
224219

225220
@Override

core/src/main/java/org/apache/spark/shuffle/sort/MapInfo.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ private void writeSortedFile(boolean isLastFile) {
195195
if (currentPartition != -1) {
196196
final FileSegment fileSegment = writer.commitAndGet();
197197
spillInfo.partitionLengths[currentPartition] = fileSegment.length();
198-
spillInfo.partitionRecords[currentPartition] = fileSegment.record();
199198
}
200199
currentPartition = partition;
201200
}
@@ -223,7 +222,6 @@ private void writeSortedFile(boolean isLastFile) {
223222
// writeSortedFile() in that case.
224223
if (currentPartition != -1) {
225224
spillInfo.partitionLengths[currentPartition] = committedSegment.length();
226-
spillInfo.partitionRecords[currentPartition] = committedSegment.record();
227225
spills.add(spillInfo);
228226
}
229227

core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,11 @@
2626
*/
2727
final class SpillInfo {
2828
final long[] partitionLengths;
29-
final long[] partitionRecords;
3029
final File file;
3130
final TempShuffleBlockId blockId;
3231

3332
SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
3433
this.partitionLengths = new long[numPartitions];
35-
this.partitionRecords = new long[numPartitions];
3634
this.file = file;
3735
this.blockId = blockId;
3836
}

0 commit comments

Comments
 (0)