This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit defcd01

Some minor changes and fixes for sorter module

Includes:
* Limit max memory for ExternalSorter and BufferedExternalSorter to 2047 MB to prevent int overflow within Hadoop's sorting library
* Fix int overflow for large memory values in InMemorySorter
* Add note about estimated disk use to README.MD
* Fix to make Hadoop's sorting library put all temp files under the specified directory
* Have Hadoop clean up the temp directory on exit
* Stop shading hadoop dependencies. Some context:
** The existing shading is broken (modules that depend on this one cannot use it successfully).
** Hadoop's use of reflection in several instances makes shading the dependency "in a good way" nearly impossible. It requires a couple of rather brittle hacks, and, for clients that depend on certain conflicting versions of hadoop these hacks can mean it doesn't meet its intended goal of preventing conflicts anyway.
** From what I can tell, there's no good way to shade this to make it universally usable, so leaving it unshaded seems like a reasonable default.
** Without shading Hadoop, this module can be successfully used from Beam's wordcount example (which actually does have pre-existing hadoop dependencies already).
1 parent f6b287f commit defcd01

8 files changed: +75 -22 lines changed

contrib/sorter/README.md

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ This module provides the SortValues transform, which takes a `PCollection<KV<K,
 
 ##Caveats
 * This transform performs value-only sorting; the iterable accompanying each key is sorted, but *there is no relationship between different keys*, as Dataflow does not support any defined relationship between different elements in a PCollection.
-* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`.
+* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`. A (rough) estimate of the number of bytes of disk space utilized if sorting spills to disk is `numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3`.
 
 ##Options
 * The user can customize the temporary location used if sorting requires spilling to disk and the maximum amount of memory to use by creating a custom instance of `BufferedExternalSorter.Options` to pass into `SortValues.create`.
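
Note on the README change: the disk-use estimate added above can be evaluated for a concrete workload. The sketch below is only an illustration. The class name, method name, and the sample record sizes are assumptions; only the formula itself comes from the README text.

```java
/** Worked example of the README's rough disk-use estimate (all names here are hypothetical). */
class DiskUseEstimate {
  /**
   * Applies numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3.
   * Long arithmetic is used so that large record counts do not overflow an int.
   */
  static long estimateBytes(long numRecords, long secondaryKeyBytes, long valueBytes) {
    return numRecords * (secondaryKeyBytes + valueBytes + 16L) * 3L;
  }

  public static void main(String[] args) {
    // Assumed workload: 10 million records, 8-byte secondary keys, 100-byte values.
    long bytes = estimateBytes(10_000_000L, 8L, 100L);
    System.out.println(bytes + " bytes"); // prints "3720000000 bytes"
  }
}
```

For this assumed workload the estimate is about 3.7 GB of temporary disk space on the worker that sorts the iterable.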

contrib/sorter/pom.xml

Lines changed: 2 additions & 6 deletions
@@ -150,8 +150,6 @@
 <shadeTestJar>true</shadeTestJar>
 <artifactSet>
   <includes>
-    <include>org.apache.hadoop:hadoop-mapreduce-client-core</include>
-    <include>org.apache.hadoop:hadoop-common</include>
     <include>com.google.guava:guava</include>
   </includes>
 </artifactSet>
@@ -166,10 +164,6 @@
   </filter>
 </filters>
 <relocations>
-  <relocation>
-    <pattern>org.apache.hadoop</pattern>
-    <shadedPattern>com.google.cloud.dataflow.repackaged.org.apache.hadoop</shadedPattern>
-  </relocation>
   <relocation>
     <pattern>com.google.common</pattern>
     <shadedPattern>com.google.cloud.dataflow.repackaged.com.google.common</shadedPattern>
@@ -198,12 +192,14 @@
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>${hadoop.version}</version>
+  <scope>provided</scope>
 </dependency>
 
 <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>${hadoop.version}</version>
+  <scope>provided</scope>
 </dependency>
 
 <dependency>

contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/BufferedExternalSorter.java

Lines changed: 5 additions & 1 deletion
@@ -48,10 +48,14 @@ public String getTempLocation() {
 
     /**
      * Sets the size of the memory buffer in megabytes. This controls both the buffer for initial in
-     * memory sorting and the buffer used when external sorting. Must be greater than zero.
+     * memory sorting and the buffer used when external sorting. Must be greater than zero and less
+     * than 2048.
      */
     public void setMemoryMB(int memoryMB) {
       checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
+      // Hadoop's external sort stores the number of available memory bytes in an int, this prevents
+      // overflow
+      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
       this.memoryMB = memoryMB;
     }
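
Note on the 2048 bound: the code comment says Hadoop's external sort keeps the available memory in bytes in an int, so the byte count has to stay at or below `Integer.MAX_VALUE`. The following is a minimal sketch of that reasoning, written without Guava's `checkArgument` so that it is self-contained. `MemoryBound` and `toBytes` are hypothetical names, not part of the module.

```java
/** Sketch of the memoryMB bounds check; class and method names are hypothetical. */
class MemoryBound {
  /** Returns the buffer size in bytes, rejecting values outside the open range (0, 2048). */
  static int toBytes(int memoryMB) {
    if (memoryMB <= 0) {
      throw new IllegalArgumentException("memoryMB must be greater than zero");
    }
    if (memoryMB >= 2048) {
      throw new IllegalArgumentException("memoryMB must be less than 2048");
    }
    // At most 2047 * 1024 * 1024 = 2146435072, which is below Integer.MAX_VALUE (2147483647).
    return memoryMB * 1024 * 1024;
  }

  public static void main(String[] args) {
    System.out.println(toBytes(2047)); // largest accepted value, prints 2146435072
    try {
      toBytes(2048); // 2048 MB would be 2^31 bytes, one more than an int can hold
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints "memoryMB must be less than 2048"
    }
  }
}
```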

contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/ExternalSorter.java

Lines changed: 14 additions & 1 deletion
@@ -79,9 +79,15 @@ public String getTempLocation() {
       return tempLocation;
     }
 
-    /** Sets the size of the memory buffer in megabytes. */
+    /**
+     * Sets the size of the memory buffer in megabytes. Must be greater than zero and less than
+     * 2048.
+     */
     public void setMemoryMB(int memoryMB) {
       checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
+      // Hadoop's external sort stores the number of available memory bytes in an int, this prevents
+      // integer overflow
+      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
       this.memoryMB = memoryMB;
     }
 
@@ -134,6 +140,9 @@ private void initHadoopSorter() throws IOException {
     paths = new Path[] {new Path(tempDir, "test.seq")};
 
     JobConf conf = new JobConf();
+    // Sets directory for intermediate files created during merge of merge sort
+    conf.set("io.seqfile.local.dir", tempDir.toUri().getPath());
+
     writer =
         SequenceFile.createWriter(
             conf,
@@ -143,6 +152,10 @@ private void initHadoopSorter() throws IOException {
             Writer.compression(CompressionType.NONE));
 
     FileSystem fs = FileSystem.getLocal(conf);
+    // Directory has to exist for Hadoop to recognize it as deletable on exit
+    fs.mkdirs(tempDir);
+    fs.deleteOnExit(tempDir);
+
     sorter =
         new SequenceFile.Sorter(
             fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf);

contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/InMemorySorter.java

Lines changed: 13 additions & 13 deletions
@@ -33,16 +33,16 @@
 class InMemorySorter implements Sorter {
   /** {@code Options} contains configuration of the sorter. */
   public static class Options implements Serializable {
-    private int memoryMB = 100;
+    private long memoryMB = 100;
 
     /** Sets the size of the memory buffer in megabytes. */
-    public void setMemoryMB(int memoryMB) {
+    public void setMemoryMB(long memoryMB) {
       checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
       this.memoryMB = memoryMB;
     }
 
     /** Returns the configured size of the memory buffer. */
-    public int getMemoryMB() {
+    public long getMemoryMB() {
       return memoryMB;
     }
   }
@@ -51,7 +51,7 @@ public int getMemoryMB() {
   private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
 
   /** How many bytes per word in the running JVM. Assumes 64 bit/8 bytes if unknown. */
-  private static final int NUM_BYTES_PER_WORD = getNumBytesPerWord();
+  private static final long NUM_BYTES_PER_WORD = getNumBytesPerWord();
 
   /**
    * Estimate of memory overhead per KV record in bytes not including memory associated with keys
@@ -64,13 +64,13 @@ public int getMemoryMB() {
    * <li> Per-object overhead (JVM-specific, guessing 2 words * 3 objects)
    * </ul>
    */
-  private static final int RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD;
+  private static final long RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD;
 
   /** Maximum size of the buffer in bytes. */
-  private int maxBufferSize;
+  private long maxBufferSize;
 
   /** Current number of stored bytes. Including estimated overhead bytes. */
-  private int numBytes;
+  private long numBytes;
 
   /** Whether sort has been called. */
   private boolean sortCalled;
@@ -80,7 +80,7 @@ public int getMemoryMB() {
 
   /** Private constructor. */
   private InMemorySorter(Options options) {
-    maxBufferSize = options.getMemoryMB() * 1024 * 1024;
+    maxBufferSize = options.getMemoryMB() * 1024L * 1024L;
   }
 
   /** Create a new sorter from provided options. */
@@ -97,7 +97,7 @@ public void add(KV<byte[], byte[]> record) {
   public boolean addIfRoom(KV<byte[], byte[]> record) {
     checkState(!sortCalled, "Records can only be added before sort()");
 
-    int recordBytes = estimateRecordBytes(record);
+    long recordBytes = estimateRecordBytes(record);
     if (roomInBuffer(numBytes + recordBytes, records.size() + 1)) {
       records.add(record);
       numBytes += recordBytes;
@@ -129,15 +129,15 @@ public int compare(KV<byte[], byte[]> o1, KV<byte[], byte[]> o2) {
    * Estimate the number of additional bytes required to store this record. Including the key, the
    * value and any overhead for objects and references.
    */
-  private int estimateRecordBytes(KV<byte[], byte[]> record) {
+  private long estimateRecordBytes(KV<byte[], byte[]> record) {
     return RECORD_MEMORY_OVERHEAD_ESTIMATE + record.getKey().length + record.getValue().length;
   }
 
   /**
    * Check whether we have room to store the provided total number of bytes and total number of
    * records.
    */
-  private boolean roomInBuffer(int numBytes, int numRecords) {
+  private boolean roomInBuffer(long numBytes, long numRecords) {
     // Collections.sort may allocate up to n/2 extra object references.
     // Also, ArrayList grows by a factor of 1.5x, so there might be up to n/2 null object
     // references in the backing array.
@@ -151,11 +151,11 @@ private boolean roomInBuffer(int numBytes, int numRecords) {
    * Returns the number of bytes in a word according to the JVM. Defaults to 8 for 64 bit if answer
    * unknown.
    */
-  private static int getNumBytesPerWord() {
+  private static long getNumBytesPerWord() {
     String bitsPerWord = System.getProperty("sun.arch.data.model");
 
     try {
-      return Integer.parseInt(bitsPerWord) / 8;
+      return Long.parseLong(bitsPerWord) / 8;
     } catch (Exception e) {
       // Can't determine whether 32 or 64 bit, so assume 64
       return 8;
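
Note on the int-to-long change: in the old constructor every operand of `options.getMemoryMB() * 1024 * 1024` was an int, so Java evaluated the product in 32-bit arithmetic and it wrapped for values of 2048 MB and above. The sketch below contrasts the two forms. The class and method names are hypothetical and only illustrate the arithmetic; they are not taken from the module.

```java
/** Contrasts int and long arithmetic for the megabytes-to-bytes conversion (hypothetical names). */
class BufferSizeOverflow {
  /** Pre-fix style: all operands are ints, so the product wraps around at 2^31. */
  static int bytesAsInt(int memoryMB) {
    return memoryMB * 1024 * 1024;
  }

  /** Post-fix style: the long operand and 1024L literals make the whole product a long. */
  static long bytesAsLong(long memoryMB) {
    return memoryMB * 1024L * 1024L;
  }

  public static void main(String[] args) {
    System.out.println(bytesAsInt(2048));  // prints -2147483648 (wrapped)
    System.out.println(bytesAsLong(2048)); // prints 2147483648
  }
}
```

A negative or zero `maxBufferSize` would presumably make the buffer look full immediately, which is why the fields and helper methods that feed into it were widened to long as well.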

contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/BufferedExternalSorterTest.java

Lines changed: 16 additions & 0 deletions
@@ -173,4 +173,20 @@ public void testNegativeMemory() throws Exception {
     BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
     options.setMemoryMB(-1);
   }
+
+  @Test
+  public void testZeroMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
+    options.setMemoryMB(0);
+  }
+
+  @Test
+  public void testMemoryTooLarge() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be less than 2048");
+    BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
+    options.setMemoryMB(2048);
+  }
 }

contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/ExternalSorterTest.java

Lines changed: 16 additions & 0 deletions
@@ -82,4 +82,20 @@ public void testNegativeMemory() throws Exception {
     ExternalSorter.Options options = new ExternalSorter.Options();
     options.setMemoryMB(-1);
   }
+
+  @Test
+  public void testZeroMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    ExternalSorter.Options options = new ExternalSorter.Options();
+    options.setMemoryMB(0);
+  }
+
+  @Test
+  public void testMemoryTooLarge() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be less than 2048");
+    ExternalSorter.Options options = new ExternalSorter.Options();
+    options.setMemoryMB(2048);
+  }
 }

contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/InMemorySorterTest.java

Lines changed: 8 additions & 0 deletions
@@ -139,4 +139,12 @@ public void testNegativeMemory() throws Exception {
     InMemorySorter.Options options = new InMemorySorter.Options();
     options.setMemoryMB(-1);
   }
+
+  @Test
+  public void testZeroMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    InMemorySorter.Options options = new InMemorySorter.Options();
+    options.setMemoryMB(0);
+  }
 }
