Skip to content

Commit

Permalink
HADOOP-17862. ABFS: Fix unchecked cast compiler warning for AbfsListS…
Browse files Browse the repository at this point in the history
…tatusRemoteIterator (apache#3331)


closes apache#3331 

Contributed by Sumangala Patki
  • Loading branch information
sumangala-patki authored and HarshitGupta11 committed Nov 28, 2022
1 parent 1fee06f commit db95cd4
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 132 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

import org.apache.hadoop.fs.FileStatus;

/**
* Class to store listStatus results for AbfsListStatusRemoteIterator. The
* results can either be of type Iterator or an exception thrown during the
* operation
*/
public class AbfsListResult {
private IOException listException = null;

private Iterator<FileStatus> fileStatusIterator
= Collections.emptyIterator();

AbfsListResult(IOException ex) {
this.listException = ex;
}

AbfsListResult(Iterator<FileStatus> fileStatusIterator) {
this.fileStatusIterator = fileStatusIterator;
}

IOException getListingException() {
return listException;
}

Iterator<FileStatus> getFileStatusIterator() {
return fileStatusIterator;
}

boolean isFailedListing() {
return (listException != null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.activation.UnsupportedDataTypeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,7 +47,7 @@ public class AbfsListStatusRemoteIterator

private final FileStatus fileStatus;
private final ListingSupport listingSupport;
private final ArrayBlockingQueue<Object> iteratorsQueue;
private final ArrayBlockingQueue<AbfsListResult> listResultQueue;
private final TracingContext tracingContext;

private volatile boolean isAsyncInProgress = false;
Expand All @@ -61,7 +60,7 @@ public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
this.fileStatus = fileStatus;
this.listingSupport = listingSupport;
this.tracingContext = tracingContext;
iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
currIterator = Collections.emptyIterator();
fetchBatchesAsync();
}
Expand All @@ -86,19 +85,17 @@ public FileStatus next() throws IOException {
private Iterator<FileStatus> getNextIterator() throws IOException {
fetchBatchesAsync();
try {
Object obj = null;
while (obj == null
&& (!isIterationComplete || !iteratorsQueue.isEmpty())) {
obj = iteratorsQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS);
AbfsListResult listResult = null;
while (listResult == null
&& (!isIterationComplete || !listResultQueue.isEmpty())) {
listResult = listResultQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS);
}
if (obj == null) {
if (listResult == null) {
return Collections.emptyIterator();
} else if (obj instanceof Iterator) {
return (Iterator<FileStatus>) obj;
} else if (obj instanceof IOException) {
throw (IOException) obj;
} else if (listResult.isFailedListing()) {
throw listResult.getListingException();
} else {
throw new UnsupportedDataTypeException();
return listResult.getFileStatusIterator();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -122,13 +119,13 @@ private void fetchBatchesAsync() {

private void asyncOp() {
try {
while (!isIterationComplete && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
while (!isIterationComplete && listResultQueue.size() <= MAX_QUEUE_SIZE) {
addNextBatchIteratorToQueue();
}
} catch (IOException ioe) {
LOG.error("Fetching filestatuses failed", ioe);
try {
iteratorsQueue.put(ioe);
listResultQueue.put(new AbfsListResult(ioe));
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", interruptedException);
Expand All @@ -143,19 +140,17 @@ private void asyncOp() {
}
}

private void addNextBatchIteratorToQueue()
private synchronized void addNextBatchIteratorToQueue()
throws IOException, InterruptedException {
List<FileStatus> fileStatuses = new ArrayList<>();
continuation = listingSupport
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
continuation, tracingContext);
if (!fileStatuses.isEmpty()) {
iteratorsQueue.put(fileStatuses.iterator());
listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
}
synchronized (this) {
if (continuation == null || continuation.isEmpty()) {
isIterationComplete = true;
}
if (continuation == null || continuation.isEmpty()) {
isIterationComplete = true;
}
}

Expand Down
Loading

0 comments on commit db95cd4

Please sign in to comment.