Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
ahc-1412 added a new close method to wait on shared netty resources
  • Loading branch information
MiErnst committed Sep 26, 2017
commit 7026fc237167e245018092cfab4c44d68f569898
8 changes: 8 additions & 0 deletions client/src/main/java/org/asynchttpclient/AsyncHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,12 @@ public interface AsyncHttpClient extends Closeable {
* @return the config associated to this client.
*/
AsyncHttpClientConfig getConfig();

/**
* Similar to calling the method {@link #close()} but additionally waits for inactivity on shared resources between
* multiple instances of netty. Calling this method instead of the method {@link #close()} might be helpful
* on application shutdown to prevent errors like a {@link ClassNotFoundException} because the class loader was
* already removed but there are still some active tasks on this shared resources which want to access these classes.
*/
void closeAndAwaitInactivity();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.ThreadDeathWatcher;
import io.netty.util.Timer;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

Expand Down Expand Up @@ -102,31 +106,60 @@ private Timer newNettyTimer() {

@Override
public void close() {
closeInternal(false);
}

public void closeAndAwaitInactivity() {
closeInternal(true);
}

private void closeInternal(boolean awaitInactivity) {
if (closeTriggered.compareAndSet(false, true)) {
try {
channelManager.close();
} catch (Throwable t) {
LOGGER.warn("Unexpected error on ChannelManager close", t);
}
if (allowStopNettyTimer) {
try {
nettyTimer.stop();
} catch (Throwable t) {
LOGGER.warn("Unexpected error on HashedWheelTimer close", t);
CompletableFuture<Void> handledCloseFuture = channelManager.close().whenComplete((v, t) -> {
if(t != null) {
LOGGER.warn("Unexpected error on ChannelManager close", t);
}
if (allowStopNettyTimer) {
try {
nettyTimer.stop();
} catch (Throwable th) {
LOGGER.warn("Unexpected error on HashedWheelTimer close", th);
}
}
});

if(awaitInactivity) {
handledCloseFuture = handledCloseFuture.thenCombine(awaitInactivity(), (v1,v2) -> null) ;
}

//see https://github.com/netty/netty/issues/2084#issuecomment-44822314

try {
ThreadDeathWatcher.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
} catch(InterruptedException t) {
// Ignore
handledCloseFuture.get(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException t) {
LOGGER.warn("Unexpected error on AsyncHttpClient close", t);
} catch (ExecutionException e) {
// already handled and could be ignored
}

closed.compareAndSet(false, true);
}
}

private CompletableFuture<Void> awaitInactivity() {
//see https://github.com/netty/netty/issues/2084#issuecomment-44822314
CompletableFuture<Void> wait1 = CompletableFuture.runAsync(() -> {
try {
GlobalEventExecutor.INSTANCE.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
} catch(InterruptedException t) {
// Ignore
}});
CompletableFuture<Void> wait2 = CompletableFuture.runAsync(() -> {
try {
ThreadDeathWatcher.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
} catch(InterruptedException t) {
// Ignore
}});
return wait1.thenCombine(wait2, (v1,v2) -> null);
}

@Override
public boolean isClosed() {
return closed.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -100,15 +101,11 @@ public class ChannelManager {
private final ChannelPool channelPool;
private final ChannelGroup openChannels;

private final CountDownLatch closeLatch;

private AsyncHttpClientHandler wsHandler;

public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {

this.config = config;

closeLatch = new CountDownLatch(2);

this.sslEngineFactory = config.getSslEngineFactory() != null ? config.getSslEngineFactory() : new DefaultSslEngineFactory();
try {
Expand Down Expand Up @@ -304,32 +301,25 @@ public boolean removeAll(Channel connection) {
return channelPool.removeAll(connection);
}

private void doClose() {
openChannels.close().addListener(future -> closeLatch.countDown());
channelPool.destroy();

//see https://github.com/netty/netty/issues/2084#issuecomment-44822314
try {
GlobalEventExecutor.INSTANCE.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
} catch(InterruptedException t) {
// Ignore
} finally {
closeLatch.countDown();
}
}

public void close() {
public CompletableFuture<Void> close() {
CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
if (allowReleaseEventLoopGroup) {
eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS)//
.addListener(future -> doClose());
} else
doClose();

try {
closeLatch.await(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Ignore
closeFuture = toCompletableFuture(
eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS));
}
return closeFuture.thenCompose(v -> toCompletableFuture(openChannels.close())).whenComplete((v,t) -> channelPool.destroy());
}

private static CompletableFuture<Void> toCompletableFuture(final Future<?> future) {
final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
future.addListener(r -> {
if(r.isSuccess()) {
completableFuture.complete(null);
} else {
completableFuture.completeExceptionally(r.cause());
}
});
return completableFuture;
}

public void closeChannel(Channel channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,9 @@ public void flushChannelPoolPartitions(Predicate<Object> predicate) {
public AsyncHttpClientConfig getConfig() {
return null;
}

@Override
public void closeAndAwaitInactivity() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,8 @@ public void flushChannelPoolPartitions(Predicate<Object> predicate) {
public AsyncHttpClientConfig getConfig() {
return null;
}

@Override
public void closeAndAwaitInactivity() {
}
}