Skip to content

Commit

Permalink
THRIFT-5297: Improve TThreadPoolServer Handling of Incoming Connections
Browse files Browse the repository at this point in the history
Client: Java
Patch: David Mollitor

This closes #2266
  • Loading branch information
belugabehr authored and Jens-G committed Feb 4, 2021
1 parent ebc2ab5 commit 7ae1ec3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 130 deletions.
180 changes: 54 additions & 126 deletions lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package org.apache.thrift.server;

import java.util.Random;
import java.util.WeakHashMap;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -41,18 +41,14 @@
* a worker pool that deals with client connections in blocking way.
*/
public class TThreadPoolServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class);

public static class Args extends AbstractServerArgs<Args> {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
public ExecutorService executorService;
public int stopTimeoutVal = 60;
public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
public int requestTimeout = 20;
public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
public int beBackoffSlotLength = 100;
public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;

public Args(TServerTransport transport) {
super(transport);
Expand All @@ -78,27 +74,6 @@ public Args stopTimeoutUnit(TimeUnit tu) {
return this;
}

public Args requestTimeout(int n) {
requestTimeout = n;
return this;
}

public Args requestTimeoutUnit(TimeUnit tu) {
requestTimeoutUnit = tu;
return this;
}
//Binary exponential backoff slot length
public Args beBackoffSlotLength(int n) {
beBackoffSlotLength = n;
return this;
}

//Binary exponential backoff slot time unit
public Args beBackoffSlotLengthUnit(TimeUnit tu) {
beBackoffSlotLengthUnit = tu;
return this;
}

public Args executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
Expand All @@ -107,49 +82,40 @@ public Args executorService(ExecutorService executorService) {

// Executor service for handling client connections
private ExecutorService executorService_;
private WeakHashMap<WorkerProcess, Boolean> activeWorkers = new WeakHashMap<>();

private final TimeUnit stopTimeoutUnit;

private final long stopTimeoutVal;

private final TimeUnit requestTimeoutUnit;

private final long requestTimeout;

private final long beBackoffSlotInMillis;

private Random random = new Random(System.currentTimeMillis());

public TThreadPoolServer(Args args) {
super(args);

stopTimeoutUnit = args.stopTimeoutUnit;
stopTimeoutVal = args.stopTimeoutVal;
requestTimeoutUnit = args.requestTimeoutUnit;
requestTimeout = args.requestTimeout;
beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);

executorService_ = args.executorService != null ?
args.executorService : createDefaultExecutorService(args);
}

private static ExecutorService createDefaultExecutorService(Args args) {
SynchronousQueue<Runnable> executorQueue =
new SynchronousQueue<Runnable>();
return new ThreadPoolExecutor(args.minWorkerThreads,
args.maxWorkerThreads,
args.stopTimeoutVal,
args.stopTimeoutUnit,
executorQueue);
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("TThreadPoolServer WorkerProcess-%d");
return thread;
}
});
}

protected ExecutorService getExecutorService() {
return executorService_;
}

protected boolean preServe() {
try {
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
Expand All @@ -166,13 +132,16 @@ protected boolean preServe() {
}

public void serve() {
if (!preServe()) {
return;
}
if (!preServe()) {
return;
}

execute();

executorService_.shutdownNow();

execute();
if (!waitForShutdown()) {
LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit);
LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit);
}

setServing(false);
Expand All @@ -182,51 +151,17 @@ protected void execute() {
while (!stopped_) {
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);

int retryCount = 0;
long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
while(true) {
try {
executorService_.execute(wp);
activeWorkers.put(wp, Boolean.TRUE);
break;
} catch(Throwable t) {
if (t instanceof RejectedExecutionException) {
retryCount++;
try {
if (remainTimeInMillis > 0) {
//do a truncated 20 binary exponential backoff sleep
long sleepTimeInMillis = ((long) (random.nextDouble() *
(1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
} else {
client.close();
wp = null;
LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
+ " times till timedout, reason: " + t);
break;
}
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while waiting to place client on executor queue.");
Thread.currentThread().interrupt();
break;
}
} else if (t instanceof Error) {
LOGGER.error("ExecutorService threw error: " + t, t);
throw (Error)t;
} else {
//for other possible runtime errors from ExecutorService, should also not kill serve
LOGGER.warn("ExecutorService threw error: " + t, t);
break;
}
try {
executorService_.execute(new WorkerProcess(client));
} catch (RejectedExecutionException ree) {
if (!stopped_) {
LOGGER.warn("ThreadPool is saturated with incoming requests. Closing latest connection.");
}
client.close();
}
} catch (TTransportException ttx) {
if (!stopped_) {
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
LOGGER.warn("Transport error occurred during acceptance of message", ttx);
}
}
}
Expand All @@ -241,8 +176,7 @@ protected boolean waitForShutdown() {
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
return true;
return executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
Expand All @@ -255,10 +189,6 @@ protected boolean waitForShutdown() {
public void stop() {
stopped_ = true;
serverTransport_.interrupt();
executorService_.shutdown();
for (WorkerProcess wp : activeWorkers.keySet()) {
wp.stop();
}
}

private class WorkerProcess implements Runnable {
Expand Down Expand Up @@ -287,7 +217,7 @@ public void run() {
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;

TServerEventHandler eventHandler = null;
Optional<TServerEventHandler> eventHandler = Optional.empty();
ServerContext connectionContext = null;

try {
Expand All @@ -297,22 +227,25 @@ public void run() {
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);

eventHandler = getEventHandler();
if (eventHandler != null) {
connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
}
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown.
while (true) {
eventHandler = Optional.ofNullable(getEventHandler());

if (eventHandler != null) {
eventHandler.processContext(connectionContext, inputTransport, outputTransport);
}
if (eventHandler.isPresent()) {
connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
}

if (stopped_) {
break;
}
processor.process(inputProtocol, outputProtocol);
while (true) {
if (Thread.currentThread().isInterrupted()) {
LOGGER.debug("WorkerProcess requested to shutdown");
break;
}
if (eventHandler.isPresent()) {
eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
}
// This process cannot be interrupted by Interrupting the Thread. This
// will return once a message has been processed or the socket timeout
// has elapsed, at which point it will return and check the interrupt
// state of the thread.
processor.process(inputProtocol, outputProtocol);
}
} catch (Exception x) {
LOGGER.debug("Error processing request", x);
Expand All @@ -322,11 +255,11 @@ public void run() {
// Ignore err-logging all transport-level/type exceptions
if (!isIgnorableException(x)) {
// Log the exception at error level and continue
LOGGER.error((x instanceof TException? "Thrift " : "") + "Error occurred during processing of message.", x);
LOGGER.error((x instanceof TException ? "Thrift " : "") + "Error occurred during processing of message.", x);
}
} finally {
if (eventHandler != null) {
eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
if (eventHandler.isPresent()) {
eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
}
if (inputTransport != null) {
inputTransport.close();
Expand All @@ -344,10 +277,9 @@ private boolean isIgnorableException(Exception x) {
TTransportException tTransportException = null;

if (x instanceof TTransportException) {
tTransportException = (TTransportException)x;
}
else if (x.getCause() instanceof TTransportException) {
tTransportException = (TTransportException)x.getCause();
tTransportException = (TTransportException) x;
} else if (x.getCause() instanceof TTransportException) {
tTransportException = (TTransportException) x.getCause();
}

if (tTransportException != null) {
Expand All @@ -359,9 +291,5 @@ else if (x.getCause() instanceof TTransportException) {
}
return false;
}

private void stop() {
client_.close();
}
}
}
14 changes: 10 additions & 4 deletions lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TestThreadPoolServer {
*/
@Test
public void testStopServerWithOpenClient() throws Exception {
TServerSocket serverSocket = new TServerSocket(0);
TServerSocket serverSocket = new TServerSocket(0, 3000);
TThreadPoolServer server = buildServer(serverSocket);
Thread serverThread = new Thread(() -> server.serve());
serverThread.start();
Expand All @@ -45,11 +45,17 @@ public void testStopServerWithOpenClient() throws Exception {
Thread.sleep(1000);
// There is a thread listening to the client
Assert.assertEquals(1, ((ThreadPoolExecutor) server.getExecutorService()).getActiveCount());

// Trigger the server to stop, but it does not wait
server.stop();
server.waitForShutdown();
Assert.assertTrue(server.waitForShutdown());

// After server is stopped, the executor thread pool should be shut down
Assert.assertTrue("Server thread pool should be terminated.", server.getExecutorService().isTerminated());
Assert.assertTrue("Client is still open.", client.isOpen());
Assert.assertTrue("Server thread pool should be terminated", server.getExecutorService().isTerminated());

// TODO: The socket is actually closed (timeout) but the client code
// ignores the timeout Exception and maintains the socket open state
Assert.assertTrue("Client should be closed after server shutdown", client.isOpen());
}
}

Expand Down

0 comments on commit 7ae1ec3

Please sign in to comment.