Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.hadoop.fs.common;
package org.apache.hadoop.fs.impl.prefetch;

import java.io.Closeable;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,232 +17,234 @@
* under the License.
*/

package org.apache.hadoop.fs.common;
package org.apache.hadoop.fs.impl.prefetch;

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkPositiveInteger;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkWithinRange;

/**
* Holds information about blocks of data in a file.
*/
public class BlockData {
public final class BlockData {

// State of each block of data.
enum State {
// Data is not yet ready to be read from this block (still being prefetched).

/** Data is not yet ready to be read from this block (still being prefetched). */
NOT_READY,

// A read of this block has been enqueued in the prefetch queue.
/** A read of this block has been enqueued in the prefetch queue. */
QUEUED,

// This block is ready to be read. That is, it has been fully read.
/** A read of this block has been enqueued in the prefetch queue. */
READY,

// This block has been cached in the local disk cache.
/** This block has been cached in the local disk cache. */
CACHED
}

// State of all blocks in a file.
/**
* State of all blocks in a file.
*/
private State[] state;

// The size of a file.
/**
* The size of a file.
*/
private final long fileSize;

// The file is divided into blocks of this size.
/**
* The file is divided into blocks of this size.
*/
private final int blockSize;

// The file has these many blocks.
/**
* The file has these many blocks.
*/
private final int numBlocks;

/**
* Constructs an instance of {@link BlockData}.
*
* @param fileSize the size of a file.
* @param blockSize the file is divided into blocks of this size.
*
* @throws IllegalArgumentException if fileSize is negative.
* @throws IllegalArgumentException if blockSize is negative.
* @throws IllegalArgumentException if blockSize is zero or negative.
*/
public BlockData(long fileSize, int blockSize) {
Validate.checkNotNegative(fileSize, "fileSize");
checkNotNegative(fileSize, "fileSize");
if (fileSize == 0) {
Validate.checkNotNegative(blockSize, "blockSize");
checkNotNegative(blockSize, "blockSize");
} else {
Validate.checkPositiveInteger(blockSize, "blockSize");
checkPositiveInteger(blockSize, "blockSize");
}

this.fileSize = fileSize;
this.blockSize = blockSize;
this.numBlocks =
(fileSize == 0) ? 0 : ((int) (fileSize / blockSize)) + (fileSize % blockSize > 0 ? 1 : 0);
(fileSize == 0)
? 0
: ((int) (fileSize / blockSize)) + (fileSize % blockSize > 0
? 1
: 0);
this.state = new State[this.numBlocks];
for (int b = 0; b < this.numBlocks; b++) {
this.setState(b, State.NOT_READY);
setState(b, State.NOT_READY);
}
}

/**
* Gets the size of each block.
*
* @return the size of each block.
*/
public int getBlockSize() {
return this.blockSize;
return blockSize;
}

/**
* Gets the size of the associated file.
*
* @return the size of the associated file.
*/
public long getFileSize() {
return this.fileSize;
return fileSize;
}

/**
* Gets the number of blocks in the associated file.
*
* @return the number of blocks in the associated file.
*/
public int getNumBlocks() {
return this.numBlocks;
return numBlocks;
}

/**
* Indicates whether the given block is the last block in the associated file.
*
* @param blockNumber the id of the desired block.
* @return true if the given block is the last block in the associated file, false otherwise.
*
* @throws IllegalArgumentException if blockNumber is invalid.
*/
public boolean isLastBlock(int blockNumber) {
if (this.fileSize == 0) {
if (fileSize == 0) {
return false;
}

throwIfInvalidBlockNumber(blockNumber);

return blockNumber == (this.numBlocks - 1);
return blockNumber == (numBlocks - 1);
}

/**
* Gets the id of the block that contains the given absolute offset.
*
* @param offset the absolute offset to check.
* @return the id of the block that contains the given absolute offset.
*
* @throws IllegalArgumentException if offset is invalid.
*/
public int getBlockNumber(long offset) {
throwIfInvalidOffset(offset);

return (int) (offset / this.blockSize);
return (int) (offset / blockSize);
}

/**
* Gets the size of the given block.
*
* @param blockNumber the id of the desired block.
* @return the size of the given block.
*/
public int getSize(int blockNumber) {
if (this.fileSize == 0) {
if (fileSize == 0) {
return 0;
}

if (this.isLastBlock(blockNumber)) {
return (int) (this.fileSize - (((long) this.blockSize) * (this.numBlocks - 1)));
if (isLastBlock(blockNumber)) {
return (int) (fileSize - (((long) blockSize) * (numBlocks - 1)));
} else {
return this.blockSize;
return blockSize;
}
}

/**
* Indicates whether the given absolute offset is valid.
*
* @param offset absolute offset in the file..
* @return true if the given absolute offset is valid, false otherwise.
*/
public boolean isValidOffset(long offset) {
return (offset >= 0) && (offset < this.fileSize);
return (offset >= 0) && (offset < fileSize);
}

/**
* Gets the start offset of the given block.

* @param blockNumber the id of the given block.
* @return the start offset of the given block.
*
* @throws IllegalArgumentException if blockNumber is invalid.
*/
public long getStartOffset(int blockNumber) {
throwIfInvalidBlockNumber(blockNumber);

return blockNumber * (long) this.blockSize;
return blockNumber * (long) blockSize;
}

/**
* Gets the relative offset corresponding to the given block and the absolute offset.
*
* @param blockNumber the id of the given block.
* @param offset absolute offset in the file.
* @return the relative offset corresponding to the given block and the absolute offset.
*
* @throws IllegalArgumentException if either blockNumber or offset is invalid.
*/
public int getRelativeOffset(int blockNumber, long offset) {
throwIfInvalidOffset(offset);

return (int) (offset - this.getStartOffset(blockNumber));
return (int) (offset - getStartOffset(blockNumber));
}

/**
* Gets the state of the given block.
*
* @param blockNumber the id of the given block.
* @return the state of the given block.
*
* @throws IllegalArgumentException if blockNumber is invalid.
*/
public State getState(int blockNumber) {
throwIfInvalidBlockNumber(blockNumber);

return this.state[blockNumber];
return state[blockNumber];
}

/**
* Sets the state of the given block to the given value.
*
* @param blockNumber the id of the given block.
* @param blockState the target state.
*
* @throws IllegalArgumentException if blockNumber is invalid.
*/
public void setState(int blockNumber, State blockState) {
throwIfInvalidBlockNumber(blockNumber);

this.state[blockNumber] = blockState;
state[blockNumber] = blockState;
}

// Debug helper.
public String getStateString() {
StringBuilder sb = new StringBuilder();
int blockNumber = 0;
while (blockNumber < this.numBlocks) {
State tstate = this.getState(blockNumber);
while (blockNumber < numBlocks) {
State tstate = getState(blockNumber);
int endBlockNumber = blockNumber;
while ((endBlockNumber < this.numBlocks) && (this.getState(endBlockNumber) == tstate)) {
while ((endBlockNumber < numBlocks) && (getState(endBlockNumber)
== tstate)) {
endBlockNumber++;
}
sb.append(String.format("[%03d ~ %03d] %s%n", blockNumber, endBlockNumber - 1, tstate));
sb.append(
String.format("[%03d ~ %03d] %s%n", blockNumber, endBlockNumber - 1,
tstate));
blockNumber = endBlockNumber;
}
return sb.toString();
}

private void throwIfInvalidBlockNumber(int blockNumber) {
Validate.checkWithinRange(blockNumber, "blockNumber", 0, this.numBlocks - 1);
checkWithinRange(blockNumber, "blockNumber", 0, numBlocks - 1);
}

private void throwIfInvalidOffset(long offset) {
Validate.checkWithinRange(offset, "offset", 0, this.fileSize - 1);
checkWithinRange(offset, "offset", 0, fileSize - 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
* under the License.
*/

package org.apache.hadoop.fs.common;
package org.apache.hadoop.fs.impl.prefetch;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;

/**
* Provides read access to the underlying file one block at a time.
*
Expand All @@ -31,8 +34,10 @@
*/
public abstract class BlockManager implements Closeable {

// Information about each block of the underlying file.
private BlockData blockData;
/**
* Information about each block of the underlying file.
*/
private final BlockData blockData;

/**
* Constructs an instance of {@code BlockManager}.
Expand All @@ -42,7 +47,7 @@ public abstract class BlockManager implements Closeable {
* @throws IllegalArgumentException if blockData is null.
*/
public BlockManager(BlockData blockData) {
Validate.checkNotNull(blockData, "blockData");
checkNotNull(blockData, "blockData");

this.blockData = blockData;
}
Expand All @@ -53,7 +58,7 @@ public BlockManager(BlockData blockData) {
* @return instance of {@code BlockData}.
*/
public BlockData getBlockData() {
return this.blockData;
return blockData;
}

/**
Expand All @@ -70,12 +75,12 @@ public BlockData getBlockData() {
* @throws IllegalArgumentException if blockNumber is negative.
*/
public BufferData get(int blockNumber) throws IOException {
Validate.checkNotNegative(blockNumber, "blockNumber");
checkNotNegative(blockNumber, "blockNumber");

int size = this.blockData.getSize(blockNumber);
int size = blockData.getSize(blockNumber);
ByteBuffer buffer = ByteBuffer.allocate(size);
long startOffset = this.blockData.getStartOffset(blockNumber);
this.read(buffer, startOffset, size);
long startOffset = blockData.getStartOffset(blockNumber);
read(buffer, startOffset, size);
buffer.flip();
return new BufferData(blockNumber, buffer);
}
Expand All @@ -100,7 +105,7 @@ public BufferData get(int blockNumber) throws IOException {
* @throws IllegalArgumentException if data is null.
*/
public void release(BufferData data) {
Validate.checkNotNull(data, "data");
checkNotNull(data, "data");

// Do nothing because we allocate a new buffer each time.
}
Expand All @@ -113,7 +118,7 @@ public void release(BufferData data) {
* @throws IllegalArgumentException if blockNumber is negative.
*/
public void requestPrefetch(int blockNumber) {
Validate.checkNotNegative(blockNumber, "blockNumber");
checkNotNegative(blockNumber, "blockNumber");

// Do nothing because we do not support prefetches.
}
Expand Down
Loading