3
3
import com .alibaba .fastjson .JSON ;
4
4
import com .alibaba .fastjson .TypeReference ;
5
5
import io .kimmking .rpcfx .meta .InstanceMeta ;
6
+ import io .kimmking .rpcfx .meta .ServerMeta ;
6
7
import io .kimmking .rpcfx .meta .ServiceMeta ;
7
8
import io .kimmking .rpcfx .registry .ChangedListener ;
8
9
import io .kimmking .rpcfx .registry .Event ;
9
10
import io .kimmking .rpcfx .registry .RegistryCenter ;
11
+ import lombok .SneakyThrows ;
10
12
import okhttp3 .ConnectionPool ;
11
13
import okhttp3 .OkHttpClient ;
12
14
import okhttp3 .Request ;
13
15
import okhttp3 .RequestBody ;
14
16
17
+ import java .io .IOException ;
15
18
import java .util .HashMap ;
16
19
import java .util .List ;
17
20
import java .util .Map ;
21
+ import java .util .Random ;
18
22
import java .util .concurrent .TimeUnit ;
19
23
20
24
import static io .kimmking .rpcfx .consumer .RpcfxInvocationHandler .JSONTYPE ;
27
31
*/
28
32
public class KKRegistryCenter implements RegistryCenter {
29
33
34
+ public String RC_Server = "http://localhost:8485" ;
35
+ private ServerMeta leader ;
36
+ private List <ServerMeta > servers ;
30
37
private Map <String , Long > TV = new HashMap <>();
31
38
32
39
OkHttpClient client ;
40
+ @ SneakyThrows
33
41
@ Override
34
42
public void start () {
35
43
client = new OkHttpClient .Builder ()
@@ -39,6 +47,40 @@ public void start() {
39
47
.writeTimeout (65 , TimeUnit .SECONDS )
40
48
.connectTimeout (3 , TimeUnit .SECONDS )
41
49
.build ();
50
+
51
+ String url = RC_Server + "/cluster" ;
52
+ boolean init = false ;
53
+ while (!init ) {
54
+ System .out .println ("===============>> cluster info from :" + url );
55
+ List <ServerMeta > new_servers = null ;
56
+ ServerMeta new_leader = null ;
57
+ try {
58
+ String respJson = get (url );
59
+ new_servers = JSON .parseObject (respJson , new TypeReference <List <ServerMeta >>() {
60
+ });
61
+ new_leader = new_servers .stream ().filter (ServerMeta ::isStatus )
62
+ .filter (ServerMeta ::isLeader ).findFirst ().orElse (null );
63
+ } catch (Exception exception ) {
64
+ exception .printStackTrace ();
65
+ }
66
+
67
+ if (new_leader == null ) {
68
+ System .out .println ("===============>> no leader, 500ms later and retry." );
69
+ Thread .sleep (500 );
70
+ Random random = new Random ();
71
+ if (new_servers !=null && new_servers .size () > 1 ) {
72
+ url = new_servers .get (random .nextInt (new_servers .size ())).getUrl () + "/cluster" ;
73
+ } else if ((new_servers ==null || new_servers .isEmpty ()) && !servers .isEmpty ()) {
74
+ url = servers .get (random .nextInt (servers .size ())).getUrl () + "/cluster" ;
75
+ }
76
+ } else {
77
+ this .servers = new_servers ;
78
+ this .leader = new_leader ;
79
+ init = true ;
80
+ System .out .println ("===============>> init ok, new_leader = " + new_leader );
81
+ System .out .println ("===============>> init ok, new_servers = " + new_servers );
82
+ }
83
+ }
42
84
}
43
85
44
86
@ Override
@@ -50,14 +92,8 @@ public void stop() {
50
92
public void registerService (ServiceMeta service , InstanceMeta instance ) throws Exception {
51
93
instance .setStatus (true );
52
94
String reqJson = JSON .toJSONString (instance );
53
- String url = "http://localhost:8484/reg?service=" + service ;
54
- System .out .println (" ====> reg service: " + url );
55
- final Request request = new Request .Builder ()
56
- .url (url )
57
- .post (RequestBody .create (JSONTYPE , reqJson ))
58
- .build ();
59
- String respJson = client .newCall (request ).execute ().body ().string ();
60
- System .out .println (" ====> reg response: " + respJson );
95
+ String url = leader .getUrl () + "/reg?service=" + service ;
96
+ post (url , reqJson );
61
97
// String reqJson = "{\n" +
62
98
// " \"scheme\": \"http\",\n" +
63
99
// " \"ip\": \"" + instance.getIp() + "\",\n" +
@@ -77,33 +113,43 @@ public void registerService(ServiceMeta service, InstanceMeta instance) throws E
77
113
// System.out.println(respJson);
78
114
}
79
115
80
- @ Override
81
- public void unregisterService (ServiceMeta service , InstanceMeta instance ) throws Exception {
82
- String reqJson = "{\n " +
83
- " \" scheme\" : \" http\" ,\n " +
84
- " \" host\" : \" " + instance .getHost () + "\" ,\n " +
85
- " \" port\" : \" " + instance .getPort () + "\" ,\n " +
86
- " \" context\" : \" \" \n " +
87
- "}" ;
88
- String url = "http://localhost:8484/unreg?service=" + service ;
89
- System .out .println (" ====> unreg service: " + url );
116
+ private String post (String url , String reqJson ) throws IOException {
117
+ System .out .println (" ====> request: " + url );
90
118
final Request request = new Request .Builder ()
91
119
.url (url )
92
120
.post (RequestBody .create (JSONTYPE , reqJson ))
93
121
.build ();
94
122
String respJson = client .newCall (request ).execute ().body ().string ();
95
- System .out .println (" ====> unreg response: " + respJson );
123
+ System .out .println (" ====> response: " + respJson );
124
+ return respJson ;
96
125
}
97
126
98
- public List <InstanceMeta > fetchInstances (ServiceMeta service ) throws Exception {
99
- String url = "http://localhost:8484/findAll?service=" + service ;
100
- System .out .println (" ====> fetchInstances: " + url );
127
+ private String get (String url ) throws IOException {
128
+ System .out .println (" ====> request: " + url );
101
129
final Request request = new Request .Builder ()
102
130
.url (url )
103
131
.get ()
104
132
.build ();
105
133
String respJson = client .newCall (request ).execute ().body ().string ();
106
- System .out .println (" ====> fetchInstances response: " + respJson );
134
+ System .out .println (" ====> response: " + respJson );
135
+ return respJson ;
136
+ }
137
+
138
+ @ Override
139
+ public void unregisterService (ServiceMeta service , InstanceMeta instance ) throws Exception {
140
+ String reqJson = "{\n " +
141
+ " \" scheme\" : \" http\" ,\n " +
142
+ " \" host\" : \" " + instance .getHost () + "\" ,\n " +
143
+ " \" port\" : \" " + instance .getPort () + "\" ,\n " +
144
+ " \" context\" : \" \" \n " +
145
+ "}" ;
146
+ String url = leader .getUrl () + "/unreg?service=" + service ;
147
+ post (url , reqJson );
148
+ }
149
+
150
+ public List <InstanceMeta > fetchInstances (ServiceMeta service ) throws Exception {
151
+ String url = RC_Server + "/findAll?service=" + service ;
152
+ String respJson = get (url );
107
153
List <InstanceMeta > instances = JSON .parseObject (respJson , new TypeReference <List <InstanceMeta >>() {
108
154
});
109
155
return instances ;
@@ -112,12 +158,11 @@ public List<InstanceMeta> fetchInstances(ServiceMeta service) throws Exception {
112
158
KKHeathChecker checker = new KKHeathChecker ();
113
159
114
160
// for Consumer
115
- public void subscribe (ServiceMeta service , final ChangedListener listener ) {
116
-
161
+ public void subscribe (ServiceMeta service , final ChangedListener <List <InstanceMeta >> listener ) {
117
162
checker .check ( () -> {
118
- if (hb (service , listener )) {
163
+ if (hb (service )) {
119
164
List <InstanceMeta > instances = fetchInstances (service );
120
- Event e = Event .withData (instances );
165
+ Event < List < InstanceMeta >> e = Event .withData (instances );
121
166
listener .fireEvent (e );
122
167
}
123
168
});
@@ -127,16 +172,10 @@ public void subscribe(ServiceMeta service, final ChangedListener listener) {
127
172
// 如果有差异就fire
128
173
}
129
174
130
- private boolean hb (ServiceMeta service , ChangedListener listener ) throws Exception {
175
+ private boolean hb (ServiceMeta service ) throws Exception {
131
176
String svc = service .toString ();
132
- String url = "http://localhost:8484/version?service=" + svc ;
133
- System .out .println (" ====> consumer version: " + url );
134
- final Request request = new Request .Builder ()
135
- .url (url )
136
- .get ()
137
- .build ();
138
- String respJson = client .newCall (request ).execute ().body ().string ();
139
- System .out .println (" ====> consumer version: " +respJson );
177
+ String url = RC_Server + "/version?service=" + svc ;
178
+ String respJson = get (url );
140
179
Long v = Long .valueOf (respJson );
141
180
Long o = TV .getOrDefault (svc , -1L );
142
181
if ( v > o ) {
@@ -162,14 +201,8 @@ Long heart(ServiceMeta service, InstanceMeta instance) throws Exception {
162
201
" \" context\" : \" \" ,\n " +
163
202
" \" status\" : true\n " +
164
203
"}" ;
165
- String url = "http://localhost:8484/renew?service=" + service ;
166
- System .out .println (" ====> provider renew: " + url );
167
- final Request request = new Request .Builder ()
168
- .url (url )
169
- .post (RequestBody .create (JSONTYPE , reqJson ))
170
- .build ();
171
- String respJson = client .newCall (request ).execute ().body ().string ();
172
- System .out .println (" ====> provider renew: " +respJson );
204
+ String url = leader .getUrl () + "/renew?service=" + service ;
205
+ String respJson = post (url , reqJson );
173
206
return Long .valueOf (respJson );
174
207
}
175
208
0 commit comments