Skip to content

Commit 7f36cd2

Browse files
mccheah authored and Marcelo Vanzin committed
[SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API
## What changes were proposed in this pull request?

Uses the APIs introduced in SPARK-28209 in the UnsafeShuffleWriter.

## How was this patch tested?

Since this is just a refactor, existing unit tests should cover the relevant code paths. Micro-benchmarks from the original fork where this code was built show no degradation in performance.

Closes #25304 from mccheah/shuffle-writer-refactor-unsafe-writer.

Lead-authored-by: mcheah <mcheah@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent fa75db2 commit 7f36cd2

File tree

9 files changed

+374
-199
lines changed

9 files changed

+374
-199
lines changed

core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java

Lines changed: 27 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.shuffle.api;
1919

2020
import java.io.IOException;
21+
import java.util.Optional;
2122

2223
import org.apache.spark.annotation.Private;
2324

@@ -39,17 +40,39 @@ public interface ShuffleExecutorComponents {
3940
/**
4041
* Called once per map task to create a writer that will be responsible for persisting all the
4142
* partitioned bytes written by that map task.
42-
* @param shuffleId Unique identifier for the shuffle the map task is a part of
43+
*
44+
* @param shuffleId Unique identifier for the shuffle the map task is a part of
4345
* @param mapId Within the shuffle, the identifier of the map task
4446
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
45-
* with the same (shuffleId, mapId) pair can be distinguished by the
46-
* different values of mapTaskAttemptId.
47+
* with the same (shuffleId, mapId) pair can be distinguished by the
48+
* different values of mapTaskAttemptId.
4749
* @param numPartitions The number of partitions that will be written by the map task. Some of
48-
* these partitions may be empty.
50+
* these partitions may be empty.
* @return The writer responsible for persisting the partitioned bytes written by this map task
* @throws IOException Declared by the signature; presumably raised when the writer cannot be
* created (NOTE(review): confirm against implementations)
4951
*/
5052
ShuffleMapOutputWriter createMapOutputWriter(
5153
int shuffleId,
5254
int mapId,
5355
long mapTaskAttemptId,
5456
int numPartitions) throws IOException;
57+
58+
/**
59+
* An optional extension for creating a map output writer that can optimize the transfer of a
60+
* single partition file, as the entire result of a map task, to the backing store.
61+
* <p>
62+
* Most implementations should return the default {@link Optional#empty()} to indicate that
63+
* they do not support this optimization. This is primarily for backwards compatibility, in
64+
* preserving an optimization in the local disk shuffle storage implementation.
65+
*
66+
* @param shuffleId Unique identifier for the shuffle the map task is a part of
67+
* @param mapId Within the shuffle, the identifier of the map task
68+
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
69+
* with the same (shuffleId, mapId) pair can be distinguished by the
70+
* different values of mapTaskAttemptId.
* @return An {@link Optional} holding the single-file writer, or {@link Optional#empty()} (the
* default) when the implementation does not support this optimization
* @throws IOException Declared by the signature; the default implementation never throws
71+
*/
72+
default Optional<SingleSpillShuffleMapOutputWriter> createSingleFileMapOutputWriter(
73+
int shuffleId,
74+
int mapId,
75+
long mapTaskAttemptId) throws IOException {
76+
return Optional.empty();
77+
}
5578
}
core/src/main/java/org/apache/spark/shuffle/api/SingleSpillShuffleMapOutputWriter.java

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.api;
19+
20+
import java.io.File;
21+
import java.io.IOException;
22+
23+
import org.apache.spark.annotation.Private;
24+
25+
/**
26+
* Optional extension for partition writing that is optimized for transferring a single
27+
* file to the backing store.
* <p>
* Instances are returned, wrapped in an {@code Optional}, by
* {@code ShuffleExecutorComponents#createSingleFileMapOutputWriter}.
28+
*/
29+
@Private
30+
public interface SingleSpillShuffleMapOutputWriter {
31+
32+
/**
33+
* Transfer a file that contains the bytes of all the partitions written by this map task.
*
* @param mapOutputFile The file holding the bytes of all partitions written by this map task
* @param partitionLengths Presumably the length of each partition's data within
* {@code mapOutputFile} (NOTE(review): confirm units and ordering
* against callers)
* @throws IOException Declared by the signature; presumably raised when the transfer fails
* (NOTE(review): confirm against implementations)
34+
*/
35+
void transferMapSpillFile(File mapOutputFile, long[] partitionLengths) throws IOException;
36+
}

0 commit comments

Comments
 (0)