Skip to content

HADOOP-17065. Add Network Counters to ABFS #2056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
/**
* Instrumentation of Abfs counters.
*/
public class AbfsInstrumentation implements AbfsCounters {
public class AbfsCountersImpl implements AbfsCounters {

/**
* Single context for all the Abfs counters to separate them from other
Expand Down Expand Up @@ -78,10 +78,17 @@ public class AbfsInstrumentation implements AbfsCounters {
DIRECTORIES_DELETED,
FILES_CREATED,
FILES_DELETED,
ERROR_IGNORED
ERROR_IGNORED,
CONNECTIONS_MADE,
SEND_REQUESTS,
GET_RESPONSES,
BYTES_SENT,
BYTES_RECEIVED,
READ_THROTTLES,
WRITE_THROTTLES
};

public AbfsInstrumentation(URI uri) {
public AbfsCountersImpl(URI uri) {
UUID fileSystemInstanceId = UUID.randomUUID();
registry.tag(REGISTRY_ID,
"A unique identifier for the instance",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

/**
* Statistic which are collected in Abfs.
* Available as metrics in {@link AbfsInstrumentation}.
* Available as metrics in {@link AbfsCountersImpl}.
*/
public enum AbfsStatistic {

Expand Down Expand Up @@ -57,7 +57,23 @@ public enum AbfsStatistic {
FILES_DELETED("files_deleted",
"Total number of files deleted from the object store."),
ERROR_IGNORED("error_ignored",
"Errors caught and ignored.");
"Errors caught and ignored."),

//Network statistics.
CONNECTIONS_MADE("connections_made",
"Total number of times a connection was made with the data store."),
SEND_REQUESTS("send_requests",
"Total number of times http requests were sent to the data store."),
GET_RESPONSES("get_responses",
"Total number of times a response was received."),
BYTES_SENT("bytes_sent",
"Total bytes uploaded."),
BYTES_RECEIVED("bytes_received",
"Total bytes received."),
READ_THROTTLES("read_throttles",
"Total number of times a read operation is throttled."),
WRITE_THROTTLES("write_throttles",
"Total number of times a write operation is throttled.");

private String statName;
private String statDescription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class AzureBlobFileSystem extends FileSystem {

private boolean delegationTokenEnabled = false;
private AbfsDelegationTokenManager delegationTokenManager;
private AbfsCounters instrumentation;
private AbfsCounters abfsCounters;

@Override
public void initialize(URI uri, Configuration configuration)
Expand All @@ -109,11 +109,12 @@ public void initialize(URI uri, Configuration configuration)
LOG.debug("Initializing AzureBlobFileSystem for {}", uri);

this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
abfsCounters = new AbfsCountersImpl(uri);
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
configuration, abfsCounters);
LOG.trace("AzureBlobFileSystemStore init complete");

final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
instrumentation = new AbfsInstrumentation(uri);
this.setWorkingDirectory(this.getHomeDirectory());

if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
Expand Down Expand Up @@ -150,8 +151,8 @@ public String toString() {
sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
if (instrumentation != null) {
sb.append(", Statistics: {").append(instrumentation.formString("{", "=",
if (abfsCounters != null) {
sb.append(", Statistics: {").append(abfsCounters.formString("{", "=",
"}", true));
sb.append("}");
}
Expand Down Expand Up @@ -392,7 +393,7 @@ private void statIncrement(AbfsStatistic statistic) {
* @param statistic the Statistic to be incremented.
*/
private void incrementStatistic(AbfsStatistic statistic) {
instrumentation.incrementCounter(statistic, 1);
abfsCounters.incrementCounter(statistic, 1);
}

/**
Expand Down Expand Up @@ -1241,7 +1242,7 @@ boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {

@VisibleForTesting
Map<String, Long> getInstrumentationMap() {
return instrumentation.toMap();
return abfsCounters.toMap();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
Expand Down Expand Up @@ -143,8 +144,9 @@ public class AzureBlobFileSystemStore implements Closeable {
private final IdentityTransformerInterface identityTransformer;
private final AbfsPerfTracker abfsPerfTracker;

public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
throws IOException {
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
Configuration configuration,
AbfsCounters abfsCounters) throws IOException {
this.uri = uri;
String[] authorityParts = authorityParts(uri);
final String fileSystemName = authorityParts[0];
Expand Down Expand Up @@ -182,7 +184,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c
boolean usingOauth = (authType == AuthType.OAuth);
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
initializeClient(uri, fileSystemName, accountName, useHttps);
initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters);
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
IdentityTransformerInterface.class);
Expand Down Expand Up @@ -1170,7 +1172,8 @@ public boolean isAtomicRenameKey(String key) {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}

private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure)
private void initializeClient(URI uri, String fileSystemName,
String accountName, boolean isSecure, AbfsCounters abfsCounters)
throws IOException {
if (this.client != null) {
return;
Expand Down Expand Up @@ -1214,11 +1217,11 @@ private void initializeClient(URI uri, String fileSystemName, String accountName
if (tokenProvider != null) {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
tokenProvider, abfsPerfTracker);
tokenProvider, abfsPerfTracker, abfsCounters);
} else {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
sasTokenProvider, abfsPerfTracker);
sasTokenProvider, abfsPerfTracker, abfsCounters);
}
LOG.trace("AbfsClient init complete");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ public class AbfsClient implements Closeable {
private final AuthType authType;
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;

private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final AbfsPerfTracker abfsPerfTracker) {
final AbfsPerfTracker abfsPerfTracker,
final AbfsCounters abfsCounters) {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
Expand All @@ -104,23 +106,28 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden

this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.abfsPerfTracker = abfsPerfTracker;
this.abfsCounters = abfsCounters;
}

public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final AccessTokenProvider tokenProvider,
final AbfsPerfTracker abfsPerfTracker) {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
final AbfsPerfTracker abfsPerfTracker,
final AbfsCounters abfsCounters) {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
this.tokenProvider = tokenProvider;
}

public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final SASTokenProvider sasTokenProvider,
final AbfsPerfTracker abfsPerfTracker) {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
final AbfsPerfTracker abfsPerfTracker,
final AbfsCounters abfsCounters) {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
this.sasTokenProvider = sasTokenProvider;
}

Expand Down Expand Up @@ -892,4 +899,12 @@ URL getBaseUrl() {
public SASTokenProvider getSasTokenProvider() {
return this.sasTokenProvider;
}

/**
* Getter for abfsCounters from AbfsClient.
* @return AbfsCounters instance.
*/
protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,19 @@ public void addBytesTransferred(long count, boolean isFailedOperation) {

/**
* Suspends the current storage operation, as necessary, to reduce throughput.
* @return true if Thread sleeps(Throttling occurs) else false.
*/
public void suspendIfNecessary() {
public boolean suspendIfNecessary() {
int duration = sleepDuration;
if (duration > 0) {
try {
Thread.sleep(duration);
return true;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
return false;
}

@VisibleForTesting
Expand Down Expand Up @@ -269,4 +272,4 @@ static class AbfsOperationMetrics {
this.operationsSuccessful = new AtomicLong();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;

/**
Expand Down Expand Up @@ -103,17 +104,24 @@ static void updateMetrics(AbfsRestOperationType operationType,
* uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput.
*/
static void sendingRequest(AbfsRestOperationType operationType) {
static void sendingRequest(AbfsRestOperationType operationType,
AbfsCounters abfsCounters) {
if (!isAutoThrottlingEnabled) {
return;
}

switch (operationType) {
case ReadFile:
singleton.readThrottler.suspendIfNecessary();
if (singleton.readThrottler.suspendIfNecessary()
&& abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
}
break;
case Append:
singleton.writeThrottler.suspendIfNecessary();
if (singleton.writeThrottler.suspendIfNecessary()
&& abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
}
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class AbfsRestOperation {
private int retryCount = 0;

private AbfsHttpOperation result;
private AbfsCounters abfsCounters;

public AbfsHttpOperation getResult() {
return result;
Expand Down Expand Up @@ -131,6 +133,7 @@ String getSasToken() {
this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters();
}

/**
Expand Down Expand Up @@ -160,6 +163,7 @@ String getSasToken() {
this.buffer = buffer;
this.bufferOffset = bufferOffset;
this.bufferLength = bufferLength;
this.abfsCounters = client.getAbfsCounters();
}

/**
Expand Down Expand Up @@ -205,6 +209,7 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
try {
// initialize the HTTP request and open the connection
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);

switch(client.getAuthType()) {
case Custom:
Expand All @@ -229,14 +234,19 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());
AbfsClientThrottlingIntercept.sendingRequest(operationType);
AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);

if (hasRequestBody) {
// HttpUrlConnection requires
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
incrementCounter(AbfsStatistic.SEND_REQUESTS, 1);
incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength);
}

httpOperation.processResponse(buffer, bufferOffset, bufferLength);
incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
incrementCounter(AbfsStatistic.BYTES_RECEIVED,
httpOperation.getBytesReceived());
} catch (IOException ex) {
if (ex instanceof UnknownHostException) {
LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost()));
Expand Down Expand Up @@ -276,4 +286,16 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS

return true;
}

/**
* Incrementing Abfs counters with a long value.
*
* @param statistic the Abfs statistic that needs to be incremented.
* @param value the value to be incremented by.
*/
private void incrementCounter(AbfsStatistic statistic, long value) {
if (abfsCounters != null) {
abfsCounters.incrementCounter(statistic, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,10 @@ protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled(
* @param metricMap map of (String, Long) with statistics name as key and
* statistics value as map value.
*/
protected void assertAbfsStatistics(AbfsStatistic statistic,
protected long assertAbfsStatistics(AbfsStatistic statistic,
long expectedValue, Map<String, Long> metricMap) {
assertEquals("Mismatch in " + statistic.getStatName(), expectedValue,
(long) metricMap.get(statistic.getStatName()));
return expectedValue;
}
}
Loading