Skip to content

Commit 0ca5415

Browse files
mehakmeetarjun4084346
authored andcommitted
HADOOP-17065. Add Network Counters to ABFS (#2056)
Contributed by Mehakmeet Singh. (cherry picked from commit 3472c3e)
1 parent 6fa99fa commit 0ca5415

File tree

14 files changed

+420
-28
lines changed

14 files changed

+420
-28
lines changed
Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
/**
4242
* Instrumentation of Abfs counters.
4343
*/
44-
public class AbfsInstrumentation implements AbfsCounters {
44+
public class AbfsCountersImpl implements AbfsCounters {
4545

4646
/**
4747
* Single context for all the Abfs counters to separate them from other
@@ -78,10 +78,17 @@ public class AbfsInstrumentation implements AbfsCounters {
7878
DIRECTORIES_DELETED,
7979
FILES_CREATED,
8080
FILES_DELETED,
81-
ERROR_IGNORED
81+
ERROR_IGNORED,
82+
CONNECTIONS_MADE,
83+
SEND_REQUESTS,
84+
GET_RESPONSES,
85+
BYTES_SENT,
86+
BYTES_RECEIVED,
87+
READ_THROTTLES,
88+
WRITE_THROTTLES
8289
};
8390

84-
public AbfsInstrumentation(URI uri) {
91+
public AbfsCountersImpl(URI uri) {
8592
UUID fileSystemInstanceId = UUID.randomUUID();
8693
registry.tag(REGISTRY_ID,
8794
"A unique identifier for the instance",

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
/**
2424
* Statistic which are collected in Abfs.
25-
* Available as metrics in {@link AbfsInstrumentation}.
25+
* Available as metrics in {@link AbfsCountersImpl}.
2626
*/
2727
public enum AbfsStatistic {
2828

@@ -57,7 +57,23 @@ public enum AbfsStatistic {
5757
FILES_DELETED("files_deleted",
5858
"Total number of files deleted from the object store."),
5959
ERROR_IGNORED("error_ignored",
60-
"Errors caught and ignored.");
60+
"Errors caught and ignored."),
61+
62+
//Network statistics.
63+
CONNECTIONS_MADE("connections_made",
64+
"Total number of times a connection was made with the data store."),
65+
SEND_REQUESTS("send_requests",
66+
"Total number of times http requests were sent to the data store."),
67+
GET_RESPONSES("get_responses",
68+
"Total number of times a response was received."),
69+
BYTES_SENT("bytes_sent",
70+
"Total bytes uploaded."),
71+
BYTES_RECEIVED("bytes_received",
72+
"Total bytes received."),
73+
READ_THROTTLES("read_throttles",
74+
"Total number of times a read operation is throttled."),
75+
WRITE_THROTTLES("write_throttles",
76+
"Total number of times a write operation is throttled.");
6177

6278
private String statName;
6379
private String statDescription;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public class AzureBlobFileSystem extends FileSystem {
9797
private boolean delegationTokenEnabled = false;
9898
private AbfsDelegationTokenManager delegationTokenManager;
9999
private AbfsAuthorizer authorizer;
100-
private AbfsCounters instrumentation;
100+
private AbfsCounters abfsCounters;
101101

102102
@Override
103103
public void initialize(URI uri, Configuration configuration)
@@ -109,9 +109,10 @@ public void initialize(URI uri, Configuration configuration)
109109
LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
110110

111111
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
112-
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
112+
abfsCounters = new AbfsCountersImpl(uri);
113+
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
114+
configuration, abfsCounters);
113115
final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
114-
instrumentation = new AbfsInstrumentation(uri);
115116
this.setWorkingDirectory(this.getHomeDirectory());
116117

117118
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
@@ -147,8 +148,8 @@ public String toString() {
147148
sb.append("uri=").append(uri);
148149
sb.append(", user='").append(abfsStore.getUser()).append('\'');
149150
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
150-
if (instrumentation != null) {
151-
sb.append(", Statistics: {").append(instrumentation.formString("{", "=",
151+
if (abfsCounters != null) {
152+
sb.append(", Statistics: {").append(abfsCounters.formString("{", "=",
152153
"}", true));
153154
sb.append("}");
154155
}
@@ -395,7 +396,7 @@ private void statIncrement(AbfsStatistic statistic) {
395396
* @param statistic the Statistic to be incremented.
396397
*/
397398
private void incrementStatistic(AbfsStatistic statistic) {
398-
instrumentation.incrementCounter(statistic, 1);
399+
abfsCounters.incrementCounter(statistic, 1);
399400
}
400401

401402
/**
@@ -1256,6 +1257,6 @@ private void performAbfsAuthCheck(FsAction action, Path... paths)
12561257

12571258
@VisibleForTesting
12581259
Map<String, Long> getInstrumentationMap() {
1259-
return instrumentation.toMap();
1260+
return abfsCounters.toMap();
12601261
}
12611262
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
7474
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
7575
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
76+
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
7677
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
7778
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
7879
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
@@ -132,8 +133,9 @@ public class AzureBlobFileSystemStore {
132133
private final UserGroupInformation userGroupInformation;
133134
private final IdentityTransformer identityTransformer;
134135

135-
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
136-
throws IOException {
136+
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
137+
Configuration configuration,
138+
AbfsCounters abfsCounters) throws IOException {
137139
this.uri = uri;
138140
String[] authorityParts = authorityParts(uri);
139141
final String fileSystemName = authorityParts[0];
@@ -163,7 +165,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c
163165
this.authType = abfsConfiguration.getAuthType(accountName);
164166
boolean usingOauth = (authType == AuthType.OAuth);
165167
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
166-
initializeClient(uri, fileSystemName, accountName, useHttps);
168+
initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters);
167169
this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
168170
}
169171

@@ -927,7 +929,8 @@ public boolean isAtomicRenameKey(String key) {
927929
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
928930
}
929931

930-
private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) throws AzureBlobFileSystemException {
932+
private void initializeClient(URI uri, String fileSystemName,
933+
String accountName, boolean isSecure, AbfsCounters abfsCounters) throws AzureBlobFileSystemException {
931934
if (this.client != null) {
932935
return;
933936
}
@@ -960,7 +963,7 @@ private void initializeClient(URI uri, String fileSystemName, String accountName
960963

961964
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
962965
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
963-
tokenProvider);
966+
tokenProvider, abfsCounters);
964967
}
965968

966969
private String getOctalNotation(FsPermission fsPermission) {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,14 @@ public class AbfsClient {
5959
private final String userAgent;
6060

6161
private final AccessTokenProvider tokenProvider;
62+
private final AbfsCounters abfsCounters;
6263

6364

6465
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
6566
final AbfsConfiguration abfsConfiguration,
6667
final ExponentialRetryPolicy exponentialRetryPolicy,
67-
final AccessTokenProvider tokenProvider) {
68+
final AccessTokenProvider tokenProvider,
69+
final AbfsCounters abfsCounters) {
6870
this.baseUrl = baseUrl;
6971
this.sharedKeyCredentials = sharedKeyCredentials;
7072
String baseUrlString = baseUrl.toString();
@@ -85,6 +87,7 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
8587

8688
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
8789
this.tokenProvider = tokenProvider;
90+
this.abfsCounters = abfsCounters;
8891
}
8992

9093
public String getFileSystem() {
@@ -608,4 +611,12 @@ String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
608611
URL getBaseUrl() {
609612
return baseUrl;
610613
}
614+
615+
/**
616+
* Getter for abfsCounters from AbfsClient.
617+
* @return AbfsCounters instance.
618+
*/
619+
protected AbfsCounters getAbfsCounters() {
620+
return abfsCounters;
621+
}
611622
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,19 @@ public void addBytesTransferred(long count, boolean isFailedOperation) {
114114

115115
/**
116116
* Suspends the current storage operation, as necessary, to reduce throughput.
117+
* @return true if Thread sleeps(Throttling occurs) else false.
117118
*/
118-
public void suspendIfNecessary() {
119+
public boolean suspendIfNecessary() {
119120
int duration = sleepDuration;
120121
if (duration > 0) {
121122
try {
122123
Thread.sleep(duration);
124+
return true;
123125
} catch (InterruptedException ie) {
124126
Thread.currentThread().interrupt();
125127
}
126128
}
129+
return false;
127130
}
128131

129132
@VisibleForTesting
@@ -269,4 +272,4 @@ static class AbfsOperationMetrics {
269272
this.operationsSuccessful = new AtomicLong();
270273
}
271274
}
272-
}
275+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

26+
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
2627
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
2728

2829
/**
@@ -103,17 +104,24 @@ static void updateMetrics(AbfsRestOperationType operationType,
103104
* uses this to suspend the request, if necessary, to minimize errors and
104105
* maximize throughput.
105106
*/
106-
static void sendingRequest(AbfsRestOperationType operationType) {
107+
static void sendingRequest(AbfsRestOperationType operationType,
108+
AbfsCounters abfsCounters) {
107109
if (!isAutoThrottlingEnabled) {
108110
return;
109111
}
110112

111113
switch (operationType) {
112114
case ReadFile:
113-
singleton.readThrottler.suspendIfNecessary();
115+
if (singleton.readThrottler.suspendIfNecessary()
116+
&& abfsCounters != null) {
117+
abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
118+
}
114119
break;
115120
case Append:
116-
singleton.writeThrottler.suspendIfNecessary();
121+
if (singleton.writeThrottler.suspendIfNecessary()
122+
&& abfsCounters != null) {
123+
abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
124+
}
117125
break;
118126
default:
119127
break;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30+
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
3031
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
3132
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
3233
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -62,6 +63,7 @@ public class AbfsRestOperation {
6263
private int bufferLength;
6364

6465
private AbfsHttpOperation result;
66+
private AbfsCounters abfsCounters;
6567

6668
public AbfsHttpOperation getResult() {
6769
return result;
@@ -87,6 +89,7 @@ public AbfsHttpOperation getResult() {
8789
this.requestHeaders = requestHeaders;
8890
this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
8991
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
92+
this.abfsCounters = client.getAbfsCounters();
9093
}
9194

9295
/**
@@ -114,6 +117,7 @@ public AbfsHttpOperation getResult() {
114117
this.buffer = buffer;
115118
this.bufferOffset = bufferOffset;
116119
this.bufferLength = bufferLength;
120+
this.abfsCounters = client.getAbfsCounters();
117121
}
118122

119123
/**
@@ -149,6 +153,7 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
149153
try {
150154
// initialize the HTTP request and open the connection
151155
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
156+
incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
152157

153158
// sign the HTTP request
154159
if (client.getAccessToken() == null) {
@@ -161,14 +166,19 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
161166
client.getAccessToken());
162167
}
163168

164-
AbfsClientThrottlingIntercept.sendingRequest(operationType);
169+
AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
165170

166171
if (hasRequestBody) {
167172
// HttpUrlConnection requires
168173
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
174+
incrementCounter(AbfsStatistic.SEND_REQUESTS, 1);
175+
incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength);
169176
}
170177

171178
httpOperation.processResponse(buffer, bufferOffset, bufferLength);
179+
incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
180+
incrementCounter(AbfsStatistic.BYTES_RECEIVED,
181+
httpOperation.getBytesReceived());
172182
} catch (IOException ex) {
173183
if (ex instanceof UnknownHostException) {
174184
LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost()));
@@ -208,4 +218,16 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
208218

209219
return true;
210220
}
221+
222+
/**
223+
* Incrementing Abfs counters with a long value.
224+
*
225+
* @param statistic the Abfs statistic that needs to be incremented.
226+
* @param value the value to be incremented by.
227+
*/
228+
private void incrementCounter(AbfsStatistic statistic, long value) {
229+
if (abfsCounters != null) {
230+
abfsCounters.incrementCounter(statistic, value);
231+
}
232+
}
211233
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,9 +376,10 @@ protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled(
376376
* @param metricMap map of (String, Long) with statistics name as key and
377377
* statistics value as map value.
378378
*/
379-
protected void assertAbfsStatistics(AbfsStatistic statistic,
379+
protected long assertAbfsStatistics(AbfsStatistic statistic,
380380
long expectedValue, Map<String, Long> metricMap) {
381381
assertEquals("Mismatch in " + statistic.getStatName(), expectedValue,
382382
(long) metricMap.get(statistic.getStatName()));
383+
return expectedValue;
383384
}
384385
}

0 commit comments

Comments
 (0)