Skip to content

Commit 07fdc1a

Browse files
HADOOP-18325: ABFS: Add correlated metric support for ABFS operations (#6314) (#7303)
Contributed by Anmol Asrani
1 parent 2b83e2e commit 07fdc1a

26 files changed

+2006
-50
lines changed
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs;
20+
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
28+
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THOUSAND;
29+
30+
public class AbfsBackoffMetrics {
31+
32+
private AtomicLong numberOfRequestsSucceeded;
33+
34+
private AtomicLong minBackoff;
35+
36+
private AtomicLong maxBackoff;
37+
38+
private AtomicLong totalRequests;
39+
40+
private AtomicLong totalBackoff;
41+
42+
private String retryCount;
43+
44+
private AtomicLong numberOfIOPSThrottledRequests;
45+
46+
private AtomicLong numberOfBandwidthThrottledRequests;
47+
48+
private AtomicLong numberOfOtherThrottledRequests;
49+
50+
private AtomicLong numberOfNetworkFailedRequests;
51+
52+
private AtomicLong maxRetryCount;
53+
54+
private AtomicLong totalNumberOfRequests;
55+
56+
private AtomicLong numberOfRequestsSucceededWithoutRetrying;
57+
58+
private AtomicLong numberOfRequestsFailed;
59+
60+
private final Map<String, AbfsBackoffMetrics> metricsMap
61+
= new ConcurrentHashMap<>();
62+
63+
public AbfsBackoffMetrics() {
64+
initializeMap();
65+
this.numberOfIOPSThrottledRequests = new AtomicLong();
66+
this.numberOfBandwidthThrottledRequests = new AtomicLong();
67+
this.numberOfOtherThrottledRequests = new AtomicLong();
68+
this.totalNumberOfRequests = new AtomicLong();
69+
this.maxRetryCount = new AtomicLong();
70+
this.numberOfRequestsSucceededWithoutRetrying = new AtomicLong();
71+
this.numberOfRequestsFailed = new AtomicLong();
72+
this.numberOfNetworkFailedRequests = new AtomicLong();
73+
}
74+
75+
public AbfsBackoffMetrics(String retryCount) {
76+
this.retryCount = retryCount;
77+
this.numberOfRequestsSucceeded = new AtomicLong();
78+
this.minBackoff = new AtomicLong(Long.MAX_VALUE);
79+
this.maxBackoff = new AtomicLong();
80+
this.totalRequests = new AtomicLong();
81+
this.totalBackoff = new AtomicLong();
82+
}
83+
84+
private void initializeMap() {
85+
ArrayList<String> retryCountList = new ArrayList<String>(
86+
Arrays.asList("1", "2", "3", "4", "5_15", "15_25", "25AndAbove"));
87+
for (String s : retryCountList) {
88+
metricsMap.put(s, new AbfsBackoffMetrics(s));
89+
}
90+
}
91+
92+
public long getNumberOfRequestsSucceeded() {
93+
return this.numberOfRequestsSucceeded.get();
94+
}
95+
96+
public void setNumberOfRequestsSucceeded(long numberOfRequestsSucceeded) {
97+
this.numberOfRequestsSucceeded.set(numberOfRequestsSucceeded);
98+
}
99+
100+
public void incrementNumberOfRequestsSucceeded() {
101+
this.numberOfRequestsSucceeded.getAndIncrement();
102+
}
103+
104+
public long getMinBackoff() {
105+
return this.minBackoff.get();
106+
}
107+
108+
public void setMinBackoff(long minBackoff) {
109+
this.minBackoff.set(minBackoff);
110+
}
111+
112+
public long getMaxBackoff() {
113+
return this.maxBackoff.get();
114+
}
115+
116+
public void setMaxBackoff(long maxBackoff) {
117+
this.maxBackoff.set(maxBackoff);
118+
}
119+
120+
public long getTotalRequests() {
121+
return this.totalRequests.get();
122+
}
123+
124+
public void incrementTotalRequests() {
125+
this.totalRequests.incrementAndGet();
126+
}
127+
128+
public void setTotalRequests(long totalRequests) {
129+
this.totalRequests.set(totalRequests);
130+
}
131+
132+
public long getTotalBackoff() {
133+
return this.totalBackoff.get();
134+
}
135+
136+
public void setTotalBackoff(long totalBackoff) {
137+
this.totalBackoff.set(totalBackoff);
138+
}
139+
140+
public String getRetryCount() {
141+
return this.retryCount;
142+
}
143+
144+
public long getNumberOfIOPSThrottledRequests() {
145+
return this.numberOfIOPSThrottledRequests.get();
146+
}
147+
148+
public void setNumberOfIOPSThrottledRequests(long numberOfIOPSThrottledRequests) {
149+
this.numberOfIOPSThrottledRequests.set(numberOfIOPSThrottledRequests);
150+
}
151+
152+
public void incrementNumberOfIOPSThrottledRequests() {
153+
this.numberOfIOPSThrottledRequests.getAndIncrement();
154+
}
155+
156+
public long getNumberOfBandwidthThrottledRequests() {
157+
return this.numberOfBandwidthThrottledRequests.get();
158+
}
159+
160+
public void setNumberOfBandwidthThrottledRequests(long numberOfBandwidthThrottledRequests) {
161+
this.numberOfBandwidthThrottledRequests.set(numberOfBandwidthThrottledRequests);
162+
}
163+
164+
public void incrementNumberOfBandwidthThrottledRequests() {
165+
this.numberOfBandwidthThrottledRequests.getAndIncrement();
166+
}
167+
168+
public long getNumberOfOtherThrottledRequests() {
169+
return this.numberOfOtherThrottledRequests.get();
170+
}
171+
172+
public void setNumberOfOtherThrottledRequests(long numberOfOtherThrottledRequests) {
173+
this.numberOfOtherThrottledRequests.set(numberOfOtherThrottledRequests);
174+
}
175+
176+
public void incrementNumberOfOtherThrottledRequests() {
177+
this.numberOfOtherThrottledRequests.getAndIncrement();
178+
}
179+
180+
public long getMaxRetryCount() {
181+
return this.maxRetryCount.get();
182+
}
183+
184+
public void setMaxRetryCount(long maxRetryCount) {
185+
this.maxRetryCount.set(maxRetryCount);
186+
}
187+
188+
public void incrementMaxRetryCount() {
189+
this.maxRetryCount.getAndIncrement();
190+
}
191+
192+
public long getTotalNumberOfRequests() {
193+
return this.totalNumberOfRequests.get();
194+
}
195+
196+
public void setTotalNumberOfRequests(long totalNumberOfRequests) {
197+
this.totalNumberOfRequests.set(totalNumberOfRequests);
198+
}
199+
200+
public void incrementTotalNumberOfRequests() {
201+
this.totalNumberOfRequests.getAndIncrement();
202+
}
203+
204+
public Map<String, AbfsBackoffMetrics> getMetricsMap() {
205+
return metricsMap;
206+
}
207+
208+
public long getNumberOfRequestsSucceededWithoutRetrying() {
209+
return this.numberOfRequestsSucceededWithoutRetrying.get();
210+
}
211+
212+
public void setNumberOfRequestsSucceededWithoutRetrying(long numberOfRequestsSucceededWithoutRetrying) {
213+
this.numberOfRequestsSucceededWithoutRetrying.set(numberOfRequestsSucceededWithoutRetrying);
214+
}
215+
216+
public void incrementNumberOfRequestsSucceededWithoutRetrying() {
217+
this.numberOfRequestsSucceededWithoutRetrying.getAndIncrement();
218+
}
219+
220+
public long getNumberOfRequestsFailed() {
221+
return this.numberOfRequestsFailed.get();
222+
}
223+
224+
public void setNumberOfRequestsFailed(long numberOfRequestsFailed) {
225+
this.numberOfRequestsFailed.set(numberOfRequestsFailed);
226+
}
227+
228+
public void incrementNumberOfRequestsFailed() {
229+
this.numberOfRequestsFailed.getAndIncrement();
230+
}
231+
232+
public long getNumberOfNetworkFailedRequests() {
233+
return this.numberOfNetworkFailedRequests.get();
234+
}
235+
236+
public void setNumberOfNetworkFailedRequests(long numberOfNetworkFailedRequests) {
237+
this.numberOfNetworkFailedRequests.set(numberOfNetworkFailedRequests);
238+
}
239+
240+
public void incrementNumberOfNetworkFailedRequests() {
241+
this.numberOfNetworkFailedRequests.getAndIncrement();
242+
}
243+
244+
/*
245+
Acronyms :-
246+
1.RCTSI :- Request count that succeeded in x retries
247+
2.MMA :- Min Max Average (This refers to the backoff or sleep time between 2 requests)
248+
3.s :- seconds
249+
4.BWT :- Number of Bandwidth throttled requests
250+
5.IT :- Number of IOPS throttled requests
251+
6.OT :- Number of Other throttled requests
252+
7.NFR :- Number of requests which failed due to network errors
253+
8.%RT :- Percentage of requests that are throttled
254+
9.TRNR :- Total number of requests which succeeded without retrying
255+
10.TRF :- Total number of requests which failed
256+
11.TR :- Total number of requests which were made
257+
12.MRC :- Max retry count across all requests
258+
*/
259+
@Override
260+
public String toString() {
261+
StringBuilder metricString = new StringBuilder();
262+
long totalRequestsThrottled = getNumberOfBandwidthThrottledRequests()
263+
+ getNumberOfIOPSThrottledRequests()
264+
+ getNumberOfOtherThrottledRequests();
265+
double percentageOfRequestsThrottled =
266+
((double) totalRequestsThrottled / getTotalNumberOfRequests()) * HUNDRED;
267+
for (Map.Entry<String, AbfsBackoffMetrics> entry : metricsMap.entrySet()) {
268+
metricString.append("$RCTSI$_").append(entry.getKey())
269+
.append("R_").append("=")
270+
.append(entry.getValue().getNumberOfRequestsSucceeded());
271+
long totalRequests = entry.getValue().getTotalRequests();
272+
if (totalRequests > 0) {
273+
metricString.append("$MMA$_").append(entry.getKey())
274+
.append("R_").append("=")
275+
.append(String.format("%.3f",
276+
(double) entry.getValue().getMinBackoff() / THOUSAND))
277+
.append("s")
278+
.append(String.format("%.3f",
279+
(double) entry.getValue().getMaxBackoff() / THOUSAND))
280+
.append("s")
281+
.append(String.format("%.3f",
282+
((double) entry.getValue().getTotalBackoff() / totalRequests)
283+
/ THOUSAND))
284+
.append("s");
285+
} else {
286+
metricString.append("$MMA$_").append(entry.getKey())
287+
.append("R_").append("=0s");
288+
}
289+
}
290+
metricString.append("$BWT=")
291+
.append(getNumberOfBandwidthThrottledRequests())
292+
.append("$IT=")
293+
.append(getNumberOfIOPSThrottledRequests())
294+
.append("$OT=")
295+
.append(getNumberOfOtherThrottledRequests())
296+
.append("$RT=")
297+
.append(String.format("%.3f", percentageOfRequestsThrottled))
298+
.append("$NFR=")
299+
.append(getNumberOfNetworkFailedRequests())
300+
.append("$TRNR=")
301+
.append(getNumberOfRequestsSucceededWithoutRetrying())
302+
.append("$TRF=")
303+
.append(getNumberOfRequestsFailed())
304+
.append("$TR=")
305+
.append(getTotalNumberOfRequests())
306+
.append("$MRC=")
307+
.append(getMaxRetryCount());
308+
309+
return metricString + "";
310+
}
311+
}
312+

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
2626
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
2727
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
28+
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
2829
import org.apache.hadoop.util.Preconditions;
2930

3031
import org.apache.commons.lang3.StringUtils;
@@ -302,6 +303,26 @@ public class AbfsConfiguration{
302303
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
303304
private boolean enableAutoThrottling;
304305

306+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_IDLE_TIMEOUT,
307+
DefaultValue = DEFAULT_METRIC_IDLE_TIMEOUT_MS)
308+
private int metricIdleTimeout;
309+
310+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ANALYSIS_TIMEOUT,
311+
DefaultValue = DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS)
312+
private int metricAnalysisTimeout;
313+
314+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_URI,
315+
DefaultValue = EMPTY_STRING)
316+
private String metricUri;
317+
318+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_NAME,
319+
DefaultValue = EMPTY_STRING)
320+
private String metricAccount;
321+
322+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_KEY,
323+
DefaultValue = EMPTY_STRING)
324+
private String metricAccountKey;
325+
305326
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
306327
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
307328
private int accountOperationIdleTimeout;
@@ -961,6 +982,26 @@ public boolean isAutoThrottlingEnabled() {
961982
return this.enableAutoThrottling;
962983
}
963984

985+
public int getMetricIdleTimeout() {
986+
return this.metricIdleTimeout;
987+
}
988+
989+
public int getMetricAnalysisTimeout() {
990+
return this.metricAnalysisTimeout;
991+
}
992+
993+
public String getMetricUri() {
994+
return metricUri;
995+
}
996+
997+
public String getMetricAccount() {
998+
return metricAccount;
999+
}
1000+
1001+
public String getMetricAccountKey() {
1002+
return metricAccountKey;
1003+
}
1004+
9641005
public int getAccountOperationIdleTimeout() {
9651006
return accountOperationIdleTimeout;
9661007
}
@@ -1015,6 +1056,10 @@ public TracingHeaderFormat getTracingHeaderFormat() {
10151056
return getEnum(FS_AZURE_TRACINGHEADER_FORMAT, TracingHeaderFormat.ALL_ID_FORMAT);
10161057
}
10171058

1059+
public MetricFormat getMetricFormat() {
1060+
return getEnum(FS_AZURE_METRIC_FORMAT, MetricFormat.EMPTY);
1061+
}
1062+
10181063
public AuthType getAuthType(String accountName) {
10191064
return getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
10201065
}

0 commit comments

Comments
 (0)