Skip to content

HADOOP-17527. ABFS: Fix boundary conditions in InputStream seek and skip #2698

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ public synchronized void seek(long n) throws IOException {
if (n < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
if (n > contentLength) {
if (n > 0 && n >= contentLength) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about when n == 0? Also, isn't `n > 0` unnecessary, since it should be implied whenever the `if` condition at line 542 is false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seek(n=0) is allowed; n>0 has to be specified so as to avoid throwing exception on seek(0) in a 0-byte file

}

Expand Down Expand Up @@ -583,8 +583,8 @@ public synchronized long skip(long n) throws IOException {
newPos = 0;
n = newPos - currentPos;
}
if (newPos > contentLength) {
newPos = contentLength;
if (newPos > 0 && newPos >= contentLength) {
newPos = contentLength - 1;
n = newPos - currentPos;
}
seek(newPos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -33,6 +34,8 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;

public class ITestAbfsInputStreamStatistics
extends AbstractAbfsIntegrationTest {
private static final int OPERATIONS = 10;
Expand Down Expand Up @@ -89,11 +92,13 @@ public void testInitValues() throws IOException {
* Test to check statistics from seek operation in AbfsInputStream.
*/
@Test
public void testSeekStatistics() throws IOException {
public void testSeekStatistics() throws Exception {
describe("Testing the values of statistics from seek operations in "
+ "AbfsInputStream");

AzureBlobFileSystem fs = getFileSystem();
Configuration config = getRawConfiguration();
config.set(AZURE_READ_BUFFER_SIZE, String.valueOf(ONE_MB - 1));
AzureBlobFileSystem fs = getFileSystem(config);
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
Path seekStatPath = path(getMethodName());

Expand All @@ -112,7 +117,7 @@ public void testSeekStatistics() throws IOException {
* Writing 1MB buffer to the file; this would move the fCursor (current
* position of the cursor) to the end of the file.
*/
int result = in.read(defBuffer, 0, ONE_MB);
int result = in.read(defBuffer, 0, ONE_MB - 1);
LOG.info("Result of read : {}", result);

/*
Expand All @@ -121,7 +126,7 @@ public void testSeekStatistics() throws IOException {
*/
for (int i = 0; i < OPERATIONS; i++) {
in.seek(0);
in.seek(ONE_MB);
in.seek(ONE_MB - 1);
}

AbfsInputStreamStatisticsImpl stats =
Expand Down Expand Up @@ -159,7 +164,7 @@ public void testSeekStatistics() throws IOException {
assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
stats.getForwardSeekOperations());
assertEquals("Mismatch in bytesBackwardsOnSeek value",
OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
OPERATIONS * (ONE_MB - 1), stats.getBytesBackwardsOnSeek());
assertEquals("Mismatch in bytesSkippedOnSeek value",
0, stats.getBytesSkippedOnSeek());
assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
Expand Down Expand Up @@ -279,7 +284,7 @@ public void testWithNullStreamStatistics() throws IOException {
// Verifying that AbfsInputStream Operations works with null statistics.
assertNotEquals("AbfsInputStream read() with null statistics should "
+ "work", -1, in.read());
in.seek(ONE_KB);
in.seek(ONE_KB - 1);

// Verifying toString() with no StreamStatistics.
LOG.info("AbfsInputStream: {}", in.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.azurebfs;

import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

Expand All @@ -33,6 +35,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
* Test read, write and seek.
Expand Down Expand Up @@ -70,21 +73,32 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
abfsConfiguration.setReadBufferSize(bufferSize);


final byte[] b = new byte[2 * bufferSize];
int contentLength = 2 * bufferSize;
final byte[] b = new byte[contentLength];
new Random().nextBytes(b);

try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
stream.write(b);
}

final byte[] readBuffer = new byte[2 * bufferSize];
final byte[] readBuffer = new byte[contentLength];
int result;
try (FSDataInputStream inputStream = fs.open(TEST_PATH)) {
//seek to file mid and read until (excluding) the last byte
inputStream.seek(bufferSize);
result = inputStream.read(readBuffer, bufferSize, bufferSize);
result = inputStream.read(readBuffer, bufferSize, bufferSize - 1);
assertNotEquals(-1, result);
//seek to first byte and read till file mid
inputStream.seek(0);
result = inputStream.read(readBuffer, 0, bufferSize);
//test seek beyond EOF handling
intercept(EOFException.class, () -> inputStream.seek(contentLength));
//seek to last valid position and read
inputStream.seek(contentLength - 1);
result = inputStream.read(readBuffer, contentLength - 1, 1);
assertNotEquals("Read should succeed for last byte", -1, result);
//negative seek
intercept(IOException.class, () -> inputStream.seek(-1));
}
assertNotEquals("data read in final read()", -1, result);
assertArrayEquals(readBuffer, b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,23 +203,36 @@ public void testSkipBounds() throws Exception {

assertTrue(testFileLength > 0);

skipped = inputStream.skip(testFileLength);
assertEquals(testFileLength, skipped);
//test skip to EOF with correct input skip count
assertEquals("Position should be 0", 0, inputStream.getPos());
inputStream.skip(testFileLength - 1);
assertEquals("Position should be EOF", testFileLength - 1,
inputStream.getPos());

intercept(EOFException.class,
new Callable<Long>() {
@Override
public Long call() throws Exception {
return inputStream.skip(1);
}
}
);
long elapsedTimeMs = timer.elapsedTimeMs();
assertTrue(
String.format(
"There should not be any network I/O (elapsedTimeMs=%1$d).",
elapsedTimeMs),
elapsedTimeMs < MAX_ELAPSEDTIMEMS);
String.format(
"There should not be any network I/O (elapsedTimeMs=%1$d).",
elapsedTimeMs),
elapsedTimeMs < MAX_ELAPSEDTIMEMS);

//test negative skip from last valid position
skipped = inputStream.skip(-testFileLength + 1);
assertEquals("Incorrect skip count", -testFileLength + 1, skipped);
assertEquals("Position should be 0", 0, inputStream.getPos());

//test large positive skip from position 0 beyond EOF
skipped = inputStream.skip(testFileLength);
assertEquals("Incorrect skip count", testFileLength - 1, skipped);
assertEquals("One byte should be available after skip to EOF", 1,
inputStream.available());

//test positive skip from content length position (EOF + 1)
inputStream.read(); //read 1 byte from EOF
assertEquals("Position should be testFileLength", testFileLength,
inputStream.getPos());
intercept(EOFException.class, FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
() -> inputStream.skip(1));
}
}

Expand Down Expand Up @@ -251,15 +264,15 @@ public FSDataInputStream call() throws Exception {
);

assertTrue("Test file length only " + testFileLength, testFileLength > 0);
inputStream.seek(testFileLength);
assertEquals(testFileLength, inputStream.getPos());
inputStream.seek(testFileLength - 1);
assertEquals(testFileLength - 1, inputStream.getPos());

intercept(EOFException.class,
FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
new Callable<FSDataInputStream>() {
@Override
public FSDataInputStream call() throws Exception {
inputStream.seek(testFileLength + 1);
inputStream.seek(testFileLength);
return inputStream;
}
}
Expand Down Expand Up @@ -405,6 +418,29 @@ public void testSkipAndAvailableAndPosition() throws Exception {
}
}

/**
 * Checks the boundary behaviour of seek, skip, getPos and available on a
 * zero-byte file: seek(0) and skip(0) must succeed, while any attempt to move
 * to a non-zero or negative position must be rejected or clamped to 0.
 *
 * @throws Exception if file creation, opening, or any stream call fails
 *                   unexpectedly
 */
@Test
public void testZeroByteFile() throws Exception {
  Path emptyFile = new Path("/emptyFile");
  // Close the output stream immediately: nothing is written, and leaving it
  // open would leak the handle for the rest of the test run.
  getFileSystem().create(emptyFile).close();

  // try-with-resources guarantees the input stream is released even when an
  // assertion below fails.
  try (FSDataInputStream in = getFileSystem().open(emptyFile)) {
    assertEquals("Initial position of inputstream in empty file is 0", 0,
        in.getPos());
    in.seek(0);
    assertEquals("Seek to 0 should succeed", 0, in.getPos());
    long skipped = in.skip(0);
    assertEquals("Number of skipped bytes should be 0", 0, skipped);
    assertEquals("Position should be 0 post skip 0", 0, in.getPos());
    assertEquals("Available bytes in empty file is 0", 0, in.available());

    // Any position other than 0 is out of range for an empty file.
    intercept(EOFException.class, () -> in.seek(1));
    intercept(EOFException.class, () -> in.seek(-1));
    // skip(1) at position 0 is expected to fail with EOFException: the
    // current position already equals the content length (0), so there is
    // no valid position to move forward to.
    intercept(EOFException.class, () -> in.skip(1));
    // A backward skip from position 0 is expected to move nothing and leave
    // the stream at position 0.
    skipped = in.skip(-1);
    assertEquals("Number of skipped bytes should be 0", 0, skipped);
    assertEquals("Should seek to 0", 0, in.getPos());
  }
}

/**
* Ensures parity in the performance of sequential read after reverse seek for
* abfs of the AbfsInputStream.
Expand Down