Skip to content

Commit

Permalink
add service async
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenChongze authored and ChenChongze committed Mar 2, 2017
1 parent 8954860 commit eebd8cf
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public InvocationResponse getResponse(long timeoutMillis) throws InterruptedExce
sb.append("request timeout, current time:").append(System.currentTimeMillis())
.append("\r\nrequest:").append(request);
RequestTimeoutException e = InvocationUtils.newTimeoutException("invoke timeout");
invocationContext.getChannel().write(ProviderUtils.createFailResponse(request, e));
invocationContext.getChannel().write(invocationContext, ProviderUtils.createFailResponse(request, e));
logger.error(sb.toString(), e);
} else {
this.wait(timeoutMillis_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import javax.servlet.http.HttpServletResponse;

import com.dianping.pigeon.remoting.common.util.Constants;
import com.dianping.pigeon.remoting.provider.domain.ProviderContext;
import org.apache.commons.lang.SerializationException;

import com.dianping.pigeon.log.LoggerLoader;
Expand Down Expand Up @@ -48,13 +49,13 @@ public HttpChannel(HttpServletRequest request, HttpServletResponse response) {
}

@Override
public void write(InvocationResponse invocationResponse) {
public void write(ProviderContext context, InvocationResponse invocationResponse) {
response.setContentType(getContentType());
Serializer serializer = SerializerFactory.getSerializer(invocationResponse.getSerialize());
try {
serializer.serializeResponse(response.getOutputStream(), invocationResponse);
response.flushBuffer();
if (Constants.REPLY_MANUAL) {
if (Constants.REPLY_MANUAL || context.isAsync()) {
HttpCallbackFuture httpCallbackFuture = HttpServerHandler.callbacks.get(invocationResponse.getSequence());
if(httpCallbackFuture != null) {
httpCallbackFuture.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void handle(HttpServletRequest request, HttpServletResponse response) thr

invocationResponse = server.processRequest(invocationRequest, invocationContext);
if (invocationResponse != null) {
if(Constants.REPLY_MANUAL) {
if(Constants.REPLY_MANUAL || invocationContext.isAsync()) {
callbacks.get(invocationRequest.getSequence()).getResponse(invocationRequest.getTimeout());
}
invocationResponse.get();
Expand All @@ -120,7 +120,7 @@ public void handle(HttpServletRequest request, HttpServletResponse response) thr
// 心跳消息只返回正常的, 异常不返回
if (invocationRequest.getCallType() == Constants.CALLTYPE_REPLY
&& invocationRequest.getMessageType() != Constants.MESSAGE_TYPE_HEART) {
invocationContext.getChannel().write(ProviderUtils.createFailResponse(invocationRequest, e));
invocationContext.getChannel().write(invocationContext, ProviderUtils.createFailResponse(invocationRequest, e));
logger.error(msg, e);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.net.InetSocketAddress;

import com.dianping.pigeon.remoting.provider.domain.ProviderContext;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
Expand All @@ -24,7 +25,7 @@ public NettyServerChannel(Channel channel) {
}

@Override
public void write(final InvocationResponse response) {
public void write(ProviderContext context, final InvocationResponse response) {
ChannelFuture future = this.channel.write(response);
future.addListener(new ChannelFutureListener() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public Object doInitMsg(Object message, Channel channel, long receiveTime) {
@Override
public void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response) {
NettyServerChannel nettyChannel = new NettyServerChannel(channel);
nettyChannel.write(response);
nettyChannel.write(null, response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public Constants() {
public static final boolean LOG_PARAMETERS = ConfigManagerLoader.getConfigManager()
.getBooleanValue(KEY_LOG_PARAMETER, false);

public static boolean REPLY_MANUAL = ConfigManagerLoader.getConfigManager().getBooleanValue(Key_REPLY_MANUAL,
public volatile static boolean REPLY_MANUAL = ConfigManagerLoader.getConfigManager().getBooleanValue(Key_REPLY_MANUAL,
false);

public static final boolean MONITOR_ENABLE = ConfigManagerLoader.getConfigManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class DefaultProviderContext extends AbstractInvocationContext implements
private Thread thread;
private ServiceMethod serviceMethod;
private String methodUri;
private boolean async = false;

public DefaultProviderContext(InvocationRequest request, ProviderChannel channel) {
super(request);
Expand Down Expand Up @@ -77,6 +78,16 @@ public ServiceMethod getServiceMethod() {
return serviceMethod;
}

@Override
public boolean isAsync() {
return async;
}

@Override
public void setAsync(boolean async) {
this.async = async;
}

@Override
public String getMethodUri() {
return methodUri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

public interface ProviderChannel {

void write(InvocationResponse response);
void write(ProviderContext context, InvocationResponse response);

String getRemoteAddress();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ public interface ProviderContext<M extends ProviderMonitorData> extends Invocati

ServiceMethod getServiceMethod();

boolean isAsync();

void setAsync(boolean async);

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Future<InvocationResponse> processRequest(final InvocationRequest request
String msg = "process request failed:" + request;
if (request.getCallType() == Constants.CALLTYPE_REPLY
&& request.getMessageType() != Constants.MESSAGE_TYPE_HEART) {
providerContext.getChannel().write(ProviderUtils.createFailResponse(request, e));
providerContext.getChannel().write(providerContext, ProviderUtils.createFailResponse(request, e));
}
// logger.error(msg, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderConte
if (request.getCallType() == Constants.CALLTYPE_REPLY) {
request.setCallType(Constants.CALLTYPE_MANUAL);
}
ProviderHelper.setContext(invocationContext);
}
ProviderHelper.setContext(invocationContext);
invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis()));
Object returnObj = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderConte
if (Constants.MESSAGE_TYPE_SERVICE == request.getMessageType() && enableMethodThreadsLimit) {
decrementRequest(requestMethod);
}
if (!Constants.REPLY_MANUAL) {
if (!Constants.REPLY_MANUAL || invocationContext.isAsync()) {
ProviderStatisticsHolder.flowOut(request);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderConte
InvocationResponse response = handler.handle(invocationContext);
if (request.getCallType() == Constants.CALLTYPE_REPLY) {
invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
channel.write(response);
channel.write(invocationContext, response);
invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
}
if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public final class ProviderHelper {

private static final Monitor monitor = MonitorLoader.getMonitor();

private static ThreadLocal<Boolean> isStartAsync = new ThreadLocal<Boolean>();

private static ThreadLocal<ProviderContext> tlContext = new ThreadLocal<ProviderContext>();

public static void setContext(ProviderContext context) {
Expand All @@ -43,13 +45,21 @@ public static void clearContext() {
tlContext.remove();
}

public static ProviderContext startAsync() {
ProviderContext providerContext = getContext();
if (providerContext != null) {
providerContext.setAsync(true);
}
return providerContext;
}

public static void writeSuccessResponse(ProviderContext context, Object returnObj) {
if (context == null) {
return;
}
InvocationRequest request = context.getRequest();
InvocationResponse response = null;
if (Constants.REPLY_MANUAL && request.getCallType() != Constants.CALLTYPE_NOREPLY) {
if ((Constants.REPLY_MANUAL || context.isAsync()) && request.getCallType() != Constants.CALLTYPE_NOREPLY) {
response = ProviderUtils.createSuccessResponse(request, returnObj);
context.getTimeline().add(new TimePoint(TimePhase.B, System.currentTimeMillis()));
ProviderChannel channel = context.getChannel();
Expand All @@ -74,7 +84,7 @@ public static void writeSuccessResponse(ProviderContext context, Object returnOb
monitor.logMonitorError(e);
}
try {
channel.write(response);
channel.write(context, response);
} finally {
if (Constants.MONITOR_ENABLE) {
if (response != null && response.getSize() > 0) {
Expand Down Expand Up @@ -107,11 +117,14 @@ public static void writeSuccessResponse(ProviderContext context, Object returnOb
}

public static void writeFailureResponse(ProviderContext context, Throwable exeption) {
if (Constants.REPLY_MANUAL) {
if (context == null) {
return;
}
if (Constants.REPLY_MANUAL || context.isAsync()) {
InvocationRequest request = context.getRequest();
InvocationResponse response = ProviderUtils.createServiceExceptionResponse(request, exeption);
ProviderChannel channel = context.getChannel();
channel.write(response);
channel.write(context, response);
ProviderStatisticsHolder.flowOut(request);
List<ProviderProcessInterceptor> interceptors = ProviderProcessInterceptorFactory.getInterceptors();
for (ProviderProcessInterceptor interceptor : interceptors) {
Expand Down

0 comments on commit eebd8cf

Please sign in to comment.