Skip to content

Commit

Permalink
Merge pull request apache#1876, provide new async support for both co…
Browse files Browse the repository at this point in the history
…nsumer and provider side.
  • Loading branch information
chickenlj authored Jun 14, 2018
1 parent d2e5652 commit 0820528
Show file tree
Hide file tree
Showing 32 changed files with 533 additions and 93 deletions.
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@
<artifactId>dubbo-serialization-kryo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-bootstrap</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>hessian-lite</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {

map.remove(Constants.TRANSPORTER_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.TRANSPORTER_KEY);

map.remove(Constants.ASYNC_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.ASYNC_KEY);
}

if (localMap != null && localMap.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ public class Constants {

public static final String ASYNC_KEY = "async";

public static final String FUTURE_KEY = "async_future";

public static final String ASYNC_SUFFIX = "Async";

public static final String RETURN_KEY = "return";

public static final String TOKEN_KEY = "token";
Expand Down Expand Up @@ -323,6 +327,8 @@ public class Constants {

public static final String INTERFACE_KEY = "interface";

public static final String INTERFACES = "interfaces";

public static final String GENERIC_KEY = "generic";

public static final String FILE_KEY = "file";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.common.config;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* TODO This definition should better be placed in module 'dubbo-config-api', but only can be done when "rpc" dependencies are removed from "dubbo-config-api"
* If an interface is annotated with AsyncFor, it will be treated as an async counterpart for the sync one.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
public @interface AsyncFor {

/**
* The original sync-style interface
*
* @return
*/
Class<?> value();

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import com.alibaba.dubbo.common.config.AsyncFor;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.utils.ConfigUtils;
import com.alibaba.dubbo.common.utils.NetUtils;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
// interface name
private String interfaceName;
private Class<?> interfaceClass;
private Class<?> asyncInterfaceClass;
// client type
private String client;
// url for peer-to-peer invocation
Expand Down Expand Up @@ -278,6 +280,7 @@ private void init() {
checkApplication();
checkStubAndMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
resolveAsyncInterface(interfaceClass, map);
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
Expand Down Expand Up @@ -435,6 +438,18 @@ private void checkDefault() {
appendProperties(consumer);
}

private void resolveAsyncInterface(Class<?> interfaceClass, Map<String, String> map) {
AsyncFor annotation = interfaceClass.getAnnotation(AsyncFor.class);
if (annotation == null) return;
Class<?> target = annotation.value();
if (!target.isAssignableFrom(interfaceClass)) return;
this.asyncInterfaceClass = interfaceClass;
this.interfaceClass = target;
setInterface(this.interfaceClass.getName());
map.put(Constants.INTERFACES, interfaceClass.getName());
}


public Class<?> getInterfaceClass() {
if (interfaceClass != null) {
return interfaceClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.dubbo.demo.consumer;

import com.alibaba.dubbo.demo.DemoService;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.Executors;

import static com.alibaba.dubbo.common.Constants.ACCEPT_FOREIGN_IP;
import static com.alibaba.dubbo.common.Constants.INTERFACES;
import static com.alibaba.dubbo.common.Constants.QOS_ENABLE;
import static com.alibaba.dubbo.common.Constants.QOS_PORT;
import static com.alibaba.dubbo.common.Constants.VALIDATION_KEY;
Expand Down Expand Up @@ -229,7 +230,8 @@ private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
.removeParameter(QOS_ENABLE)
.removeParameter(QOS_PORT)
.removeParameter(ACCEPT_FOREIGN_IP)
.removeParameter(VALIDATION_KEY);
.removeParameter(VALIDATION_KEY)
.removeParameter(INTERFACES);
return registedProviderUrl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.telnet.TelnetHandler;

import java.util.concurrent.CompletableFuture;

/**
* ExchangeHandler. (API, Prototype, ThreadSafe)
*/
Expand All @@ -33,6 +35,6 @@ public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
* @return response
* @throws RemotingException
*/
Object reply(ExchangeChannel channel, Object request) throws RemotingException;
CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.alibaba.dubbo.remoting.exchange.ExchangeHandler;
import com.alibaba.dubbo.remoting.telnet.support.TelnetHandlerAdapter;

import java.util.concurrent.CompletableFuture;

/**
* ExchangeHandlerAdapter
*/
public abstract class ExchangeHandlerAdapter extends TelnetHandlerAdapter implements ExchangeHandler {

@Override
public Object reply(ExchangeChannel channel, Object msg) throws RemotingException {
public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) throws RemotingException {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.alibaba.dubbo.remoting.telnet.support.TelnetHandlerAdapter;
import com.alibaba.dubbo.remoting.transport.ChannelHandlerDispatcher;

import java.util.concurrent.CompletableFuture;

/**
* ExchangeHandlerDispatcher
*/
Expand Down Expand Up @@ -82,8 +84,8 @@ public <T> ExchangeHandlerDispatcher removeReplier(Class<T> type) {

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
return ((Replier) replierDispatcher).reply(channel, request);
public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
return CompletableFuture.completedFuture(((Replier) replierDispatcher).reply(channel, request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static void removeChannelIfDisconnected(Channel ch) {

@Override
public void send(Object message) throws RemotingException {
send(message, getUrl().getParameter(Constants.SENT_KEY, false));
send(message, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

/**
* ExchangeReceiver
Expand Down Expand Up @@ -75,7 +76,7 @@ void handlerEvent(Channel channel, Request req) throws RemotingException {
}
}

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
void handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
Expand All @@ -87,20 +88,41 @@ Response handleRequest(ExchangeChannel channel, Request req) throws RemotingExce
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);

return res;
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
CompletableFuture<Object> future = handler.reply(channel, msg);
if (future.isDone()) {
res.setStatus(Response.OK);
res.setResult(future.get());
channel.send(res);
return;
}
future.whenCompleteAsync((result, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(result);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
return res;
}

@Override
Expand Down Expand Up @@ -169,8 +191,7 @@ public void received(Channel channel, Object message) throws RemotingException {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* PerformanceServer
Expand Down Expand Up @@ -78,17 +79,17 @@ public String telnet(Channel channel, String message) throws RemotingException {
return "echo: " + message + "\r\ntelnet> ";
}

public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
if ("environment".equals(request)) {
return PerformanceUtils.getEnvironment();
return CompletableFuture.completedFuture(PerformanceUtils.getEnvironment());
}
if ("scene".equals(request)) {
List<String> scene = new ArrayList<String>();
scene.add("Transporter: " + transporter);
scene.add("Service Threads: " + threads);
return scene;
return CompletableFuture.completedFuture(scene);
}
return request;
return CompletableFuture.completedFuture(request);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

//TODO response test
Expand Down Expand Up @@ -75,8 +76,8 @@ public void send(Object message) throws RemotingException {
};
ExchangeHandler exhandler = new MockedExchangeHandler() {
@Override
public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
return request;
public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
return CompletableFuture.completedFuture(request);
}

@Override
Expand Down Expand Up @@ -116,7 +117,7 @@ public void send(Object message) throws RemotingException {
};
ExchangeHandler exhandler = new MockedExchangeHandler() {
@Override
public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
throw new BizException();
}
};
Expand Down Expand Up @@ -177,7 +178,7 @@ public void send(Object message) throws RemotingException {
HeaderExchangeHandler hexhandler = new HeaderExchangeHandler(new MockedExchangeHandler() {

@Override
public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
public CompletableFuture reply(ExchangeChannel channel, Object request) throws RemotingException {
Assert.fail();
throw new RemotingException(channel, "");
}
Expand All @@ -201,7 +202,7 @@ public String telnet(Channel channel, String message) throws RemotingException {
throw new UnsupportedOperationException();
}

public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
throw new UnsupportedOperationException();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.junit.After;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;

public class HeartbeatHandlerTest {

private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandlerTest.class);
Expand Down Expand Up @@ -107,8 +109,8 @@ class TestHeartbeatHandler implements ExchangeHandler {
public int disconnectCount = 0;
public int connectCount = 0;

public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
return request;
public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
return CompletableFuture.completedFuture(request);
}

@Override
Expand Down
Loading

0 comments on commit 0820528

Please sign in to comment.