Skip to content

HADOOP-17272. ABFS Streams to support IOStatistics API #2604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,78 @@ public final class StreamStatisticNames {
public static final String STREAM_WRITE_TOTAL_DATA
= "stream_write_total_data";

/**
* Number of bytes to upload from an OutputStream.
*/
public static final String BYTES_TO_UPLOAD
= "bytes_upload";

/**
* Number of bytes uploaded successfully to the object store.
*/
public static final String BYTES_UPLOAD_SUCCESSFUL
= "bytes_upload_successfully";

/**
* Number of bytes failed to upload to the object store.
*/
public static final String BYTES_UPLOAD_FAILED
= "bytes_upload_failed";

/**
* Total time spent on waiting for a task to complete.
*/
public static final String TIME_SPENT_ON_TASK_WAIT
= "time_spent_task_wait";

/**
* Number of task queue shrunk operations.
*/
public static final String QUEUE_SHRUNK_OPS
= "queue_shrunk_ops";

/**
* Number of times current buffer is written to the service.
*/
public static final String WRITE_CURRENT_BUFFER_OPERATIONS
= "write_current_buffer_ops";

/**
* Total time spent on completing a PUT request.
*/
public static final String TIME_SPENT_ON_PUT_REQUEST
= "time_spent_on_put_request";

/**
* Number of seeks in buffer.
*/
public static final String SEEK_IN_BUFFER
= "seek_in_buffer";

/**
* Number of bytes read from the buffer.
*/
public static final String BYTES_READ_BUFFER
= "bytes_read_buffer";

/**
* Total number of remote read operations performed.
*/
public static final String REMOTE_READ_OP
= "remote_read_op";

/**
* Total number of bytes read from readAhead.
*/
public static final String READ_AHEAD_BYTES_READ
= "read_ahead_bytes_read";

/**
* Total number of bytes read from remote operations.
*/
public static final String REMOTE_BYTES_READ
= "remote_bytes_read";

private StreamStatisticNames() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static java.lang.Math.max;
import static java.lang.Math.min;
Expand All @@ -48,7 +53,7 @@
* The AbfsInputStream for AbfsClient.
*/
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
StreamCapabilities {
StreamCapabilities, IOStatisticsSource {
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
// Footer size is set to qualify for both ORC and parquet files
public static final int FOOTER_SIZE = 16 * ONE_KB;
Expand Down Expand Up @@ -92,6 +97,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private long bytesFromRemoteRead; // bytes read remotely; for testing

private final AbfsInputStreamContext context;
private IOStatistics ioStatistics;

public AbfsInputStream(
final AbfsClient client,
Expand Down Expand Up @@ -120,6 +126,9 @@ public AbfsInputStream(
// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
if (streamStatistics != null) {
ioStatistics = streamStatistics.getIOStatistics();
}
}

public String getPath() {
Expand Down Expand Up @@ -152,7 +161,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
int lastReadBytes;
int totalReadBytes = 0;
if (streamStatistics != null) {
streamStatistics.readOperationStarted(off, len);
streamStatistics.readOperationStarted();
}
incrementReadOps();
do {
Expand Down Expand Up @@ -431,7 +440,10 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
() -> client.read(path, position, b, offset, length,
tolerateOobAppends ? "*" : eTag, cachedSasToken.get()));
cachedSasToken.update(op.getSasToken());
if (streamStatistics != null) {
streamStatistics.remoteReadOperation();
Expand Down Expand Up @@ -694,6 +706,11 @@ public boolean shouldAlwaysReadBufferSize() {
return alwaysReadBufferSize;
}

@Override
public IOStatistics getIOStatistics() {
return ioStatistics;
}

/**
* Get the statistics of the stream.
* @return a string value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.hadoop.fs.azurebfs.services;

import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

/**
* Interface for statistics for the AbfsInputStream.
*/
@InterfaceStability.Unstable
public interface AbfsInputStreamStatistics {
public interface AbfsInputStreamStatistics extends IOStatisticsSource {
/**
* Seek backwards, incrementing the seek and backward seek counters.
*
Expand Down Expand Up @@ -73,11 +75,8 @@ public interface AbfsInputStreamStatistics {

/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
*
* @param pos starting position of the read.
* @param len length of bytes to read.
*/
void readOperationStarted(long pos, long len);
void readOperationStarted();

/**
* Records a successful remote read operation.
Expand All @@ -96,6 +95,12 @@ public interface AbfsInputStreamStatistics {
*/
void remoteBytesRead(long bytes);

/**
* Get the IOStatisticsStore instance from AbfsInputStreamStatistics.
* @return instance of IOStatisticsStore which extends IOStatistics.
*/
IOStatistics getIOStatistics();

/**
* Makes the string of all the AbfsInputStream statistics.
* @return the string with all the statistics.
Expand Down
Loading