Skip to content

Commit c53ebea

Browse files
JoshRosenrxin
authored andcommitted
[SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort
This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records. The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf. The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles. UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold: - The shuffle dependency specifies no aggregation or output ordering. - The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers). - The shuffle produces fewer than 16777216 output partitions. - No individual record is larger than 128 MB when serialized. In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer. At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged to produce the final output file. UnsafeShuffleManager optimizes this process in several ways: - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details. - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache. - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge. - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge. The shuffle read path is unchanged. This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / #4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in #5725). ### Future work There are several tasks that build upon this patch, which will be left to future work: - [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data. - Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL). <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5868) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes #5868 from JoshRosen/unsafe-sort and squashes the following commits: ef0a86e [Josh Rosen] Fix scalastyle errors 7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data. d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances. 52a9981 [Josh Rosen] Fix some bugs in the address packing code. 51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort 4023fa4 [Josh Rosen] Add @Private annotation to some Java classes. de40b9d [Josh Rosen] More comments to try to explain metrics code df07699 [Josh Rosen] Attempt to clarify confusing metrics update code 5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file. d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter 4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array' 6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter. 57312c9 [Josh Rosen] Clarify fileBufferSize units 2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter. fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer. 85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator. 0ad34da [Josh Rosen] Fix off-by-one in nextInt() call 56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS. e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding. 4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics. d4e6d89 [Josh Rosen] Update to bit shifting constants 69d5899 [Josh Rosen] Remove some unnecessary override vals 8531286 [Josh Rosen] Add tests that automatically trigger spills. 7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap(). e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections 39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!) 1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class. ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable. ae538dc [Josh Rosen] Document UnsafeShuffleManager. ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions. 0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass. b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance. 1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations. b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless. f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation. 4a01c45 [Josh Rosen] Remove unnecessary log message 27b18b0 [Josh Rosen] That for inserting records AT the max record size. fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes. 9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's 67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager 8f5061a [Josh Rosen] Strengthen assertion to check partitioning 01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite 1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename. e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort 9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors 7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling. 722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests. 9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort b95e642 [Josh Rosen] Refactor and document logic that decides when to spill. 1ce1300 [Josh Rosen] More minor cleanup 5e8cf75 [Josh Rosen] More minor cleanup e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface. cfe0ec4 [Josh Rosen] Address a number of minor review comments: 8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter 11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics. b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter. 4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests. 133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter. f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort. 57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort. 69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode. 7ee918e [Josh Rosen] Re-order imports in tests 3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces 3490512 [Josh Rosen] Misc. cleanup f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces. 2776aca [Josh Rosen] First passing test for ExternalSorter. 5e100b2 [Josh Rosen] Super-messy WIP on external sort 595923a [Josh Rosen] Remove some unused variables. 8958584 [Josh Rosen] Fix bug in calculating free space in current page. f17fa8f [Josh Rosen] Add missing newline c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: b8a09fe [Josh Rosen] Back out accidental log4j.properties change bfc12d3 [Josh Rosen] Add tests for serializer relocation property. 240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert() 1433b42 [Josh Rosen] Store record length as int instead of long. 026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter 0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java. 87e721b [Josh Rosen] Renaming and comments d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used. e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite 9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter. 253f13e [Josh Rosen] More cleanup 8e3ec20 [Josh Rosen] Begin code cleanup. 4d2f5e1 [Josh Rosen] WIP 3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter 767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter. e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter 57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter abf7bfe [Josh Rosen] Add basic test case. 81d52c5 [Josh Rosen] WIP on UnsafeSorter (cherry picked from commit 73bed40) Signed-off-by: Reynold Xin <rxin@databricks.com>
1 parent 6c0644a commit c53ebea

33 files changed

+2767
-64
lines changed

core/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,16 @@
361361
<artifactId>junit</artifactId>
362362
<scope>test</scope>
363363
</dependency>
364+
<dependency>
365+
<groupId>org.hamcrest</groupId>
366+
<artifactId>hamcrest-core</artifactId>
367+
<scope>test</scope>
368+
</dependency>
369+
<dependency>
370+
<groupId>org.hamcrest</groupId>
371+
<artifactId>hamcrest-library</artifactId>
372+
<scope>test</scope>
373+
</dependency>
364374
<dependency>
365375
<groupId>com.novocode</groupId>
366376
<artifactId>junit-interface</artifactId>
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.unsafe;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.io.OutputStream;
23+
import java.nio.ByteBuffer;
24+
25+
import scala.reflect.ClassTag;
26+
27+
import org.apache.spark.serializer.DeserializationStream;
28+
import org.apache.spark.serializer.SerializationStream;
29+
import org.apache.spark.serializer.SerializerInstance;
30+
import org.apache.spark.unsafe.PlatformDependent;
31+
32+
/**
33+
* Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
34+
* Our shuffle write path doesn't actually use this serializer (since we end up calling the
35+
* `write() OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
36+
* around this, we pass a dummy no-op serializer.
37+
*/
38+
final class DummySerializerInstance extends SerializerInstance {
39+
40+
public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();
41+
42+
private DummySerializerInstance() { }
43+
44+
@Override
45+
public SerializationStream serializeStream(final OutputStream s) {
46+
return new SerializationStream() {
47+
@Override
48+
public void flush() {
49+
// Need to implement this because DiskObjectWriter uses it to flush the compression stream
50+
try {
51+
s.flush();
52+
} catch (IOException e) {
53+
PlatformDependent.throwException(e);
54+
}
55+
}
56+
57+
@Override
58+
public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
59+
throw new UnsupportedOperationException();
60+
}
61+
62+
@Override
63+
public void close() {
64+
// Need to implement this because DiskObjectWriter uses it to close the compression stream
65+
try {
66+
s.close();
67+
} catch (IOException e) {
68+
PlatformDependent.throwException(e);
69+
}
70+
}
71+
};
72+
}
73+
74+
@Override
75+
public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
76+
throw new UnsupportedOperationException();
77+
}
78+
79+
@Override
80+
public DeserializationStream deserializeStream(InputStream s) {
81+
throw new UnsupportedOperationException();
82+
}
83+
84+
@Override
85+
public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
86+
throw new UnsupportedOperationException();
87+
}
88+
89+
@Override
90+
public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
91+
throw new UnsupportedOperationException();
92+
}
93+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.unsafe;
19+
20+
/**
21+
* Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
22+
* <p>
23+
* Within the long, the data is laid out as follows:
24+
* <pre>
25+
* [24 bit partition number][13 bit memory page number][27 bit offset in page]
26+
* </pre>
27+
* This implies that the maximum addressable page size is 2^27 bits = 128 megabytes, assuming that
28+
* our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
29+
* 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
30+
* implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
31+
* <p>
32+
* Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
33+
* optimization to future work as it will require more careful design to ensure that addresses are
34+
* properly aligned (e.g. by padding records).
35+
*/
36+
final class PackedRecordPointer {
37+
38+
static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes
39+
40+
/**
41+
* The maximum partition identifier that can be encoded. Note that partition ids start from 0.
42+
*/
43+
static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215
44+
45+
/** Bit mask for the lower 40 bits of a long. */
46+
private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;
47+
48+
/** Bit mask for the upper 24 bits of a long */
49+
private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;
50+
51+
/** Bit mask for the lower 27 bits of a long. */
52+
private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;
53+
54+
/** Bit mask for the lower 51 bits of a long. */
55+
private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;
56+
57+
/** Bit mask for the upper 13 bits of a long */
58+
private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
59+
60+
/**
61+
* Pack a record address and partition id into a single word.
62+
*
63+
* @param recordPointer a record pointer encoded by TaskMemoryManager.
64+
* @param partitionId a shuffle partition id (maximum value of 2^24).
65+
* @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
66+
*/
67+
public static long packPointer(long recordPointer, int partitionId) {
68+
assert (partitionId <= MAXIMUM_PARTITION_ID);
69+
// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
70+
// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
71+
final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
72+
final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
73+
return (((long) partitionId) << 40) | compressedAddress;
74+
}
75+
76+
private long packedRecordPointer;
77+
78+
public void set(long packedRecordPointer) {
79+
this.packedRecordPointer = packedRecordPointer;
80+
}
81+
82+
public int getPartitionId() {
83+
return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
84+
}
85+
86+
public long getRecordPointer() {
87+
final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
88+
final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
89+
return pageNumber | offsetInPage;
90+
}
91+
92+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.unsafe;
19+
20+
import java.io.File;
21+
22+
import org.apache.spark.storage.TempShuffleBlockId;
23+
24+
/**
25+
* Metadata for a block of data written by {@link UnsafeShuffleExternalSorter}.
26+
*/
27+
final class SpillInfo {
28+
final long[] partitionLengths;
29+
final File file;
30+
final TempShuffleBlockId blockId;
31+
32+
public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
33+
this.partitionLengths = new long[numPartitions];
34+
this.file = file;
35+
this.blockId = blockId;
36+
}
37+
}

0 commit comments

Comments
 (0)