21
21
import java .io .IOException ;
22
22
import java .util .LinkedList ;
23
23
import java .util .List ;
24
+ import java .util .StringJoiner ;
24
25
import java .util .concurrent .BlockingQueue ;
25
26
import java .util .concurrent .Callable ;
26
27
import java .util .concurrent .ExecutorService ;
37
38
import org .apache .hadoop .fs .Path ;
38
39
import org .apache .hadoop .fs .PathFilter ;
39
40
import org .apache .hadoop .fs .RemoteIterator ;
41
+ import org .apache .hadoop .fs .statistics .IOStatistics ;
42
+ import org .apache .hadoop .fs .statistics .IOStatisticsSnapshot ;
43
+ import org .apache .hadoop .fs .statistics .IOStatisticsSource ;
40
44
import org .apache .hadoop .mapreduce .lib .input .FileInputFormat ;
41
45
42
46
import org .apache .hadoop .thirdparty .com .google .common .annotations .VisibleForTesting ;
52
56
53
57
import org .apache .hadoop .util .concurrent .HadoopExecutors ;
54
58
59
+ import static org .apache .hadoop .fs .statistics .IOStatisticsSupport .retrieveIOStatistics ;
60
+ import static org .apache .hadoop .fs .statistics .IOStatisticsSupport .snapshotIOStatistics ;
61
+
55
62
/**
56
63
* Utility class to fetch block locations for specified Input paths using a
57
64
* configured number of threads.
60
67
* configuration.
61
68
*/
62
69
@ Private
63
- public class LocatedFileStatusFetcher {
70
+ public class LocatedFileStatusFetcher implements IOStatisticsSource {
64
71
65
72
public static final Logger LOG =
66
73
LoggerFactory .getLogger (LocatedFileStatusFetcher .class .getName ());
@@ -87,6 +94,12 @@ public class LocatedFileStatusFetcher {
87
94
88
95
private volatile Throwable unknownError ;
89
96
97
+ /**
98
+ * Demand created IO Statistics: only if the filesystem
99
+ * returns statistics does this fetch collect them.
100
+ */
101
+ private IOStatisticsSnapshot iostats ;
102
+
90
103
/**
91
104
* Instantiate.
92
105
* The newApi switch is only used to configure what exception is raised
@@ -226,7 +239,46 @@ private void decrementRunningAndCheckCompletion() {
226
239
lock .unlock ();
227
240
}
228
241
}
229
-
242
+
243
+ /**
244
+ * Return any IOStatistics collected during listing.
245
+ * @return IO stats accrued.
246
+ */
247
+ @ Override
248
+ public synchronized IOStatistics getIOStatistics () {
249
+ return iostats ;
250
+ }
251
+
252
+ /**
253
+ * Add the statistics of an individual thread's scan.
254
+ * @param stats possibly null statistics.
255
+ */
256
+ private void addResultStatistics (IOStatistics stats ) {
257
+ if (stats != null ) {
258
+ // demand creation of IO statistics.
259
+ synchronized (this ) {
260
+ LOG .debug ("Adding IOStatistics: {}" , stats );
261
+ if (iostats == null ) {
262
+ // demand create the statistics
263
+ iostats = snapshotIOStatistics (stats );
264
+ } else {
265
+ iostats .aggregate (stats );
266
+ }
267
+ }
268
+ }
269
+ }
270
+
271
+ @ Override
272
+ public String toString () {
273
+ final IOStatistics ioStatistics = getIOStatistics ();
274
+ StringJoiner stringJoiner = new StringJoiner (", " ,
275
+ LocatedFileStatusFetcher .class .getSimpleName () + "[" , "]" );
276
+ if (ioStatistics != null ) {
277
+ stringJoiner .add ("IOStatistics=" + ioStatistics );
278
+ }
279
+ return stringJoiner .toString ();
280
+ }
281
+
230
282
/**
231
283
* Retrieves block locations for the given @link {@link FileStatus}, and adds
232
284
* additional paths to the process queue if required.
@@ -266,6 +318,8 @@ public Result call() throws Exception {
266
318
}
267
319
}
268
320
}
321
+ // aggregate any stats
322
+ result .stats = retrieveIOStatistics (iter );
269
323
} else {
270
324
result .locatedFileStatuses .add (fileStatus );
271
325
}
@@ -276,6 +330,7 @@ private static class Result {
276
330
private List <FileStatus > locatedFileStatuses = new LinkedList <>();
277
331
private List <FileStatus > dirsNeedingRecursiveCalls = new LinkedList <>();
278
332
private FileSystem fs ;
333
+ private IOStatistics stats ;
279
334
}
280
335
}
281
336
@@ -290,6 +345,7 @@ private class ProcessInputDirCallback implements
290
345
@ Override
291
346
public void onSuccess (ProcessInputDirCallable .Result result ) {
292
347
try {
348
+ addResultStatistics (result .stats );
293
349
if (!result .locatedFileStatuses .isEmpty ()) {
294
350
resultQueue .add (result .locatedFileStatuses );
295
351
}
0 commit comments