Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Backport apache/incubator-beam#1327 #508

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/sorter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ This module provides the SortValues transform, which takes a `PCollection<KV<K,

##Caveats
* This transform performs value-only sorting; the iterable accompanying each key is sorted, but *there is no relationship between different keys*, as Dataflow does not support any defined relationship between different elements in a PCollection.
* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`.
* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`. A (rough) estimate of the number of bytes of disk space utilized if sorting spills to disk is `numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3`.

##Options
* The user can customize the temporary location used if sorting requires spilling to disk and the maximum amount of memory to use by creating a custom instance of `BufferedExternalSorter.Options` to pass into `SortValues.create`.
Expand Down
8 changes: 2 additions & 6 deletions contrib/sorter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@
<shadeTestJar>true</shadeTestJar>
<artifactSet>
<includes>
<include>org.apache.hadoop:hadoop-mapreduce-client-core</include>
<include>org.apache.hadoop:hadoop-common</include>
<include>com.google.guava:guava</include>
</includes>
</artifactSet>
Expand All @@ -166,10 +164,6 @@
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.hadoop</pattern>
<shadedPattern>com.google.cloud.dataflow.repackaged.org.apache.hadoop</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>com.google.cloud.dataflow.repackaged.com.google.common</shadedPattern>
Expand Down Expand Up @@ -198,12 +192,14 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ public String getTempLocation() {

/**
* Sets the size of the memory buffer in megabytes. This controls both the buffer for initial in
* memory sorting and the buffer used when external sorting. Must be greater than zero.
* memory sorting and the buffer used when external sorting. Must be greater than zero and less
* than 2048.
*/
public void setMemoryMB(int memoryMB) {
checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
// Hadoop's external sort stores the number of available memory bytes in an int, this prevents
// overflow
checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
this.memoryMB = memoryMB;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,15 @@ public String getTempLocation() {
return tempLocation;
}

/** Sets the size of the memory buffer in megabytes. */
/**
* Sets the size of the memory buffer in megabytes. Must be greater than zero and less than
* 2048.
*/
public void setMemoryMB(int memoryMB) {
checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
// Hadoop's external sort stores the number of available memory bytes in an int, this prevents
// integer overflow
checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
this.memoryMB = memoryMB;
}

Expand Down Expand Up @@ -134,6 +140,9 @@ private void initHadoopSorter() throws IOException {
paths = new Path[] {new Path(tempDir, "test.seq")};

JobConf conf = new JobConf();
// Sets directory for intermediate files created during merge of merge sort
conf.set("io.seqfile.local.dir", tempDir.toUri().getPath());

writer =
SequenceFile.createWriter(
conf,
Expand All @@ -143,6 +152,10 @@ private void initHadoopSorter() throws IOException {
Writer.compression(CompressionType.NONE));

FileSystem fs = FileSystem.getLocal(conf);
// Directory has to exist for Hadoop to recognize it as deletable on exit
fs.mkdirs(tempDir);
fs.deleteOnExit(tempDir);

sorter =
new SequenceFile.Sorter(
fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@
class InMemorySorter implements Sorter {
/** {@code Options} contains configuration of the sorter. */
public static class Options implements Serializable {
private int memoryMB = 100;
private long memoryMB = 100;

/** Sets the size of the memory buffer in megabytes. */
public void setMemoryMB(int memoryMB) {
public void setMemoryMB(long memoryMB) {
checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
this.memoryMB = memoryMB;
}

/** Returns the configured size of the memory buffer. */
public int getMemoryMB() {
public long getMemoryMB() {
return memoryMB;
}
}
Expand All @@ -51,7 +51,7 @@ public int getMemoryMB() {
private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();

/** How many bytes per word in the running JVM. Assumes 64 bit/8 bytes if unknown. */
private static final int NUM_BYTES_PER_WORD = getNumBytesPerWord();
private static final long NUM_BYTES_PER_WORD = getNumBytesPerWord();

/**
* Estimate of memory overhead per KV record in bytes not including memory associated with keys
Expand All @@ -64,13 +64,13 @@ public int getMemoryMB() {
* <li> Per-object overhead (JVM-specific, guessing 2 words * 3 objects)
* </ul>
*/
private static final int RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD;
private static final long RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD;

/** Maximum size of the buffer in bytes. */
private int maxBufferSize;
private long maxBufferSize;

/** Current number of stored bytes. Including estimated overhead bytes. */
private int numBytes;
private long numBytes;

/** Whether sort has been called. */
private boolean sortCalled;
Expand All @@ -80,7 +80,7 @@ public int getMemoryMB() {

/** Private constructor. */
private InMemorySorter(Options options) {
maxBufferSize = options.getMemoryMB() * 1024 * 1024;
maxBufferSize = options.getMemoryMB() * 1024L * 1024L;
}

/** Create a new sorter from provided options. */
Expand All @@ -97,7 +97,7 @@ public void add(KV<byte[], byte[]> record) {
public boolean addIfRoom(KV<byte[], byte[]> record) {
checkState(!sortCalled, "Records can only be added before sort()");

int recordBytes = estimateRecordBytes(record);
long recordBytes = estimateRecordBytes(record);
if (roomInBuffer(numBytes + recordBytes, records.size() + 1)) {
records.add(record);
numBytes += recordBytes;
Expand Down Expand Up @@ -129,15 +129,15 @@ public int compare(KV<byte[], byte[]> o1, KV<byte[], byte[]> o2) {
* Estimate the number of additional bytes required to store this record. Including the key, the
* value and any overhead for objects and references.
*/
private int estimateRecordBytes(KV<byte[], byte[]> record) {
private long estimateRecordBytes(KV<byte[], byte[]> record) {
return RECORD_MEMORY_OVERHEAD_ESTIMATE + record.getKey().length + record.getValue().length;
}

/**
* Check whether we have room to store the provided total number of bytes and total number of
* records.
*/
private boolean roomInBuffer(int numBytes, int numRecords) {
private boolean roomInBuffer(long numBytes, long numRecords) {
// Collections.sort may allocate up to n/2 extra object references.
// Also, ArrayList grows by a factor of 1.5x, so there might be up to n/2 null object
// references in the backing array.
Expand All @@ -151,11 +151,11 @@ private boolean roomInBuffer(int numBytes, int numRecords) {
* Returns the number of bytes in a word according to the JVM. Defaults to 8 for 64 bit if answer
* unknown.
*/
private static int getNumBytesPerWord() {
private static long getNumBytesPerWord() {
String bitsPerWord = System.getProperty("sun.arch.data.model");

try {
return Integer.parseInt(bitsPerWord) / 8;
return Long.parseLong(bitsPerWord) / 8;
} catch (Exception e) {
// Can't determine whether 32 or 64 bit, so assume 64
return 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,20 @@ public void testNegativeMemory() throws Exception {
BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
options.setMemoryMB(-1);
}

@Test
public void testZeroMemory() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be greater than zero");
BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
options.setMemoryMB(0);
}

@Test
public void testMemoryTooLarge() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be less than 2048");
BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
options.setMemoryMB(2048);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,20 @@ public void testNegativeMemory() throws Exception {
ExternalSorter.Options options = new ExternalSorter.Options();
options.setMemoryMB(-1);
}

@Test
public void testZeroMemory() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be greater than zero");
ExternalSorter.Options options = new ExternalSorter.Options();
options.setMemoryMB(0);
}

@Test
public void testMemoryTooLarge() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be less than 2048");
ExternalSorter.Options options = new ExternalSorter.Options();
options.setMemoryMB(2048);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,12 @@ public void testNegativeMemory() throws Exception {
InMemorySorter.Options options = new InMemorySorter.Options();
options.setMemoryMB(-1);
}

@Test
public void testZeroMemory() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be greater than zero");
InMemorySorter.Options options = new InMemorySorter.Options();
options.setMemoryMB(0);
}
}