Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void testSeekBigFile() throws Throwable {
describe("Seek round a large file and verify the bytes are what is expected");
Path testSeekFile = path("bigseekfile.txt");
byte[] block = dataset(100 * 1024, 0, 255);
createFile(getFileSystem(), testSeekFile, false, block);
createFile(getFileSystem(), testSeekFile, true, block);
instream = getFileSystem().open(testSeekFile);
assertEquals(0, instream.getPos());
//expect that seek to 0 works
Expand Down Expand Up @@ -309,7 +309,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
assumeSupportsPositionedReadable();
Path testSeekFile = path("bigseekfile.txt");
byte[] block = dataset(65536, 0, 255);
createFile(getFileSystem(), testSeekFile, false, block);
createFile(getFileSystem(), testSeekFile, true, block);
instream = getFileSystem().open(testSeekFile);
instream.seek(39999);
assertTrue(-1 != instream.read());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private void seekInStream(long targetPos, long length) throws IOException {
long forwardSeekLimit = Math.min(remainingInCurrentRequest,
forwardSeekRange);
boolean skipForward = remainingInCurrentRequest > 0
&& diff <= forwardSeekLimit;
&& diff < forwardSeekLimit;
if (skipForward) {
// the forward seek range is within the limits
LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
Expand All @@ -275,6 +275,8 @@ private void seekInStream(long targetPos, long length) throws IOException {

if (pos == targetPos) {
// all is well
LOG.debug("Now at {}: bytes remaining in current request: {}",
pos, remainingInCurrentRequest());
return;
} else {
// log a warning; continue to attempt to re-open
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,306 @@

package org.apache.hadoop.fs.contract.s3a;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ATestUtils;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;

/**
* S3A contract tests covering file seek.
*/
@RunWith(Parameterized.class)
public class ITestS3AContractSeek extends AbstractContractSeekTest {

private static final Logger LOG =
LoggerFactory.getLogger(ITestS3AContractSeek.class);

protected static final int READAHEAD = 1024;

private final String seekPolicy;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document allowed or intended values of seekPolicy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they are defined in the Constants.INPUT_FADV. vars


public static final int DATASET_LEN = READAHEAD * 2;

public static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);

/**
* This test suite is parameterized for the different seek policies
* which S3A Supports.
* @return a list of seek policies to test.
*/
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{INPUT_FADV_RANDOM},
{INPUT_FADV_NORMAL},
{INPUT_FADV_SEQUENTIAL},
});
}

/**
* Run the test with a chosen seek policy.
* @param seekPolicy fadvise policy to use.
*/
public ITestS3AContractSeek(final String seekPolicy) {
this.seekPolicy = seekPolicy;
}

/**
* Create a configuration, possibly patching in S3Guard options.
* The FS is set to be uncached and the readahead and seek policies
* of the bucket itself are removed, so as to guarantee that the
* parameterized and test settings are
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the parameterized and test settings are ... what?

Also nit: readahead not readhead

* @return a configuration
*/
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
// patch in S3Guard options
maybeEnableS3Guard(conf);
// purge any per-bucket overrides.
try {
URI bucketURI = new URI(checkNotNull(conf.get("fs.contract.test.fs.s3a")));
S3ATestUtils.removeBucketOverrides(bucketURI.getHost(), conf,
READAHEAD_RANGE,
INPUT_FADVISE);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
// the FS is uncached, so will need clearing in test teardowns.
S3ATestUtils.disableFilesystemCaching(conf);
conf.setInt(READAHEAD_RANGE, READAHEAD);
conf.set(INPUT_FADVISE, seekPolicy);
return conf;
}

@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}

@Override
public void teardown() throws Exception {
S3AFileSystem fs = getFileSystem();
if (fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE, false)) {
fs.close();
}
super.teardown();
}

/**
* This subclass of the {@code path(path)} operation adds the seek policy
* to the end to guarantee uniqueness across different calls of the same
* method.
*
* {@inheritDoc}
*/
@Override
protected Path path(final String filepath) throws IOException {
return super.path(filepath + "-" + seekPolicy);
}

/**
* Go to end, read then seek back to the previous position to force normal
* seek policy to switch to random IO.
* This will call readByte to trigger the second GET
* @param in input stream
* @return the byte read
* @throws IOException failure.
*/
private byte readAtEndAndReturn(final FSDataInputStream in)
throws IOException {
long pos = in.getPos();
in.seek(DATASET_LEN -1);
in.readByte();
// go back to start and force a new GET
in.seek(pos);
return in.readByte();
}

/**
* Assert that the data read matches the dataset at the given offset.
* This helps verify that the seek process is moving the read pointer
* to the correct location in the file.
* @param readOffset the offset in the file where the read began.
* @param operation operation name for the assertion.
* @param data data read in.
* @param length length of data to check.
*/
private void assertDatasetEquals(
final int readOffset, final String operation,
final byte[] data,
int length) {
for (int i = 0; i < length; i++) {
int o = readOffset + i;
assertEquals(operation + " with seek policy " + seekPolicy
+ "and read offset " + readOffset
+ ": data[" + i + "] != DATASET[" + o + "]",
DATASET[o], data[i]);
}
}

@Override
public S3AFileSystem getFileSystem() {
return (S3AFileSystem) super.getFileSystem();
}

@Test
public void testReadPolicyInFS() throws Throwable {
describe("Verify the read policy is being consistently set");
S3AFileSystem fs = getFileSystem();
assertEquals(S3AInputPolicy.getPolicy(seekPolicy), fs.getInputPolicy());
}

/**
* Test for HADOOP-16109: Parquet reading S3AFileSystem causes EOF.
* This sets up a read which will span the active readahead and,
* in random IO mode, a subsequent GET.
*/
@Test
public void testReadAcrossReadahead() throws Throwable {
describe("Sets up a read which will span the active readahead"
+ " and the rest of the file.");
Path path = path("testReadAcrossReadahead");
writeTestDataset(path);
FileSystem fs = getFileSystem();
// forward seek reading across readahead boundary
try (FSDataInputStream in = fs.open(path)) {
final byte[] temp = new byte[5];
in.readByte();
int offset = READAHEAD - 1;
in.readFully(offset, temp); // <-- works
assertDatasetEquals(offset, "read spanning boundary", temp, temp.length);
}
// Read exactly on the the boundary
try (FSDataInputStream in = fs.open(path)) {
final byte[] temp = new byte[5];
readAtEndAndReturn(in);
assertEquals("current position", 1, (int)(in.getPos()));
in.readFully(READAHEAD, temp);
assertDatasetEquals(READAHEAD, "read exactly on boundary",
temp, temp.length);
}
}

/**
* Read across the end of the read buffer using the readByte call,
* which will read a single byte only.
*/
@Test
public void testReadSingleByteAcrossReadahead() throws Throwable {
describe("Read over boundary using read()/readByte() calls.");
Path path = path("testReadSingleByteAcrossReadahead");
writeTestDataset(path);
FileSystem fs = getFileSystem();
try (FSDataInputStream in = fs.open(path)) {
final byte[] b0 = new byte[1];
readAtEndAndReturn(in);
in.seek(READAHEAD - 1);
b0[0] = in.readByte();
assertDatasetEquals(READAHEAD - 1, "read before end of boundary", b0,
b0.length);
b0[0] = in.readByte();
assertDatasetEquals(READAHEAD, "read at end of boundary", b0, b0.length);
b0[0] = in.readByte();
assertDatasetEquals(READAHEAD + 1, "read after end of boundary", b0,
b0.length);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this is definitely one of the major cases of concern.

}

@Test
public void testSeekToReadaheadAndRead() throws Throwable {
describe("Seek to just before readahead limit and call"
+ " InputStream.read(byte[])");
Path path = path("testSeekToReadaheadAndRead");
FileSystem fs = getFileSystem();
writeTestDataset(path);
try (FSDataInputStream in = fs.open(path)) {
readAtEndAndReturn(in);
final byte[] temp = new byte[5];
int offset = READAHEAD - 1;
in.seek(offset);
// expect to read at least one byte.
int l = in.read(temp);
assertTrue("Reading in temp data", l > 0);
LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
assertDatasetEquals(offset, "read at end of boundary", temp, l);
}
}

@Test
public void testSeekToReadaheadExactlyAndRead() throws Throwable {
describe("Seek to exactly the readahead limit and call"
+ " InputStream.read(byte[])");
Path path = path("testSeekToReadaheadExactlyAndRead");
FileSystem fs = getFileSystem();
writeTestDataset(path);
try (FSDataInputStream in = fs.open(path)) {
readAtEndAndReturn(in);
final byte[] temp = new byte[5];
int offset = READAHEAD;
in.seek(offset);
// expect to read at least one byte.
int l = in.read(temp);
LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
assertTrue("Reading in temp data", l > 0);
assertDatasetEquals(offset, "read at end of boundary", temp, l);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, important test case. Could we also add a case that does:

    try (FSDataInputStream in = fs.open(path)) {
      readAtEndAndReturn(in);
      final byte[] temp = new byte[1];
      int offset = READAHEAD;
      in.seek(offset);
      // expect to read a byte successfully.
      temp[0] = in.readByte();
      assertDatasetEquals(READAHEAD, "read at end of boundary", temp, 1);
      LOG.info("Read of byte at offset {} returned expected value", offset);
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well ok, though as readByte is read + an assert that the value >= 0, you don't really gain much.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I guess I was thinking of readFully(), which in S3AInputStream.java can cause a new seek and thereby change the cache boundaries. You're right it's not relevant here.

}

@Test
public void testSeekToReadaheadExactlyAndReadByte() throws Throwable {
describe("Seek to exactly the readahead limit and call"
+ " readByte()");
Path path = path("testSeekToReadaheadExactlyAndReadByte");
FileSystem fs = getFileSystem();
writeTestDataset(path);
try (FSDataInputStream in = fs.open(path)) {
readAtEndAndReturn(in);
final byte[] temp = new byte[1];
int offset = READAHEAD;
in.seek(offset);
// expect to read a byte successfully.
temp[0] = in.readByte();
assertDatasetEquals(READAHEAD, "read at end of boundary", temp, 1);
LOG.info("Read of byte at offset {} returned expected value", offset);
}
}

/**
* Write the standard {@link #DATASET} dataset to the given path.
* @param path path to write to.
* @throws IOException failure
*/
private void writeTestDataset(final Path path) throws IOException {
ContractTestUtils.writeDataset(getFileSystem(), path,
DATASET, DATASET_LEN, READAHEAD, true);
}

}