-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-16109. Parquet reading S3AFileSystem causes EOF #539
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,31 +18,306 @@ | |
|
|
||
| package org.apache.hadoop.fs.contract.s3a; | ||
|
|
||
| import java.io.IOException; | ||
| import java.net.URI; | ||
| import java.net.URISyntaxException; | ||
| import java.util.Arrays; | ||
| import java.util.Collection; | ||
|
|
||
| import org.junit.Test; | ||
| import org.junit.runner.RunWith; | ||
| import org.junit.runners.Parameterized; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FSDataInputStream; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.fs.contract.AbstractContractSeekTest; | ||
| import org.apache.hadoop.fs.contract.AbstractFSContract; | ||
| import org.apache.hadoop.fs.contract.ContractTestUtils; | ||
| import org.apache.hadoop.fs.s3a.S3AFileSystem; | ||
| import org.apache.hadoop.fs.s3a.S3AInputPolicy; | ||
| import org.apache.hadoop.fs.s3a.S3ATestUtils; | ||
|
|
||
| import static com.google.common.base.Preconditions.checkNotNull; | ||
| import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; | ||
| import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL; | ||
| import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM; | ||
| import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL; | ||
| import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; | ||
| import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE; | ||
| import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard; | ||
|
|
||
| /** | ||
| * S3A contract tests covering file seek. | ||
| */ | ||
| @RunWith(Parameterized.class) | ||
| public class ITestS3AContractSeek extends AbstractContractSeekTest { | ||
|
|
||
| private static final Logger LOG = | ||
| LoggerFactory.getLogger(ITestS3AContractSeek.class); | ||
|
|
||
| protected static final int READAHEAD = 1024; | ||
|
|
||
| private final String seekPolicy; | ||
|
|
||
| public static final int DATASET_LEN = READAHEAD * 2; | ||
|
|
||
| public static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); | ||
|
|
||
| /** | ||
| * This test suite is parameterized for the different seek policies | ||
| * which S3A Supports. | ||
| * @return a list of seek policies to test. | ||
| */ | ||
| @Parameterized.Parameters | ||
| public static Collection<Object[]> params() { | ||
| return Arrays.asList(new Object[][]{ | ||
| {INPUT_FADV_RANDOM}, | ||
| {INPUT_FADV_NORMAL}, | ||
| {INPUT_FADV_SEQUENTIAL}, | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Run the test with a chosen seek policy. | ||
| * @param seekPolicy fadvise policy to use. | ||
| */ | ||
| public ITestS3AContractSeek(final String seekPolicy) { | ||
| this.seekPolicy = seekPolicy; | ||
| } | ||
|
|
||
| /** | ||
| * Create a configuration, possibly patching in S3Guard options. | ||
| * The FS is set to be uncached and the readahead and seek policies | ||
| * of the bucket itself are removed, so as to guarantee that the | ||
| * parameterized and test settings are | ||
|
||
| * @return a configuration | ||
| */ | ||
| @Override | ||
| protected Configuration createConfiguration() { | ||
| Configuration conf = super.createConfiguration(); | ||
| // patch in S3Guard options | ||
| maybeEnableS3Guard(conf); | ||
| // purge any per-bucket overrides. | ||
| try { | ||
| URI bucketURI = new URI(checkNotNull(conf.get("fs.contract.test.fs.s3a"))); | ||
| S3ATestUtils.removeBucketOverrides(bucketURI.getHost(), conf, | ||
| READAHEAD_RANGE, | ||
| INPUT_FADVISE); | ||
| } catch (URISyntaxException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| // the FS is uncached, so will need clearing in test teardowns. | ||
| S3ATestUtils.disableFilesystemCaching(conf); | ||
| conf.setInt(READAHEAD_RANGE, READAHEAD); | ||
| conf.set(INPUT_FADVISE, seekPolicy); | ||
| return conf; | ||
| } | ||
|
|
||
| @Override | ||
| protected AbstractFSContract createContract(Configuration conf) { | ||
| return new S3AContract(conf); | ||
| } | ||
|
|
||
| @Override | ||
| public void teardown() throws Exception { | ||
| S3AFileSystem fs = getFileSystem(); | ||
| if (fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE, false)) { | ||
| fs.close(); | ||
| } | ||
| super.teardown(); | ||
| } | ||
|
|
||
| /** | ||
| * This subclass of the {@code path(path)} operation adds the seek policy | ||
| * to the end to guarantee uniqueness across different calls of the same | ||
| * method. | ||
| * | ||
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| protected Path path(final String filepath) throws IOException { | ||
| return super.path(filepath + "-" + seekPolicy); | ||
| } | ||
|
|
||
| /** | ||
| * Go to end, read then seek back to the previous position to force normal | ||
| * seek policy to switch to random IO. | ||
| * This will call readByte to trigger the second GET | ||
| * @param in input stream | ||
| * @return the byte read | ||
| * @throws IOException failure. | ||
| */ | ||
| private byte readAtEndAndReturn(final FSDataInputStream in) | ||
| throws IOException { | ||
| long pos = in.getPos(); | ||
| in.seek(DATASET_LEN -1); | ||
| in.readByte(); | ||
| // go back to start and force a new GET | ||
| in.seek(pos); | ||
| return in.readByte(); | ||
| } | ||
|
|
||
| /** | ||
| * Assert that the data read matches the dataset at the given offset. | ||
| * This helps verify that the seek process is moving the read pointer | ||
| * to the correct location in the file. | ||
| * @param readOffset the offset in the file where the read began. | ||
| * @param operation operation name for the assertion. | ||
| * @param data data read in. | ||
| * @param length length of data to check. | ||
| */ | ||
| private void assertDatasetEquals( | ||
| final int readOffset, final String operation, | ||
| final byte[] data, | ||
| int length) { | ||
| for (int i = 0; i < length; i++) { | ||
| int o = readOffset + i; | ||
| assertEquals(operation + " with seek policy " + seekPolicy | ||
| + "and read offset " + readOffset | ||
| + ": data[" + i + "] != DATASET[" + o + "]", | ||
| DATASET[o], data[i]); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public S3AFileSystem getFileSystem() { | ||
| return (S3AFileSystem) super.getFileSystem(); | ||
| } | ||
|
|
||
| @Test | ||
| public void testReadPolicyInFS() throws Throwable { | ||
| describe("Verify the read policy is being consistently set"); | ||
| S3AFileSystem fs = getFileSystem(); | ||
| assertEquals(S3AInputPolicy.getPolicy(seekPolicy), fs.getInputPolicy()); | ||
| } | ||
|
|
||
| /** | ||
| * Test for HADOOP-16109: Parquet reading S3AFileSystem causes EOF. | ||
| * This sets up a read which will span the active readahead and, | ||
| * in random IO mode, a subsequent GET. | ||
| */ | ||
| @Test | ||
| public void testReadAcrossReadahead() throws Throwable { | ||
| describe("Sets up a read which will span the active readahead" | ||
| + " and the rest of the file."); | ||
| Path path = path("testReadAcrossReadahead"); | ||
| writeTestDataset(path); | ||
| FileSystem fs = getFileSystem(); | ||
| // forward seek reading across readahead boundary | ||
| try (FSDataInputStream in = fs.open(path)) { | ||
| final byte[] temp = new byte[5]; | ||
| in.readByte(); | ||
| int offset = READAHEAD - 1; | ||
| in.readFully(offset, temp); // <-- works | ||
| assertDatasetEquals(offset, "read spanning boundary", temp, temp.length); | ||
| } | ||
| // Read exactly on the the boundary | ||
| try (FSDataInputStream in = fs.open(path)) { | ||
| final byte[] temp = new byte[5]; | ||
| readAtEndAndReturn(in); | ||
| assertEquals("current position", 1, (int)(in.getPos())); | ||
| in.readFully(READAHEAD, temp); | ||
| assertDatasetEquals(READAHEAD, "read exactly on boundary", | ||
| temp, temp.length); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Read across the end of the read buffer using the readByte call, | ||
| * which will read a single byte only. | ||
| */ | ||
| @Test | ||
| public void testReadSingleByteAcrossReadahead() throws Throwable { | ||
| describe("Read over boundary using read()/readByte() calls."); | ||
| Path path = path("testReadSingleByteAcrossReadahead"); | ||
| writeTestDataset(path); | ||
| FileSystem fs = getFileSystem(); | ||
| try (FSDataInputStream in = fs.open(path)) { | ||
| final byte[] b0 = new byte[1]; | ||
| readAtEndAndReturn(in); | ||
| in.seek(READAHEAD - 1); | ||
| b0[0] = in.readByte(); | ||
| assertDatasetEquals(READAHEAD - 1, "read before end of boundary", b0, | ||
| b0.length); | ||
| b0[0] = in.readByte(); | ||
| assertDatasetEquals(READAHEAD, "read at end of boundary", b0, b0.length); | ||
| b0[0] = in.readByte(); | ||
| assertDatasetEquals(READAHEAD + 1, "read after end of boundary", b0, | ||
| b0.length); | ||
| } | ||
|
||
| } | ||
|
|
||
| @Test | ||
| public void testSeekToReadaheadAndRead() throws Throwable { | ||
| describe("Seek to just before readahead limit and call" | ||
| + " InputStream.read(byte[])"); | ||
| Path path = path("testSeekToReadaheadAndRead"); | ||
| FileSystem fs = getFileSystem(); | ||
| writeTestDataset(path); | ||
| try (FSDataInputStream in = fs.open(path)) { | ||
| readAtEndAndReturn(in); | ||
| final byte[] temp = new byte[5]; | ||
| int offset = READAHEAD - 1; | ||
| in.seek(offset); | ||
| // expect to read at least one byte. | ||
| int l = in.read(temp); | ||
| assertTrue("Reading in temp data", l > 0); | ||
| LOG.info("Read of byte array at offset {} returned {} bytes", offset, l); | ||
| assertDatasetEquals(offset, "read at end of boundary", temp, l); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testSeekToReadaheadExactlyAndRead() throws Throwable { | ||
| describe("Seek to exactly the readahead limit and call" | ||
| + " InputStream.read(byte[])"); | ||
| Path path = path("testSeekToReadaheadExactlyAndRead"); | ||
| FileSystem fs = getFileSystem(); | ||
| writeTestDataset(path); | ||
| try (FSDataInputStream in = fs.open(path)) { | ||
| readAtEndAndReturn(in); | ||
| final byte[] temp = new byte[5]; | ||
| int offset = READAHEAD; | ||
| in.seek(offset); | ||
| // expect to read at least one byte. | ||
| int l = in.read(temp); | ||
| LOG.info("Read of byte array at offset {} returned {} bytes", offset, l); | ||
| assertTrue("Reading in temp data", l > 0); | ||
| assertDatasetEquals(offset, "read at end of boundary", temp, l); | ||
| } | ||
|
||
| } | ||
|
|
||
| @Test | ||
| public void testSeekToReadaheadExactlyAndReadByte() throws Throwable { | ||
| describe("Seek to exactly the readahead limit and call" | ||
| + " readByte()"); | ||
| Path path = path("testSeekToReadaheadExactlyAndReadByte"); | ||
| FileSystem fs = getFileSystem(); | ||
| writeTestDataset(path); | ||
| try (FSDataInputStream in = fs.open(path)) { | ||
| readAtEndAndReturn(in); | ||
| final byte[] temp = new byte[1]; | ||
| int offset = READAHEAD; | ||
| in.seek(offset); | ||
| // expect to read a byte successfully. | ||
| temp[0] = in.readByte(); | ||
| assertDatasetEquals(READAHEAD, "read at end of boundary", temp, 1); | ||
| LOG.info("Read of byte at offset {} returned expected value", offset); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Write the standard {@link #DATASET} dataset to the given path. | ||
| * @param path path to write to. | ||
| * @throws IOException failure | ||
| */ | ||
| private void writeTestDataset(final Path path) throws IOException { | ||
| ContractTestUtils.writeDataset(getFileSystem(), path, | ||
| DATASET, DATASET_LEN, READAHEAD, true); | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document allowed or intended values of seekPolicy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
they are defined in the Constants.INPUT_FADV. vars