Skip to content

Commit c7c24d0

Browse files
committed
compelet kk registry integration
1 parent 04090c9 commit c7c24d0

File tree

36 files changed

+1404
-269
lines changed

36 files changed

+1404
-269
lines changed

04fx/spring01/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
<properties>
1212
<!-- <spring-version>5.2.7.RELEASE</spring-version>-->
13-
<spring-version>4.3.29.RELEASE</spring-version>
13+
<spring-version>4.3.30.RELEASE</spring-version>
1414
</properties>
1515

1616
<build>
@@ -19,8 +19,8 @@
1919
<groupId>org.apache.maven.plugins</groupId>
2020
<artifactId>maven-compiler-plugin</artifactId>
2121
<configuration>
22-
<source>8</source>
23-
<target>8</target>
22+
<source>11</source>
23+
<target>11</target>
2424
</configuration>
2525
</plugin>
2626
</plugins>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.kimmking.spring02;
2+
3+
import org.springframework.cglib.proxy.Enhancer;
4+
import org.springframework.cglib.proxy.MethodInterceptor;
5+
import org.springframework.cglib.proxy.MethodProxy;
6+
7+
import java.lang.reflect.Method;
8+
9+
/**
10+
* Description for this class.
11+
*
12+
* @Author : kimmking(kimmking@apache.org)
13+
* @create 2024/1/22 18:01
14+
*/
15+
public class SpringDemo11 {
16+
17+
public static void main(String[] args) {
18+
long s = System.currentTimeMillis();
19+
Enhancer enhancer = new Enhancer();
20+
enhancer.setInterfaces(new Class[]{IAction.class});
21+
enhancer.setCallback(new MI());
22+
enhancer.setUseCache(true);
23+
IAction demo = (IAction) enhancer.create();
24+
for (int i = 0; i < 5; i++) {
25+
long ss = System.currentTimeMillis();
26+
System.out.println(demo.action());
27+
System.out.println( i + " *****====> invoke proxy " + (System.currentTimeMillis() - ss) + " ms");
28+
}
29+
System.out.println(" *****====> enhancer proxy " + (System.currentTimeMillis() - s) + " ms");
30+
31+
}
32+
33+
public interface IAction {
34+
Object action();
35+
}
36+
37+
38+
static class MI implements MethodInterceptor {
39+
@Override
40+
public Object intercept(Object obj, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
41+
long s = System.currentTimeMillis();
42+
System.out.println(" *****==MI==> " + s + " " +"Before:"+method.getName());
43+
Object result = "S-" + s;//methodProxy.invokeSuper(obj, objects);
44+
System.out.println(" *****==MI==> " + (System.currentTimeMillis() - s) + " ms After:"+method.getName());
45+
return result;
46+
}
47+
}
48+
49+
}

07rpc/rpc01/rpcfx-core/pom.xml

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<dependency>
2222
<groupId>com.alibaba</groupId>
2323
<artifactId>fastjson</artifactId>
24-
<version>1.2.70</version>
24+
<version>1.2.83</version>
2525
</dependency>
2626

2727
<dependency>
@@ -45,7 +45,7 @@
4545

4646
<dependency>
4747
<groupId>org.apache.curator</groupId>
48-
<artifactId>curator-framework</artifactId>
48+
<artifactId>curator-recipes</artifactId>
4949
<version>5.1.0</version>
5050
</dependency>
5151

@@ -70,11 +70,7 @@
7070
</exclusion>
7171
</exclusions>
7272
</dependency>
73-
<dependency>
74-
<groupId>org.apache.curator</groupId>
75-
<artifactId>curator-recipes</artifactId>
76-
<version>5.1.0</version>
77-
</dependency>
73+
7874
</dependencies>
7975

8076
</project>

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
public interface Filter {
44

5-
boolean filter(RpcfxRequest request);
5+
RpcfxResponse prefilter(RpcfxRequest request);
6+
7+
RpcfxResponse postfilter(RpcfxRequest request, RpcfxResponse response);
68

79
// Filter next();
810

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package io.kimmking.rpcfx.api;
22

3+
import io.kimmking.rpcfx.meta.InstanceMeta;
4+
35
import java.util.List;
46

57
public interface LoadBalancer {
68

7-
String select(List<String> urls);
9+
InstanceMeta select(List<InstanceMeta> instances);
810

911
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.kimmking.rpcfx.api;
22

3+
import io.kimmking.rpcfx.meta.InstanceMeta;
4+
35
import java.util.List;
46

57
public interface Router {
68

7-
List<String> route(List<String> urls);
9+
List<InstanceMeta> route(List<InstanceMeta> instances);
810
}

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcContext.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.kimmking.rpcfx.meta.ProviderMeta;
44
import lombok.Getter;
5+
import lombok.Setter;
56
import org.springframework.util.LinkedMultiValueMap;
67
import org.springframework.util.MultiValueMap;
78

@@ -17,9 +18,24 @@
1718
public class RpcContext {
1819

1920
@Getter
20-
private MultiValueMap<String, ProviderMeta> providerHolder = new LinkedMultiValueMap<>();
21+
private final MultiValueMap<String, ProviderMeta> providerHolder = new LinkedMultiValueMap<>();
2122

2223
@Getter
23-
private Map<String, Object> consumerHolder = new HashMap<>();
24+
private final Map<String, Object> consumerHolder = new HashMap<>();
25+
26+
@Getter
27+
private final Map<String, String> parameters = new HashMap<>();
28+
29+
@Getter
30+
@Setter
31+
private Router router;
32+
33+
@Getter
34+
@Setter
35+
private LoadBalancer loadBalancer;
36+
37+
@Getter
38+
@Setter
39+
private Filter[] filters;
2440

2541
}

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@
55
@Data
66
public class RpcfxRequest {
77
private String serviceClass;
8-
private String method;
8+
private String methodSign;
99
private Object[] params;
1010
}

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/ConsumerBootstrap.java

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,24 @@
22

33
import io.kimmking.rpcfx.annotation.RpcfxReference;
44
import io.kimmking.rpcfx.api.RpcContext;
5+
import io.kimmking.rpcfx.meta.ServiceMeta;
6+
import io.kimmking.rpcfx.registry.RegistryCenter;
7+
import io.kimmking.rpcfx.registry.RegistryConfiguration;
58
import io.kimmking.rpcfx.stub.StubSkeletonHelper;
69
import lombok.extern.slf4j.Slf4j;
710
import org.springframework.beans.BeansException;
811
import org.springframework.beans.PropertyValues;
12+
import org.springframework.beans.factory.annotation.Autowired;
13+
import org.springframework.beans.factory.annotation.Value;
914
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;
15+
import org.springframework.context.annotation.Import;
1016
import org.springframework.stereotype.Component;
1117

18+
import javax.annotation.PostConstruct;
1219
import java.io.Closeable;
1320
import java.io.IOException;
1421
import java.lang.reflect.Field;
22+
import java.util.ArrayList;
1523
import java.util.Arrays;
1624
import java.util.List;
1725
import java.util.stream.Collectors;
@@ -24,12 +32,39 @@
2432
*/
2533
@Slf4j
2634
@Component
35+
@Import({RegistryConfiguration.class})
2736
public class ConsumerBootstrap implements Closeable, InstantiationAwareBeanPostProcessor {
2837

29-
private RpcContext rpcContext = new RpcContext();
38+
private RpcContext context = new RpcContext();
3039

3140
private String scanPackage = "io.kimmking";
3241

42+
@Value("${app.id:app1}")
43+
public String app;
44+
@Value("${app.namespace:public}")
45+
public String ns;
46+
@Value("${app.env:dev}")
47+
public String env;
48+
@Value("${app.mock:false}")
49+
public boolean mock;
50+
@Value("${app.cache:false}")
51+
public boolean cache;
52+
@Value("${app.retry:1}")
53+
public int retry;
54+
55+
@Autowired
56+
RegistryCenter rc;
57+
58+
@PostConstruct
59+
public void init() {
60+
this.context.getParameters().put("app.id", app);
61+
this.context.getParameters().put("app.namespace", ns);
62+
this.context.getParameters().put("app.env", env);
63+
this.context.getParameters().put("app.mock", String.valueOf(mock));
64+
this.context.getParameters().put("app.cache", String.valueOf(cache));
65+
this.context.getParameters().put("app.retry", String.valueOf(retry));
66+
}
67+
3368
@Override
3469
public void close() throws IOException {
3570

@@ -38,14 +73,17 @@ public void close() throws IOException {
3873
@Override
3974
public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
4075
if (bean.getClass().getPackage().getName().startsWith(scanPackage)) {
41-
Field[] declaredFields = bean.getClass().getDeclaredFields();
42-
List<Field> consumers = Arrays.stream(declaredFields).filter(field -> field.isAnnotationPresent(RpcfxReference.class)).collect(Collectors.toList());
76+
Field[] declaredFields = resolveAllField(bean.getClass()); // 解决父类里的注解扫描不到的问题
4377

44-
consumers.stream().forEach(consumer -> {
45-
Object consumer1 = createConsumer(consumer.getType());
78+
List<Field> consumers = Arrays.stream(declaredFields)
79+
.filter(field -> field.isAnnotationPresent(RpcfxReference.class))
80+
.collect(Collectors.toList());
81+
82+
consumers.stream().forEach(field -> {
83+
Object consumer = createConsumer(field.getType());
4684
try {
47-
consumer.setAccessible(true);
48-
consumer.set(bean, consumer1);
85+
field.setAccessible(true);
86+
field.set(bean, consumer);
4987
} catch (IllegalAccessException e) {
5088
log.error(e.getMessage(), e);
5189
}
@@ -54,7 +92,19 @@ public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, Str
5492
return null;
5593
}
5694

95+
private Field[] resolveAllField(Class<?> aClass) {
96+
List<Field> res = new ArrayList<>(20);
97+
while ( !Object.class.equals(aClass) ) {
98+
Field[] fields = aClass.getDeclaredFields();
99+
res.addAll(Arrays.asList(fields));
100+
aClass = aClass.getSuperclass();
101+
}
102+
return res.toArray(new Field[0]);
103+
}
104+
57105
private <T> T createConsumer(Class<T> clazz) {
58-
return StubSkeletonHelper.createConsumer(clazz, rpcContext);
106+
ServiceMeta sm = ServiceMeta.builder().name(clazz.getCanonicalName())
107+
.app(app).namespace(ns).env(env).build();
108+
return (T) StubSkeletonHelper.createConsumer(sm, context, rc);
59109
}
60110
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.kimmking.rpcfx.consumer;
2+
3+
4+
import com.alibaba.fastjson.parser.ParserConfig;
5+
import io.kimmking.rpcfx.api.*;
6+
import io.kimmking.rpcfx.meta.InstanceMeta;
7+
import io.kimmking.rpcfx.meta.ServiceMeta;
8+
import io.kimmking.rpcfx.registry.RegistryCenter;
9+
10+
import java.lang.reflect.Proxy;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public final class RpcfxConsumerInvoker {
15+
16+
static {
17+
ParserConfig.getGlobalInstance().addAccept("io.kimmking");
18+
}
19+
20+
RpcContext ctx;
21+
22+
RegistryCenter rc;
23+
24+
public RpcfxConsumerInvoker(RpcContext ctx, RegistryCenter rc) {
25+
this.ctx = ctx;
26+
this.rc = rc; //"localhost:2181"
27+
}
28+
29+
public void start() {
30+
this.rc.start();
31+
}
32+
33+
public void stop() {
34+
this.rc.stop();
35+
}
36+
37+
public <T> T createFromRegistry(final ServiceMeta sm, RpcContext ctx) {
38+
39+
String service = sm.getName();//"io.kimking.rpcfx.demo.api.UserService";
40+
System.out.println("====> "+service);
41+
List<InstanceMeta> invokers = new ArrayList<>();
42+
Class<?> serviceClass = null;
43+
try {
44+
45+
serviceClass = Class.forName(service);
46+
47+
List<InstanceMeta> insts = rc.fetchInstances(sm);
48+
if(insts != null && insts.size()>0) invokers.addAll(insts);
49+
rc.subscribe(sm, e -> {
50+
invokers.clear();
51+
invokers.addAll((List<InstanceMeta>)e.getData());
52+
});
53+
54+
} catch (Exception ex) {
55+
ex.printStackTrace();
56+
throw new RuntimeException(ex);
57+
}
58+
59+
return (T) create(serviceClass, invokers, ctx);
60+
61+
}
62+
63+
private <T> T create(Class<T> serviceClass, List<InstanceMeta> invokers, RpcContext ctx) {
64+
RpcfxInvocationHandler invocationHandler
65+
= new RpcfxInvocationHandler(serviceClass, invokers, ctx);
66+
return (T) Proxy.newProxyInstance(RpcfxConsumerInvoker.class.getClassLoader(),
67+
new Class[]{serviceClass}, invocationHandler);
68+
}
69+
70+
}

0 commit comments

Comments
 (0)