diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java index 467b10d0977..01749b98aeb 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java @@ -19,11 +19,11 @@ package org.apache.thrift.server; -import java.util.Random; -import java.util.WeakHashMap; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -41,7 +41,7 @@ * a worker pool that deals with client connections in blocking way. */ public class TThreadPoolServer extends TServer { - private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName()); + private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class); public static class Args extends AbstractServerArgs { public int minWorkerThreads = 5; @@ -49,10 +49,6 @@ public static class Args extends AbstractServerArgs { public ExecutorService executorService; public int stopTimeoutVal = 60; public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; - public int requestTimeout = 20; - public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS; - public int beBackoffSlotLength = 100; - public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS; public Args(TServerTransport transport) { super(transport); @@ -78,27 +74,6 @@ public Args stopTimeoutUnit(TimeUnit tu) { return this; } - public Args requestTimeout(int n) { - requestTimeout = n; - return this; - } - - public Args requestTimeoutUnit(TimeUnit tu) { - requestTimeoutUnit = tu; - return this; - } - //Binary exponential backoff slot length - public Args beBackoffSlotLength(int n) { - beBackoffSlotLength = n; - return this; - } - - //Binary exponential backoff slot time unit - public Args beBackoffSlotLengthUnit(TimeUnit tu) { - beBackoffSlotLengthUnit = tu; - return this; - } - public Args executorService(ExecutorService executorService) { this.executorService = executorService; return this; @@ -107,41 +82,32 @@ public Args executorService(ExecutorService executorService) { // Executor service for handling client connections private ExecutorService executorService_; - private WeakHashMap activeWorkers = new WeakHashMap<>(); private final TimeUnit stopTimeoutUnit; private final long stopTimeoutVal; - private final TimeUnit requestTimeoutUnit; - - private final long requestTimeout; - - private final long beBackoffSlotInMillis; - - private Random random = new Random(System.currentTimeMillis()); - public TThreadPoolServer(Args args) { super(args); stopTimeoutUnit = args.stopTimeoutUnit; stopTimeoutVal = args.stopTimeoutVal; - requestTimeoutUnit = args.requestTimeoutUnit; - requestTimeout = args.requestTimeout; - beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength); executorService_ = args.executorService != null ? args.executorService : createDefaultExecutorService(args); } private static ExecutorService createDefaultExecutorService(Args args) { - SynchronousQueue executorQueue = - new SynchronousQueue(); - return new ThreadPoolExecutor(args.minWorkerThreads, - args.maxWorkerThreads, - args.stopTimeoutVal, - args.stopTimeoutUnit, - executorQueue); + return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + thread.setName("TThreadPoolServer WorkerProcess-%d"); + return thread; + } + }); } protected ExecutorService getExecutorService() { @@ -149,7 +115,7 @@ protected ExecutorService getExecutorService() { } protected boolean preServe() { - try { + try { serverTransport_.listen(); } catch (TTransportException ttx) { LOGGER.error("Error occurred during listening.", ttx); @@ -166,13 +132,16 @@ protected boolean preServe() { } public void serve() { - if (!preServe()) { - return; - } + if (!preServe()) { + return; + } + + execute(); + + executorService_.shutdownNow(); - execute(); if (!waitForShutdown()) { - LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit); + LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit); } setServing(false); @@ -182,51 +151,17 @@ protected void execute() { while (!stopped_) { try { TTransport client = serverTransport_.accept(); - WorkerProcess wp = new WorkerProcess(client); - - int retryCount = 0; - long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout); - while(true) { - try { - executorService_.execute(wp); - activeWorkers.put(wp, Boolean.TRUE); - break; - } catch(Throwable t) { - if (t instanceof RejectedExecutionException) { - retryCount++; - try { - if (remainTimeInMillis > 0) { - //do a truncated 20 binary exponential backoff sleep - long sleepTimeInMillis = ((long) (random.nextDouble() * - (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis; - sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis); - TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis); - remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis; - } else { - client.close(); - wp = null; - LOGGER.warn("Task has been rejected by ExecutorService " + retryCount - + " times till timedout, reason: " + t); - break; - } - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while waiting to place client on executor queue."); - Thread.currentThread().interrupt(); - break; - } - } else if (t instanceof Error) { - LOGGER.error("ExecutorService threw error: " + t, t); - throw (Error)t; - } else { - //for other possible runtime errors from ExecutorService, should also not kill serve - LOGGER.warn("ExecutorService threw error: " + t, t); - break; - } + try { + executorService_.execute(new WorkerProcess(client)); + } catch (RejectedExecutionException ree) { + if (!stopped_) { + LOGGER.warn("ThreadPool is saturated with incoming requests. Closing latest connection."); } + client.close(); } } catch (TTransportException ttx) { if (!stopped_) { - LOGGER.warn("Transport error occurred during acceptance of message.", ttx); + LOGGER.warn("Transport error occurred during acceptance of message", ttx); } } } @@ -241,8 +176,7 @@ protected boolean waitForShutdown() { long now = System.currentTimeMillis(); while (timeoutMS >= 0) { try { - executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); - return true; + return executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); } catch (InterruptedException ix) { long newnow = System.currentTimeMillis(); timeoutMS -= (newnow - now); @@ -255,10 +189,6 @@ protected boolean waitForShutdown() { public void stop() { stopped_ = true; serverTransport_.interrupt(); - executorService_.shutdown(); - for (WorkerProcess wp : activeWorkers.keySet()) { - wp.stop(); - } } private class WorkerProcess implements Runnable { @@ -287,7 +217,7 @@ public void run() { TProtocol inputProtocol = null; TProtocol outputProtocol = null; - TServerEventHandler eventHandler = null; + Optional eventHandler = Optional.empty(); ServerContext connectionContext = null; try { @@ -297,22 +227,25 @@ public void run() { inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); - eventHandler = getEventHandler(); - if (eventHandler != null) { - connectionContext = eventHandler.createContext(inputProtocol, outputProtocol); - } - // we check stopped_ first to make sure we're not supposed to be shutting - // down. this is necessary for graceful shutdown. - while (true) { + eventHandler = Optional.ofNullable(getEventHandler()); - if (eventHandler != null) { - eventHandler.processContext(connectionContext, inputTransport, outputTransport); - } + if (eventHandler.isPresent()) { + connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol); + } - if (stopped_) { - break; - } - processor.process(inputProtocol, outputProtocol); + while (true) { + if (Thread.currentThread().isInterrupted()) { + LOGGER.debug("WorkerProcess requested to shutdown"); + break; + } + if (eventHandler.isPresent()) { + eventHandler.get().processContext(connectionContext, inputTransport, outputTransport); + } + // This process cannot be interrupted by Interrupting the Thread. This + // will return once a message has been processed or the socket timeout + // has elapsed, at which point it will return and check the interrupt + // state of the thread. + processor.process(inputProtocol, outputProtocol); } } catch (Exception x) { LOGGER.debug("Error processing request", x); @@ -322,11 +255,11 @@ public void run() { // Ignore err-logging all transport-level/type exceptions if (!isIgnorableException(x)) { // Log the exception at error level and continue - LOGGER.error((x instanceof TException? "Thrift " : "") + "Error occurred during processing of message.", x); + LOGGER.error((x instanceof TException ? "Thrift " : "") + "Error occurred during processing of message.", x); } } finally { - if (eventHandler != null) { - eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); + if (eventHandler.isPresent()) { + eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol); } if (inputTransport != null) { inputTransport.close(); @@ -344,10 +277,9 @@ private boolean isIgnorableException(Exception x) { TTransportException tTransportException = null; if (x instanceof TTransportException) { - tTransportException = (TTransportException)x; - } - else if (x.getCause() instanceof TTransportException) { - tTransportException = (TTransportException)x.getCause(); + tTransportException = (TTransportException) x; + } else if (x.getCause() instanceof TTransportException) { + tTransportException = (TTransportException) x.getCause(); } if (tTransportException != null) { @@ -359,9 +291,5 @@ else if (x.getCause() instanceof TTransportException) { } return false; } - - private void stop() { - client_.close(); - } } } diff --git a/lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java b/lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java index e81d8012f5e..4c84dc18d5d 100644 --- a/lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java +++ b/lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java @@ -36,7 +36,7 @@ public class TestThreadPoolServer { */ @Test public void testStopServerWithOpenClient() throws Exception { - TServerSocket serverSocket = new TServerSocket(0); + TServerSocket serverSocket = new TServerSocket(0, 3000); TThreadPoolServer server = buildServer(serverSocket); Thread serverThread = new Thread(() -> server.serve()); serverThread.start(); @@ -45,11 +45,17 @@ public void testStopServerWithOpenClient() throws Exception { Thread.sleep(1000); // There is a thread listening to the client Assert.assertEquals(1, ((ThreadPoolExecutor) server.getExecutorService()).getActiveCount()); + + // Trigger the server to stop, but it does not wait server.stop(); - server.waitForShutdown(); + Assert.assertTrue(server.waitForShutdown()); + // After server is stopped, the executor thread pool should be shut down - Assert.assertTrue("Server thread pool should be terminated.", server.getExecutorService().isTerminated()); - Assert.assertTrue("Client is still open.", client.isOpen()); + Assert.assertTrue("Server thread pool should be terminated", server.getExecutorService().isTerminated()); + + // TODO: The socket is actually closed (timeout) but the client code + // ignores the timeout Exception and maintains the socket open state + Assert.assertTrue("Client should be closed after server shutdown", client.isOpen()); } }