Skip to content

Commit

Permalink
(fix) replace SyncInvoker with AutoInvoker #89 #82 (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Apr 15, 2019
1 parent aa6b890 commit ab5a6fe
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 49 deletions.
16 changes: 8 additions & 8 deletions jupiter-example/src/main/resources/spring-consumer.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@
<!-- 以下都选项可不填 -->
<!-- 服务版本号, 通常在接口不兼容时版本号才需要升级 -->
<jupiter:property version="1.0.0.daily" />
<!-- 序列化/反序列化类型: (proto_stuff, hessian, kryo, java)可选, 默认proto_stuff -->
<!-- 序列化/反序列化类型: (proto_stuff, hessian, kryo, java)可选, 默认 proto_stuff -->
<jupiter:property serializerType="proto_stuff" />
<!-- 软负载均衡类型[random, round_robin, ext_spi] -->
<jupiter:property loadBalancerType="round_robin" />
<!-- 派发方式: (round, broadcast)可选, 默认round(单播) -->
<!-- 派发方式: (round, broadcast)可选, 默认 round (单播) -->
<jupiter:property dispatchType="round" />
<!-- 调用方式: (sync, async)可选, 默认sync(同步调用) -->
<!-- 调用方式: (auto, async)可选, 默认 auto -->
<jupiter:property invokeType="sync" />
<!-- 集群容错策略: (fail_fast, fail_over, fail_safe)可选, 默认fail_fast(快速失败) -->
<!-- 集群容错策略: (fail_fast, fail_over, fail_safe)可选, 默认 fail_fast (快速失败) -->
<jupiter:property clusterStrategy="fail_over" />
<!-- 在fail_over策略下的失败重试次数 -->
<!-- 在 fail_over 策略下的失败重试次数 -->
<jupiter:property failoverRetries="2" />
<!-- 超时时间设置 -->
<jupiter:property timeoutMillis="3000" />
Expand All @@ -76,15 +76,15 @@
SerializerType serializerType // 序列化/反序列化方式
LoadBalancerType loadBalancerType // 软负载均衡类型[random, round_robin, ext_spi]
String extLoadBalancerName; // 扩展软负载均衡唯一标识
long waitForAvailableTimeoutMillis = -1 // 如果大于0, 表示阻塞等待直到连接可用并且该值为等待时间
long waitForAvailableTimeoutMillis = -1 // 如果大于 0, 表示阻塞等待直到连接可用并且该值为等待时间
InvokeType invokeType // 调用方式 [同步, 异步]
DispatchType dispatchType // 派发方式 [单播, 广播]
long timeoutMillis // 调用超时时间设置
List<MethodSpecialConfig> methodSpecialConfigs; // 指定方法的单独配置, 方法参数类型不做区别对待
ConsumerInterceptor[] consumerInterceptors // 消费者端拦截器
String providerAddresses // provider地址列表, 逗号分隔(IP直连)
String providerAddresses // provider 地址列表, 逗号分隔(IP直连)
ClusterInvoker.Strategy clusterStrategy; // 集群容错策略
int failoverRetries // fail_over的重试次数
int failoverRetries // fail_over 的重试次数
-->
</jupiter:consumer>
</beans>
4 changes: 2 additions & 2 deletions jupiter-rpc/src/main/java/org/jupiter/rpc/InvokeType.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public enum InvokeType {
SYNC, // 同步调用
ASYNC, // 异步调用
AUTO; // 当你的接口返回值是一个 CompletableFuture 或者它的子类将自动适配为异步调用, 否则为同步调用
AUTO; // 当接口返回值是一个 CompletableFuture 或者它的子类将自动适配为异步调用, 否则为同步调用

public static InvokeType parse(String name) {
for (InvokeType s : values()) {
Expand All @@ -38,6 +38,6 @@ public static InvokeType parse(String name) {
}

public static InvokeType getDefault() {
return SYNC;
return AUTO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public GenericInvoker newProxyInstance() {
ClusterStrategyConfig strategyConfig = ClusterStrategyConfig.of(strategy, retries);
switch (invokeType) {
case SYNC:
case AUTO:
return new SyncGenericInvoker(client.appName(), metadata, dispatcher, strategyConfig, methodSpecialConfigs);
case ASYNC:
return new AsyncGenericInvoker(client.appName(), metadata, dispatcher, strategyConfig, methodSpecialConfigs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.jupiter.rpc.consumer.dispatcher.Dispatcher;
import org.jupiter.rpc.consumer.invoker.AsyncInvoker;
import org.jupiter.rpc.consumer.invoker.AutoInvoker;
import org.jupiter.rpc.consumer.invoker.SyncInvoker;
import org.jupiter.rpc.load.balance.LoadBalancerFactory;
import org.jupiter.rpc.load.balance.LoadBalancerType;
import org.jupiter.rpc.model.metadata.ClusterStrategyConfig;
Expand Down Expand Up @@ -248,14 +247,12 @@ public I newProxyInstance() {
Object handler;
switch (invokeType) {
case SYNC:
handler = new SyncInvoker(client.appName(), metadata, dispatcher, strategyConfig, methodSpecialConfigs);
case AUTO:
handler = new AutoInvoker(client.appName(), metadata, dispatcher, strategyConfig, methodSpecialConfigs);
break;
case ASYNC:
handler = new AsyncInvoker(client.appName(), metadata, dispatcher, strategyConfig, methodSpecialConfigs);
break;
case AUTO:
handler = new AutoInvoker(client.appName(), metadata, dispatcher, strategyConfig, methodSpecialConfigs);
break;
default:
throw reject("invokeType: " + invokeType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,26 @@ public AutoInvoker(String appName,
public Object invoke(@Origin Method method, @AllArguments @RuntimeType Object[] args) throws Throwable {
Class<?> returnType = method.getReturnType();

if (CompletableFuture.class.isAssignableFrom(returnType)) {
final CompletableFuture<Object> cf = createCompletableFuture((Class<CompletableFuture>) returnType);
if (!CompletableFuture.class.isAssignableFrom(returnType)) {
return doInvoke(method.getName(), args, returnType, true);
}

// Using nested future is for compatibility with InvokeType.SYNC,
// I don't think this leads to significant serialization performance loss.
InvokeFuture<CompletableFuture<Object>> ivf =
(InvokeFuture<CompletableFuture<Object>>) doInvoke(method.getName(), args, returnType, false);
final CompletableFuture<Object> cf = createCompletableFuture((Class<CompletableFuture>) returnType);

ivf.whenComplete((result, throwable) -> {
if (throwable == null) {
try {
cf.complete(result.get());
} catch (Throwable t) {
cf.completeExceptionally(t);
}
} else {
cf.completeExceptionally(throwable);
InvokeFuture<Object> inf = (InvokeFuture<Object>) doInvoke(method.getName(), args, returnType, false);
inf.whenComplete((result, throwable) -> {
if (throwable == null) {
try {
cf.complete(result);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
});

return cf;
}
} else {
cf.completeExceptionally(throwable);
}
});

return doInvoke(method.getName(), args, returnType, true);
return cf;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
*
* @author jiachun.fjc
*/
@Deprecated
public class SyncInvoker extends AbstractInvoker {

public SyncInvoker(String appName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,23 @@ private void process(ServiceWrapper service) {
final Object invokeResult = Chains.invoke(request, invokeCtx)
.getResult();

if (invokeResult instanceof CompletableFuture) {
CompletableFuture<Object> cf = (CompletableFuture<Object>) invokeResult;
cf.whenComplete((result, throwable) -> {
if (throwable == null) {
try {
doProcess(invokeResult);
} catch (Throwable t) {
handleFail(invokeCtx, t);
}
} else {
handleFail(invokeCtx, throwable);
}
});
if (!(invokeResult instanceof CompletableFuture)) {
doProcess(invokeResult);
return;
}

doProcess(invokeResult);
CompletableFuture<Object> cf = (CompletableFuture<Object>) invokeResult;
cf.whenComplete((result, throwable) -> {
if (throwable == null) {
try {
doProcess(result);
} catch (Throwable t) {
handleFail(invokeCtx, t);
}
} else {
handleFail(invokeCtx, throwable);
}
});
} catch (Throwable t) {
handleFail(invokeCtx, t);
}
Expand Down

0 comments on commit ab5a6fe

Please sign in to comment.