Skip to content

HADOOP-16080. hadoop-aws does not work with hadoop-client-api #2522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import java.util.HashMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -71,8 +71,8 @@ public class CosNFileSystem extends FileSystem {
private String owner = "Unknown";
private String group = "Unknown";

private ListeningExecutorService boundedIOThreadPool;
private ListeningExecutorService boundedCopyThreadPool;
private ExecutorService boundedIOThreadPool;
private ExecutorService boundedCopyThreadPool;

public CosNFileSystem() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;

import org.apache.hadoop.classification.InterfaceAudience;

/**
Expand Down Expand Up @@ -105,8 +103,7 @@ public Thread newThread(Runnable r) {

private BlockingThreadPoolExecutorService(int permitCount,
ThreadPoolExecutor eventProcessingExecutor) {
super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
permitCount, false);
super(eventProcessingExecutor, permitCount, false);
this.eventProcessingExecutor = eventProcessingExecutor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@

package org.apache.hadoop.util;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;

import org.apache.hadoop.classification.InterfaceAudience;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand All @@ -49,10 +48,10 @@
@SuppressWarnings("NullableProblems")
@InterfaceAudience.Private
public class SemaphoredDelegatingExecutor extends
ForwardingListeningExecutorService {
ForwardingExecutorService {

private final Semaphore queueingPermits;
private final ListeningExecutorService executorDelegatee;
private final ExecutorService executorDelegatee;
private final int permitCount;

/**
Expand All @@ -62,7 +61,7 @@ public class SemaphoredDelegatingExecutor extends
* @param fair should the semaphore be "fair"
*/
public SemaphoredDelegatingExecutor(
ListeningExecutorService executorDelegatee,
ExecutorService executorDelegatee,
int permitCount,
boolean fair) {
this.permitCount = permitCount;
Expand All @@ -71,7 +70,7 @@ public SemaphoredDelegatingExecutor(
}

@Override
protected ListeningExecutorService delegate() {
protected ExecutorService delegate() {
return executorDelegatee;
}

Expand Down Expand Up @@ -102,7 +101,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
}

@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
public <T> Future<T> submit(Callable<T> task) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Expand All @@ -113,7 +112,7 @@ public <T> ListenableFuture<T> submit(Callable<T> task) {
}

@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
public <T> Future<T> submit(Runnable task, T result) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Expand All @@ -124,7 +123,7 @@ public <T> ListenableFuture<T> submit(Runnable task, T result) {
}

@Override
public ListenableFuture<?> submit(Runnable task) {
public Future<?> submit(Runnable task) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -423,9 +424,10 @@ private void createFileSystems(final FileSystem.Cache cache, final int count)
// only one instance can be created at a time.
URI uri = new URI("blocking://a");
ListeningExecutorService pool =
BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
10, TimeUnit.SECONDS,
"creation-threads");
"creation-threads"));

// submit a set of requests to create an FS instance.
// the semaphore will block all but one, and that will block until
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -78,8 +77,8 @@ public class AliyunOSSFileSystem extends FileSystem {
private int maxKeys;
private int maxReadAheadPartNumber;
private int maxConcurrentCopyTasksPerDir;
private ListeningExecutorService boundedThreadPool;
private ListeningExecutorService boundedCopyThreadPool;
private ExecutorService boundedThreadPool;
private ExecutorService boundedCopyThreadPool;

private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
Expand Down
6 changes: 6 additions & 0 deletions hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,10 @@
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>

<!-- Ignore return value from this method call -->
<Match>
<Class name="org.apache.hadoop.fs.s3a.impl.StoreContext"/>
<Method name="submit"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Set;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -243,7 +244,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
private ListeningExecutorService boundedThreadPool;
private ExecutorService boundedThreadPool;
private ThreadPoolExecutor unboundedThreadPool;
private int executorCapacity;
private long multiPartThreshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -207,7 +208,8 @@ public DeleteOperation(final StoreContext context,
"page size out of range: %s", pageSize);
this.pageSize = pageSize;
metadataStore = context.getMetadataStore();
executor = context.createThrottledExecutor(1);
executor = MoreExecutors.listeningDecorator(
context.createThrottledExecutor(1));
}

public long getFilesDeleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -126,7 +128,7 @@ public StoreContext(
final Configuration configuration,
final String username,
final UserGroupInformation owner,
final ListeningExecutorService executor,
final ExecutorService executor,
final int executorCapacity,
final Invoker invoker,
final S3AInstrumentation instrumentation,
Expand All @@ -143,7 +145,7 @@ public StoreContext(
this.configuration = configuration;
this.username = username;
this.owner = owner;
this.executor = executor;
this.executor = MoreExecutors.listeningDecorator(executor);
this.executorCapacity = executorCapacity;
this.invoker = invoker;
this.instrumentation = instrumentation;
Expand Down Expand Up @@ -178,7 +180,7 @@ public String getUsername() {
return username;
}

public ListeningExecutorService getExecutor() {
public ExecutorService getExecutor() {
return executor;
}

Expand Down Expand Up @@ -305,7 +307,7 @@ public void incrementGauge(Statistic statistic, long count) {
* @param capacity maximum capacity of this executor.
* @return an executor for submitting work.
*/
public ListeningExecutorService createThrottledExecutor(int capacity) {
public ExecutorService createThrottledExecutor(int capacity) {
return new SemaphoredDelegatingExecutor(executor,
capacity, true);
}
Expand All @@ -315,7 +317,7 @@ public ListeningExecutorService createThrottledExecutor(int capacity) {
* {@link #executorCapacity}.
* @return a new executor for exclusive use by the caller.
*/
public ListeningExecutorService createThrottledExecutor() {
public ExecutorService createThrottledExecutor() {
return createThrottledExecutor(executorCapacity);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.hadoop.fs.s3a.impl;

import java.net.URI;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Invoker;
Expand All @@ -46,7 +45,7 @@ public class StoreContextBuilder {

private UserGroupInformation owner;

private ListeningExecutorService executor;
private ExecutorService executor;

private int executorCapacity;

Expand Down Expand Up @@ -96,7 +95,7 @@ public StoreContextBuilder setOwner(final UserGroupInformation ugi) {
}

public StoreContextBuilder setExecutor(
final ListeningExecutorService ex) {
final ExecutorService ex) {
this.executor = ex;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -452,7 +453,8 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
StoreContext context = owner.createStoreContext();
instrumentation = context.getInstrumentation().getS3GuardInstrumentation();
username = context.getUsername();
executor = context.createThrottledExecutor();
executor = MoreExecutors.listeningDecorator(
context.createThrottledExecutor());
ttlTimeProvider = Preconditions.checkNotNull(
context.getTimeProvider(),
"ttlTimeProvider must not be null");
Expand Down Expand Up @@ -507,13 +509,14 @@ public void initialize(Configuration config,
// the executor capacity for work.
int executorCapacity = intOption(conf,
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
executor = BlockingThreadPoolExecutorService.newInstance(
executorCapacity,
executorCapacity * 2,
longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0),
TimeUnit.SECONDS,
"s3a-ddb-" + tableName);
executor = MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(
executorCapacity,
executorCapacity * 2,
longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0),
TimeUnit.SECONDS,
"s3a-ddb-" + tableName));
initDataAccessRetries(conf);
this.ttlTimeProvider = ttlTp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.StopWatch;
Expand All @@ -33,6 +32,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -70,7 +70,7 @@ public static void afterClass() throws Exception {
@Test
public void testSubmitCallable() throws Exception {
ensureCreated();
ListenableFuture<Integer> f = tpe.submit(callableSleeper);
Future<Integer> f = tpe.submit(callableSleeper);
Integer v = f.get();
assertEquals(SOME_VALUE, v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -126,11 +127,12 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
* For submitting work.
*/
private static final ListeningExecutorService EXECUTOR =
BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS,
"test-operations");
MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS,
"test-operations"));


/**
Expand Down