Skip to content

Commit

Permalink
fixes #1089, make ExecutionDispatcher meets dubbo-user-book
Browse files Browse the repository at this point in the history
  • Loading branch information
qct authored and qct committed Apr 24, 2018
1 parent 764761d commit e911a44
Showing 1 changed file with 22 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,42 +35,30 @@ public ExecutionChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}

@Override
public void connected(Channel channel) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
}

@Override
public void disconnected(Channel channel) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request &&
t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side("+url.getIp()+","+url.getPort()+") threadpool is exhausted ,detail msg:"+t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
if (message instanceof Request && !((Request) message).isHeartbeat()) {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if (t instanceof RejectedExecutionException) {
Request request = (Request) message;
if (request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
} else {
handler.received(channel, message);
}
}

@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
}

}

0 comments on commit e911a44

Please sign in to comment.