diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java index a7f333810a3e..9a24714fa7c1 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java @@ -48,8 +48,14 @@ public DefaultEventExecutor(EventExecutorGroup parent, Executor executor) { super(parent, executor, true); } - public DefaultEventExecutor(EventExecutorGroup parent, Executor executor, int maxPendingTasks) { - super(parent, executor, true, maxPendingTasks); + public DefaultEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, int maxPendingTasks, + RejectedExecutionHandler rejectedExecutionHandler) { + super(parent, threadFactory, true, maxPendingTasks, rejectedExecutionHandler); + } + + public DefaultEventExecutor(EventExecutorGroup parent, Executor executor, int maxPendingTasks, + RejectedExecutionHandler rejectedExecutionHandler) { + super(parent, executor, true, maxPendingTasks, rejectedExecutionHandler); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java index a4aa0bacfc79..02b23dfb136f 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java @@ -37,7 +37,8 @@ public DefaultEventExecutorGroup(int nThreads) { * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. */ public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) { - this(nThreads, threadFactory, DefaultEventExecutor.DEFAULT_MAX_PENDING_TASKS); + this(nThreads, threadFactory, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS, + RejectedExecutionHandlers.reject()); } /** @@ -46,13 +47,15 @@ public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) { * @param nThreads the number of threads that will be used by this instance. * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. + * @param rejectedHandler the {@link RejectedExecutionHandler} to use. */ - public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks) { - super(nThreads, threadFactory, maxPendingTasks); + public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks, + RejectedExecutionHandler rejectedHandler) { + super(nThreads, threadFactory, maxPendingTasks, rejectedHandler); } @Override protected EventExecutor newChild(Executor executor, Object... args) throws Exception { - return new DefaultEventExecutor(this, executor, (Integer) args[0]); + return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]); } } diff --git a/common/src/main/java/io/netty/util/concurrent/RejectedExecutionHandler.java b/common/src/main/java/io/netty/util/concurrent/RejectedExecutionHandler.java new file mode 100644 index 000000000000..ebb3c194cef6 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/RejectedExecutionHandler.java @@ -0,0 +1,28 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +/** + * Similar to {@link java.util.concurrent.RejectedExecutionHandler} but specific to {@link SingleThreadEventExecutor}. + */ +public interface RejectedExecutionHandler { + + /** + * Called when someone tried to add a task to {@link SingleThreadEventExecutor} but this failed due capacity + * restrictions. + */ + void rejected(Runnable task, SingleThreadEventExecutor executor); +} diff --git a/common/src/main/java/io/netty/util/concurrent/RejectedExecutionHandlers.java b/common/src/main/java/io/netty/util/concurrent/RejectedExecutionHandlers.java new file mode 100644 index 000000000000..b34f7c1e1c66 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/RejectedExecutionHandlers.java @@ -0,0 +1,72 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import io.netty.util.internal.ObjectUtil; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * Expose helper methods which create different {@link RejectedExecutionHandler}s. + */ +public final class RejectedExecutionHandlers { + private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() { + @Override + public void rejected(Runnable task, SingleThreadEventExecutor executor) { + throw new RejectedExecutionException(); + } + }; + + private RejectedExecutionHandlers() { } + + /** + * Returns a {@link RejectedExecutionHandler} that will always just throw a {@link RejectedExecutionException}. + */ + public static RejectedExecutionHandler reject() { + return REJECT; + } + + /** + * Tries to backoff when the task can not be added due restrictions for an configured amount of time. This + * is only done if the task was added from outside of the event loop which means + * {@link EventExecutor#inEventLoop()} returns {@code false}. + */ + public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) { + ObjectUtil.checkPositive(retries, "retries"); + final long backOffNanos = unit.toNanos(backoffAmount); + return new RejectedExecutionHandler() { + @Override + public void rejected(Runnable task, SingleThreadEventExecutor executor) { + if (!executor.inEventLoop()) { + for (int i = 0; i < retries; i++) { + // Try to wakup the executor so it will empty its task queue. + executor.wakeup(false); + + LockSupport.parkNanos(backOffNanos); + if (executor.offerTask(task)) { + return; + } + } + } + // Either we tried to add the task from within the EventLoop or we was not able to add it even with + // backoff. + throw new RejectedExecutionException(); + } + }; + } +} diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 930493cc96bf..0423c31bc511 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -42,7 +43,7 @@ */ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor { - static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, + static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE)); private static final InternalLogger logger = @@ -99,6 +100,7 @@ public void run() { private final Set shutdownHooks = new LinkedHashSet(); private final boolean addTaskWakesUp; private final int maxPendingTasks; + private final RejectedExecutionHandler rejectedExecutionHandler; private long lastExecutionTime; @@ -132,10 +134,12 @@ protected SingleThreadEventExecutor( * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. + * @param rejectedHandler the {@link RejectedExecutionHandler} to use. */ protected SingleThreadEventExecutor( - EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks) { - this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks); + EventExecutorGroup parent, ThreadFactory threadFactory, + boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { + this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler); } /** @@ -147,7 +151,7 @@ protected SingleThreadEventExecutor( * executor thread */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { - this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS); + this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject()); } /** @@ -158,20 +162,17 @@ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. + * @param rejectedHandler the {@link RejectedExecutionHandler} to use. */ - @SuppressWarnings("deprecation") protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, - boolean addTaskWakesUp, int maxPendingTasks) { + boolean addTaskWakesUp, int maxPendingTasks, + RejectedExecutionHandler rejectedHandler) { super(parent); - - if (executor == null) { - throw new NullPointerException("executor"); - } - this.addTaskWakesUp = addTaskWakesUp; - this.executor = executor; this.maxPendingTasks = Math.max(16, maxPendingTasks); + this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(); + rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } /** @@ -326,15 +327,16 @@ protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } + if (!offerTask(task)) { + rejectedExecutionHandler.rejected(task, this); + } + } + + final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } - try { - taskQueue.add(task); - } catch (IllegalStateException e) { - // Just use add and catch the exception as this should happen only very rarely. - throw new RejectedExecutionException("Internal task queue is full", e); - } + return taskQueue.offer(task); } /** @@ -459,7 +461,9 @@ protected void cleanup() { protected void wakeup(boolean inEventLoop) { if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) { - taskQueue.add(WAKEUP_TASK); + // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there + // is already something in the queue. + taskQueue.offer(WAKEUP_TASK); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index cb42c3d8cef4..7d6aa348bb6a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -24,6 +24,7 @@ import io.netty.util.IntSupplier; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; +import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; @@ -75,8 +76,9 @@ public Integer call() throws Exception { private volatile int wakenUp; private volatile int ioRatio = 50; - EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy) { - super(parent, executor, false); + EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, + SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { + super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); if (maxEvents == 0) { allowGrowing = true; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java index 55e96869687e..b915abe9fd52 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -22,6 +22,8 @@ import io.netty.channel.SelectStrategyFactory; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorChooserFactory; +import io.netty.util.concurrent.RejectedExecutionHandler; +import io.netty.util.concurrent.RejectedExecutionHandlers; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; @@ -95,16 +97,22 @@ public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEve @Deprecated public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce, SelectStrategyFactory selectStrategyFactory) { - super(nThreads, threadFactory, maxEventsAtOnce, selectStrategyFactory); + super(nThreads, threadFactory, maxEventsAtOnce, selectStrategyFactory, RejectedExecutionHandlers.reject()); } public EpollEventLoopGroup(int nThreads, Executor executor, SelectStrategyFactory selectStrategyFactory) { - super(nThreads, executor, 0, selectStrategyFactory); + super(nThreads, executor, 0, selectStrategyFactory, RejectedExecutionHandlers.reject()); } public EpollEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, SelectStrategyFactory selectStrategyFactory) { - super(nThreads, executor, chooserFactory, 0, selectStrategyFactory); + super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, RejectedExecutionHandlers.reject()); + } + + public EpollEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectStrategyFactory selectStrategyFactory, + RejectedExecutionHandler rejectedExecutionHandler) { + super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler); } /** @@ -120,6 +128,6 @@ public void setIoRatio(int ioRatio) { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new EpollEventLoop(this, executor, (Integer) args[0], - ((SelectStrategyFactory) args[1]).newSelectStrategy()); + ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index a8abf682e8c1..a24ea43f475d 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -15,6 +15,8 @@ */ package io.netty.channel; +import io.netty.util.concurrent.RejectedExecutionHandler; +import io.netty.util.concurrent.RejectedExecutionHandlers; import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.SystemPropertyUtil; @@ -28,25 +30,27 @@ */ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { - protected static final int MAX_PENDING_TASKS = Math.max(16, + protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { - this(parent, threadFactory, addTaskWakesUp, MAX_PENDING_TASKS); + this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); } protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) { - this(parent, executor, addTaskWakesUp, MAX_PENDING_TASKS); + this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); } protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, - boolean addTaskWakesUp, int maxPendingTasks) { - super(parent, threadFactory, addTaskWakesUp, maxPendingTasks); + boolean addTaskWakesUp, int maxPendingTasks, + RejectedExecutionHandler rejectedExecutionHandler) { + super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); } protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, - boolean addTaskWakesUp, int maxPendingTasks) { - super(parent, executor, addTaskWakesUp, maxPendingTasks); + boolean addTaskWakesUp, int maxPendingTasks, + RejectedExecutionHandler rejectedExecutionHandler) { + super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); } @Override diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index d0d10ee44bb1..28a6a9460141 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -22,6 +22,7 @@ import io.netty.channel.SelectStrategy; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.IntSupplier; +import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -129,8 +130,8 @@ public Integer call() throws Exception { private boolean needsToSelectAgain; NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, - SelectStrategy strategy) { - super(parent, executor, false); + SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { + super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index b738336debf5..833b75497853 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -22,6 +22,8 @@ import io.netty.channel.SelectStrategyFactory; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorChooserFactory; +import io.netty.util.concurrent.RejectedExecutionHandler; +import io.netty.util.concurrent.RejectedExecutionHandlers; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; @@ -72,7 +74,7 @@ public NioEventLoopGroup( public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { - super(nThreads, threadFactory, selectorProvider, selectStrategyFactory); + super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } public NioEventLoopGroup( @@ -82,13 +84,21 @@ public NioEventLoopGroup( public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { - super(nThreads, executor, selectorProvider, selectStrategyFactory); + super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { - super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory); + super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, + RejectedExecutionHandlers.reject()); + } + + public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + final SelectorProvider selectorProvider, + final SelectStrategyFactory selectStrategyFactory, + final RejectedExecutionHandler rejectedExecutionHandler) { + super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler); } /** @@ -114,6 +124,6 @@ public void rebuildSelectors() { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], - ((SelectStrategyFactory) args[1]).newSelectStrategy()); + ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } }