Skip to content

Commit 3039f7f

Browse files
committed
use iterator + rejected-ex handler
1 parent 4003aff commit 3039f7f

File tree

7 files changed

+57
-39
lines changed

7 files changed

+57
-39
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,7 @@ public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {
992992

993993
public int getWriteMaxConcurrentRequestCount() {
994994
if (this.writeMaxConcurrentRequestCount < 1) {
995-
return 4 * Runtime.getRuntime().availableProcessors();
995+
return 4 * getAvailableProcessorCount();
996996
}
997997
return this.writeMaxConcurrentRequestCount;
998998
}
@@ -1013,6 +1013,10 @@ public String getClientProvidedEncryptionKey() {
10131013
return rawConfig.get(accSpecEncKey, null);
10141014
}
10151015

1016+
public static int getAvailableProcessorCount() {
1017+
return Runtime.getRuntime().availableProcessors();
1018+
}
1019+
10161020
@VisibleForTesting
10171021
void setReadBufferSize(int bufferSize) {
10181022
this.readBufferSize = bufferSize;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,9 +1169,8 @@ public RemoteIterator<FileStatus> listStatusIterator(Path path)
11691169
if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) {
11701170
TracingContext tracingContext = new TracingContext(clientCorrelationId,
11711171
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener);
1172-
AbfsListStatusRemoteIterator abfsLsItr =
1173-
new AbfsListStatusRemoteIterator(getFileStatus(path, tracingContext), abfsStore,
1174-
tracingContext);
1172+
AbfsListStatusRemoteIterator abfsLsItr = new AbfsListStatusRemoteIterator(
1173+
path, abfsStore, tracingContext);
11751174
return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
11761175
} else {
11771176
return super.listStatusIterator(path);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@
5353
import java.util.concurrent.ExecutionException;
5454
import java.util.concurrent.ExecutorService;
5555
import java.util.concurrent.Executors;
56+
import java.util.concurrent.LinkedBlockingQueue;
57+
import java.util.concurrent.RejectedExecutionHandler;
58+
import java.util.concurrent.SynchronousQueue;
59+
import java.util.concurrent.ThreadPoolExecutor;
60+
import java.util.concurrent.TimeUnit;
5661

5762
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
5863
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@@ -168,7 +173,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
168173
private final IdentityTransformerInterface identityTransformer;
169174
private final AbfsPerfTracker abfsPerfTracker;
170175
private final AbfsCounters abfsCounters;
171-
private final ExecutorService contentSummaryExecutorService = Executors.newCachedThreadPool();
176+
private final ThreadPoolExecutor contentSummaryExecutorService;
172177

173178
/**
174179
* The set of directories where we should store files as append blobs.
@@ -239,6 +244,17 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
239244
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
240245
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
241246
}
247+
contentSummaryExecutorService = new ThreadPoolExecutor(0,
248+
4 * AbfsConfiguration.getAvailableProcessorCount(), 60,
249+
TimeUnit.SECONDS, new SynchronousQueue<>());
250+
contentSummaryExecutorService.setRejectedExecutionHandler(
251+
(runnable, threadPoolExecutor) -> {
252+
try {
253+
contentSummaryExecutorService.getQueue().put(runnable);
254+
} catch (InterruptedException e) {
255+
LOG.debug("Could not submit GetContentSummary task to thread pool");
256+
}
257+
});
242258
}
243259

244260
/**

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.slf4j.LoggerFactory;
3434

3535
import org.apache.hadoop.fs.FileStatus;
36+
import org.apache.hadoop.fs.Path;
3637
import org.apache.hadoop.fs.RemoteIterator;
3738
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
3839

@@ -46,7 +47,7 @@ public class AbfsListStatusRemoteIterator
4647
private static final int MAX_QUEUE_SIZE = 10;
4748
private static final long POLL_WAIT_TIME_IN_MS = 250;
4849

49-
private final FileStatus fileStatus;
50+
private final Path path;
5051
private final ListingSupport listingSupport;
5152
private final ArrayBlockingQueue<Object> iteratorsQueue;
5253
private final TracingContext tracingContext;
@@ -56,9 +57,9 @@ public class AbfsListStatusRemoteIterator
5657
private String continuation;
5758
private Iterator<FileStatus> currIterator;
5859

59-
public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
60+
public AbfsListStatusRemoteIterator(final Path path,
6061
final ListingSupport listingSupport, TracingContext tracingContext) {
61-
this.fileStatus = fileStatus;
62+
this.path = path;
6263
this.listingSupport = listingSupport;
6364
this.tracingContext = tracingContext;
6465
iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
@@ -147,7 +148,7 @@ private void addNextBatchIteratorToQueue()
147148
throws IOException, InterruptedException {
148149
List<FileStatus> fileStatuses = new ArrayList<>();
149150
continuation = listingSupport
150-
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
151+
.listStatus(path, null, fileStatuses, FETCH_ALL_FALSE,
151152
continuation, tracingContext);
152153
if (!fileStatuses.isEmpty()) {
153154
iteratorsQueue.put(fileStatuses.iterator());

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,24 +95,25 @@ public ContentSummary getContentSummary(Path path,
9595
}
9696

9797
/**
98-
* Calls listStatus on given path and populated fileStatus queue with
98+
* Calls listStatus on given fileStatus and populated fileStatus queue with
9999
* subdirectories. Is called by new tasks to process the complete subtree
100-
* under a given path
101-
* @param path: Path to a file or directory
100+
* under a given fileStatus
101+
* @param fileStatus : Path to a file or directory
102102
* @throws IOException: listStatus error
103103
* @throws InterruptedException: error while inserting into queue
104104
*/
105-
private void processDirectoryTree(Path path, TracingContext tracingContext)
106-
throws IOException, InterruptedException {
107-
FileStatus[] fileStatuses = abfsStore.listStatus(path, tracingContext);
108-
109-
for (FileStatus fileStatus : fileStatuses) {
110-
if (fileStatus.isDirectory()) {
111-
queue.put(fileStatus);
105+
private void processDirectoryTree(Path fileStatus,
106+
TracingContext tracingContext) throws IOException, InterruptedException {
107+
AbfsListStatusRemoteIterator iterator = new AbfsListStatusRemoteIterator(
108+
fileStatus, abfsStore, tracingContext);
109+
while (iterator.hasNext()) {
110+
FileStatus status = iterator.next();
111+
if (status.isDirectory()) {
112+
queue.put(status);
112113
processDirectory();
113114
conditionalSubmitTaskToExecutor(tracingContext);
114115
} else {
115-
processFile(fileStatus);
116+
processFile(status);
116117
}
117118
}
118119
}
@@ -141,7 +142,8 @@ private synchronized void conditionalSubmitTaskToExecutor(TracingContext tracing
141142
FileStatus fileStatus1;
142143
while ((fileStatus1 = queue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS))
143144
!= null) {
144-
processDirectoryTree(fileStatus1.getPath(), new TracingContext(tracingContext));
145+
processDirectoryTree(fileStatus1.getPath(),
146+
new TracingContext(tracingContext));
145147
}
146148
return null;
147149
});

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,8 @@ public void testAbfsIteratorWithHasNext() throws Exception {
6464
testDir, "testListPath");
6565

6666
ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
67-
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
68-
getFileSystem().getFileStatus(testDir), listngSupport,
69-
getTestTracingContext(getFileSystem(), true));
67+
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
68+
listngSupport, getTestTracingContext(getFileSystem(), true));
7069
Assertions.assertThat(fsItr)
7170
.describedAs("RemoteIterator should be instance of "
7271
+ "AbfsListStatusRemoteIterator by default")
@@ -101,12 +100,10 @@ public void testAbfsIteratorWithoutHasNext() throws Exception {
101100
final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
102101
testDir, "testListPath");
103102

104-
ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
105-
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
106-
getFileSystem().getFileStatus(testDir), listngSupport,
107-
getTestTracingContext(getFileSystem(), true));
108-
Assertions.assertThat(fsItr)
109-
.describedAs("RemoteIterator should be instance of "
103+
ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore());
104+
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
105+
listingSupport, getTestTracingContext(getFileSystem(), true));
106+
Assertions.assertThat(fsItr).describedAs("RemoteIterator should be instance of "
110107
+ "AbfsListStatusRemoteIterator by default")
111108
.isInstanceOf(AbfsListStatusRemoteIterator.class);
112109
int itrCount = 0;
@@ -130,7 +127,7 @@ public void testAbfsIteratorWithoutHasNext() throws Exception {
130127
+ "there should be no more elements in the fileNames")
131128
.isEqualTo(0);
132129
int minNumberOfInvokations = TEST_FILES_NUMBER / 10;
133-
verify(listngSupport, Mockito.atLeast(minNumberOfInvokations))
130+
verify(listingSupport, Mockito.atLeast(minNumberOfInvokations))
134131
.listStatus(any(Path.class), nullable(String.class),
135132
anyList(), anyBoolean(),
136133
nullable(String.class),
@@ -209,7 +206,7 @@ public void testNextWhenNoMoreElementsPresent() throws Exception {
209206
Path testDir = createTestDirectory();
210207
setPageSize(10);
211208
RemoteIterator fsItr =
212-
new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
209+
new AbfsListStatusRemoteIterator(testDir,
213210
getFileSystem().getAbfsStore(),
214211
getTestTracingContext(getFileSystem(), true));
215212
fsItr = Mockito.spy(fsItr);
@@ -257,9 +254,8 @@ public void testIOException() throws Exception {
257254

258255
String exceptionMessage = "test exception";
259256
ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
260-
RemoteIterator fsItr =
261-
new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
262-
lsSupport, getTestTracingContext(getFileSystem(), true));
257+
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir, lsSupport,
258+
getTestTracingContext(getFileSystem(), true));
263259

264260
Assertions.assertThatThrownBy(() -> fsItr.next())
265261
.describedAs(
@@ -270,7 +266,7 @@ public void testIOException() throws Exception {
270266
}
271267

272268
@Test
273-
public void testNonExistingPath() throws Throwable {
269+
public void testNonExistingPath() {
274270
Path nonExistingDir = new Path("nonExistingPath");
275271
Assertions.assertThatThrownBy(
276272
() -> getFileSystem().listStatusIterator(nonExistingDir)).describedAs(

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestGetContentSummary.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public class ITestGetContentSummary extends AbstractAbfsIntegrationTest {
5151
private static final int TEST_BUFFER_SIZE = 20;
5252
private static final int FILES_PER_DIRECTORY = 2;
5353
private static final int MAX_THREADS = 16;
54-
private static final int NUM_FILES_FOR_LIST_MAX_TEST =10;
55-
// DEFAULT_AZURE_LIST_MAX_RESULTS + 10;
56-
private static final int NUM_CONCURRENT_CALLS = 8;
54+
private static final int NUM_FILES_FOR_LIST_MAX_TEST =
55+
DEFAULT_AZURE_LIST_MAX_RESULTS + 10;
56+
private static final int NUM_CONCURRENT_CALLS = 8;
5757

5858
private final String[] directories = {"/testFolder",
5959
"/testFolderII",

0 commit comments

Comments
 (0)