Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#180 from lukecwik/backport-114
Browse files Browse the repository at this point in the history
Backport of apache/beam#114
  • Loading branch information
lukecwik committed Apr 4, 2016
2 parents f595b46 + b2b4a2f commit 5ed12d9
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe
byte zero = 0x00;
int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
if (header == GZIPInputStream.GZIP_MAGIC) {
return Channels.newChannel(new GzipCompressorInputStream(stream));
return Channels.newChannel(new GzipCompressorInputStream(stream, true));
}
}
return Channels.newChannel(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,19 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -97,6 +100,49 @@ public void testEmptyReadGzip() throws Exception {
runReadTest(input, CompressionMode.GZIP);
}

private static byte[] compressGzip(byte[] input) throws IOException {
ByteArrayOutputStream res = new ByteArrayOutputStream();
try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) {
gzipStream.write(input);
}
return res.toByteArray();
}

private static byte[] concat(byte[] first, byte[] second) {
byte[] res = new byte[first.length + second.length];
System.arraycopy(first, 0, res, 0, first.length);
System.arraycopy(second, 0, res, first.length, second.length);
return res;
}

/**
* Test a concatenation of gzip files is correctly decompressed.
*
* <p>A concatenation of gzip files as one file is a valid gzip file and should decompress
* to be the concatenation of those individual files.
*/
@Test
public void testReadConcatenatedGzip() throws IOException {
byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
byte[] expected = concat(header, body);
byte[] totalGz = concat(compressGzip(header), compressGzip(body));
File tmpFile = tmpFolder.newFile();
try (FileOutputStream os = new FileOutputStream(tmpFile)) {
os.write(totalGz);
}

Pipeline p = TestPipeline.create();

CompressedSource<Byte> source =
CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
.withDecompression(CompressionMode.GZIP);
PCollection<Byte> output = p.apply(Read.from(source));

DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected));
p.run();
}

/**
* Test reading empty input with bzip2.
*/
Expand Down

0 comments on commit 5ed12d9

Please sign in to comment.