Skip to content

Commit 04090c9

Browse files
committed
refactor RPC and Registry
1 parent 42fe0c8 commit 04090c9

File tree

20 files changed

+540
-231
lines changed

20 files changed

+540
-231
lines changed

07rpc/rpc01/client-rest.http

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
http://127.0.0.1:8080/api/hello

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxReference.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
*/
1111
@Documented
1212
@Retention(RetentionPolicy.RUNTIME)
13-
@Target(ElementType.TYPE)
13+
@Target(ElementType.FIELD)
1414
@Inherited
1515
public @interface RpcfxReference {
1616

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.kimmking.rpcfx.api;
2+
3+
import io.kimmking.rpcfx.meta.ProviderMeta;
4+
import lombok.Getter;
5+
import org.springframework.util.LinkedMultiValueMap;
6+
import org.springframework.util.MultiValueMap;
7+
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
11+
/**
12+
* Description for this class.
13+
*
14+
* @Author : kimmking(kimmking@apache.org)
15+
* @create 2024/1/13 20:34
16+
*/
17+
public class RpcContext {
18+
19+
@Getter
20+
private MultiValueMap<String, ProviderMeta> providerHolder = new LinkedMultiValueMap<>();
21+
22+
@Getter
23+
private Map<String, Object> consumerHolder = new HashMap<>();
24+
25+
}

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java

Lines changed: 0 additions & 7 deletions
This file was deleted.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package io.kimmking.rpcfx.consumer;
2+
3+
import io.kimmking.rpcfx.annotation.RpcfxReference;
4+
import io.kimmking.rpcfx.api.RpcContext;
5+
import io.kimmking.rpcfx.stub.StubSkeletonHelper;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.beans.BeansException;
8+
import org.springframework.beans.PropertyValues;
9+
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;
10+
import org.springframework.stereotype.Component;
11+
12+
import java.io.Closeable;
13+
import java.io.IOException;
14+
import java.lang.reflect.Field;
15+
import java.util.Arrays;
16+
import java.util.List;
17+
import java.util.stream.Collectors;
18+
19+
/**
20+
* Description for this class.
21+
*
22+
* @Author : kimmking(kimmking@apache.org)
23+
* @create 2024/1/13 23:26
24+
*/
25+
@Slf4j
26+
@Component
27+
public class ConsumerBootstrap implements Closeable, InstantiationAwareBeanPostProcessor {
28+
29+
private RpcContext rpcContext = new RpcContext();
30+
31+
private String scanPackage = "io.kimmking";
32+
33+
@Override
34+
public void close() throws IOException {
35+
36+
}
37+
38+
@Override
39+
public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
40+
if (bean.getClass().getPackage().getName().startsWith(scanPackage)) {
41+
Field[] declaredFields = bean.getClass().getDeclaredFields();
42+
List<Field> consumers = Arrays.stream(declaredFields).filter(field -> field.isAnnotationPresent(RpcfxReference.class)).collect(Collectors.toList());
43+
44+
consumers.stream().forEach(consumer -> {
45+
Object consumer1 = createConsumer(consumer.getType());
46+
try {
47+
consumer.setAccessible(true);
48+
consumer.set(bean, consumer1);
49+
} catch (IllegalAccessException e) {
50+
log.error(e.getMessage(), e);
51+
}
52+
});
53+
}
54+
return null;
55+
}
56+
57+
private <T> T createConsumer(Class<T> clazz) {
58+
return StubSkeletonHelper.createConsumer(clazz, rpcContext);
59+
}
60+
}

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/RpcfxInvocationHandler.java renamed to 07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvocationHandler.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
package io.kimmking.rpcfx.client;
1+
package io.kimmking.rpcfx.consumer;
22

33
import com.alibaba.fastjson.JSON;
44
import io.kimmking.rpcfx.api.*;
5+
import io.kimmking.rpcfx.stub.StubSkeletonHelper;
56
import okhttp3.*;
67

78
import java.io.IOException;
@@ -35,6 +36,10 @@ public <T> RpcfxInvocationHandler(Class<T> serviceClass, List<String> invokers,
3536
@Override
3637
public Object invoke(Object proxy, Method method, Object[] params) throws Throwable {
3738

39+
if (!StubSkeletonHelper.checkRpcMethod(method)){
40+
return null ;
41+
}
42+
3843
List<String> urls = router.route(invokers);
3944
// System.out.println("router.route => ");
4045
// urls.forEach(System.out::println);
@@ -85,9 +90,6 @@ private RpcfxResponse post(RpcfxRequest req, String url) throws IOException {
8590
String reqJson = JSON.toJSONString(req);
8691
// System.out.println("req json: "+reqJson);
8792

88-
// 1.可以复用client
89-
// 2.尝试使用httpclient或者netty client
90-
9193
final Request request = new Request.Builder()
9294
.url(url)
9395
.post(RequestBody.create(JSONTYPE, reqJson))
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,50 @@
1-
package io.kimmking.rpcfx.client;
1+
package io.kimmking.rpcfx.consumer;
22

33

4-
import com.alibaba.fastjson.JSON;
54
import com.alibaba.fastjson.parser.ParserConfig;
65
import io.kimmking.rpcfx.api.*;
7-
import okhttp3.MediaType;
8-
import okhttp3.OkHttpClient;
9-
import okhttp3.Request;
10-
import okhttp3.RequestBody;
116
import org.apache.curator.RetryPolicy;
127
import org.apache.curator.framework.CuratorFramework;
138
import org.apache.curator.framework.CuratorFrameworkFactory;
149
import org.apache.curator.framework.recipes.cache.TreeCache;
1510
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
1611
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
1712
import org.apache.curator.retry.ExponentialBackoffRetry;
18-
import org.apache.zookeeper.CreateMode;
1913

20-
import java.io.IOException;
21-
import java.lang.reflect.InvocationHandler;
22-
import java.lang.reflect.Method;
2314
import java.lang.reflect.Proxy;
24-
import java.net.InetAddress;
2515
import java.util.ArrayList;
2616
import java.util.List;
2717

28-
public final class Rpcfx {
18+
public final class RpcfxInvoker {
2919

3020
static {
3121
ParserConfig.getGlobalInstance().addAccept("io.kimmking");
3222
}
23+
CuratorFramework client;
24+
String zkUrl = null;
3325

34-
public static <T, filters> T createFromRegistry(final Class<T> serviceClass, final String zkUrl, Router router, LoadBalancer loadBalance, Filter filter) {
26+
public RpcfxInvoker(String zkUrl) {
27+
this.zkUrl = zkUrl; //"localhost:2181"
28+
this.start();
29+
}
30+
31+
public void start() {
32+
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
33+
client = CuratorFrameworkFactory.builder().connectString(this.zkUrl).namespace("rpcfx").retryPolicy(retryPolicy).build();
34+
client.start();
35+
}
36+
37+
public void stop() {
38+
client.close();
39+
}
3540

36-
// 加filte之一
41+
public <T> T createFromRegistry(final Class<T> serviceClass, Router router, LoadBalancer loadBalance, Filter filter) {
3742

3843
String service = serviceClass.getCanonicalName();//"io.kimking.rpcfx.demo.api.UserService";
3944
System.out.println("====> "+service);
4045
List<String> invokers = new ArrayList<>();
4146

42-
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
43-
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181").namespace("rpcfx").retryPolicy(retryPolicy).build();
44-
client.start();
45-
4647
try {
47-
// ServiceProviderDesc userServiceSesc = ServiceProviderDesc.builder()
48-
// .host(InetAddress.getLocalHost().getHostAddress())
49-
// .port(8082).serviceClass(service).build();
50-
// String userServiceSescJson = JSON.toJSONString(userServiceSesc);
51-
5248

5349
if ( null == client.checkExists().forPath("/" + service)) {
5450
return null;
@@ -69,25 +65,13 @@ public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCac
6965
ex.printStackTrace();
7066
}
7167

72-
//
73-
//
74-
// // register service
75-
// // xxx "io.kimmking.rpcfx.demo.api.UserService"
76-
//
77-
78-
79-
// curator Provider list from zk
80-
81-
// 1. 简单:从zk拿到服务提供的列表
82-
// 2. 挑战:监听zk的临时节点,根据事件更新这个list(注意,需要做个全局map保持每个服务的提供者List)
83-
8468
return (T) create(serviceClass, invokers, router, loadBalance, filter);
8569

8670
}
8771

8872

8973

90-
private static void fetchInvokers(CuratorFramework client, String service, List<String> invokers) throws Exception {
74+
private void fetchInvokers(CuratorFramework client, String service, List<String> invokers) throws Exception {
9175
List<String> services = client.getChildren().forPath("/" + service);
9276
invokers.clear();
9377
for (String svc : services) {
@@ -97,10 +81,11 @@ private static void fetchInvokers(CuratorFramework client, String service, List<
9781
}
9882
}
9983

100-
private static <T> Object create(Class<T> serviceClass, List<String> invokers, Router router, LoadBalancer loadBalance, Filter... filters) {
101-
// 0. 替换动态代理 -> 字节码生成
102-
return (T) Proxy.newProxyInstance(Rpcfx.class.getClassLoader(), new Class[]{serviceClass},
103-
new RpcfxInvocationHandler(serviceClass, invokers, router, loadBalance, filters));
84+
private <T> T create(Class<T> serviceClass, List<String> invokers, Router router, LoadBalancer loadBalance, Filter... filters) {
85+
RpcfxInvocationHandler invocationHandler
86+
= new RpcfxInvocationHandler(serviceClass, invokers, router, loadBalance, filters);
87+
return (T) Proxy.newProxyInstance(RpcfxInvoker.class.getClassLoader(),
88+
new Class[]{serviceClass}, invocationHandler);
10489
}
10590

10691
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.kimmking.rpcfx.discovery;
2+
3+
/**
4+
* Description for this class.
5+
*
6+
* @Author : kimmking(kimmking@apache.org)
7+
* @create 2024/1/13 20:17
8+
*/
9+
public class DiscoveryClient {
10+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.kimmking.rpcfx.meta;
2+
3+
import lombok.Data;
4+
5+
import java.lang.reflect.Method;
6+
7+
/**
8+
* @author lirui
9+
*/
10+
@Data
11+
public class ProviderMeta {
12+
13+
private Object serviceImpl;
14+
15+
private Method method;
16+
17+
private String methodSign;
18+
19+
}

0 commit comments

Comments
 (0)