Skip to content

Commit 09fc238

Browse files
authored
Merge pull request #1029 from JellyBo/registry-etcd-jelly
[ISSUE #1028] support Etcd Registry
2 parents afab36b + 7672959 commit 09fc238

File tree

26 files changed

+876
-36
lines changed

26 files changed

+876
-36
lines changed

build.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -473,10 +473,10 @@ subprojects {
473473
dependency "io.cloudevents:cloudevents-core:2.2.0"
474474
dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"
475475

476-
dependency "io.grpc:grpc-protobuf:1.15.0"
477-
dependency "io.grpc:grpc-stub:1.15.0"
478-
dependency "io.grpc:grpc-netty:1.15.0"
479-
dependency "io.grpc:grpc-netty-shaded:1.15.0"
476+
dependency "io.grpc:grpc-protobuf:1.17.1"
477+
dependency "io.grpc:grpc-stub:1.17.1"
478+
dependency "io.grpc:grpc-netty:1.17.1"
479+
dependency "io.grpc:grpc-netty-shaded:1.17.1"
480480

481481
dependency "javax.annotation:javax.annotation-api:1.3.2"
482482

eventmesh-common/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ dependencies {
3939

4040
implementation "io.netty:netty-all"
4141

42-
implementation "io.grpc:grpc-protobuf:1.15.0"
43-
implementation "io.grpc:grpc-stub:1.15.0"
42+
implementation "io.grpc:grpc-protobuf:1.17.1"
43+
implementation "io.grpc:grpc-stub:1.17.1"
4444
implementation "javax.annotation:javax.annotation-api:1.3.2"
4545

4646
implementation "com.github.stefanbirkner:system-rules"

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/ConsumerServiceGrpc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*/
3232
@SuppressWarnings({"all"})
3333
@javax.annotation.Generated(
34-
value = "by gRPC proto compiler (version 1.15.0)",
34+
value = "by gRPC proto compiler (version 1.17.1)",
3535
comments = "Source: eventmesh-client.proto")
3636
public final class ConsumerServiceGrpc {
3737

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/HeartbeatServiceGrpc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
*/
2929
@SuppressWarnings({"all"})
3030
@javax.annotation.Generated(
31-
value = "by gRPC proto compiler (version 1.15.0)",
31+
value = "by gRPC proto compiler (version 1.17.1)",
3232
comments = "Source: eventmesh-client.proto")
3333
public final class HeartbeatServiceGrpc {
3434

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/protos/PublisherServiceGrpc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
*/
2929
@SuppressWarnings({"all"})
3030
@javax.annotation.Generated(
31-
value = "by gRPC proto compiler (version 1.15.0)",
31+
value = "by gRPC proto compiler (version 1.17.1)",
3232
comments = "Source: eventmesh-client.proto")
3333
public final class PublisherServiceGrpc {
3434

eventmesh-examples/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
def grpcVersion = '1.15.0'
18+
def grpcVersion = '1.17.1'
1919

2020
configurations {
2121
implementation.exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'

eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ dependencies {
2020
implementation "io.cloudevents:cloudevents-core"
2121
implementation "com.google.guava:guava"
2222
implementation "io.cloudevents:cloudevents-json-jackson"
23-
implementation "io.grpc:grpc-protobuf:1.15.0"
23+
implementation "io.grpc:grpc-protobuf:1.17.1"
2424
}

eventmesh-protocol-plugin/eventmesh-protocol-grpc/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ repositories {
2424
mavenCentral()
2525
}
2626

27-
def grpcVersion = '1.15.0' // CURRENT_GRPC_VERSION
27+
def grpcVersion = '1.17.1' // CURRENT_GRPC_VERSION
2828
def protobufVersion = '3.5.1'
2929
def protocVersion = protobufVersion
3030

eventmesh-protocol-plugin/eventmesh-protocol-http/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ dependencies {
2020
implementation "io.cloudevents:cloudevents-core"
2121
implementation "com.google.guava:guava"
2222
implementation "io.cloudevents:cloudevents-json-jackson"
23-
implementation "io.grpc:grpc-protobuf:1.15.0"
23+
implementation "io.grpc:grpc-protobuf:1.17.1"
2424
}

eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
dependencies {
1919
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
2020
implementation "io.cloudevents:cloudevents-core"
21-
implementation "io.grpc:grpc-protobuf:1.15.0"
21+
implementation "io.grpc:grpc-protobuf:1.17.1"
2222

2323
testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
2424
testImplementation "io.cloudevents:cloudevents-core"

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/dto/EventMeshDataInfo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public class EventMeshDataInfo {
3030

3131
private Map<String, String> metadata;
3232

33+
public EventMeshDataInfo() {
34+
35+
}
36+
3337
public EventMeshDataInfo(String eventMeshClusterName, String eventMeshName, String endpoint, long lastUpdateTimestamp,
3438
Map<String, String> metadata) {
3539
this.eventMeshClusterName = eventMeshClusterName;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
dependencies {
19+
implementation ("io.etcd:jetcd-core:0.3.0")
20+
implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
21+
implementation project(":eventmesh-common")
22+
testImplementation "org.mockito:mockito-core"
23+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
pluginType=registry
19+
pluginName=etcd
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.registry.etcd.constant;
19+
20+
/**
21+
* EtcdConstant.
22+
*/
23+
public class EtcdConstant {
24+
25+
public static final String SERVER_ADDR = "serverAddr";
26+
27+
public static final String USERNAME = "username";
28+
29+
public static final String PASSWORD = "password";
30+
31+
public static final String KEY_SEPARATOR = "/";
32+
33+
public static final long TTL = 15L;
34+
35+
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.registry.etcd.factory;
19+
20+
import org.apache.eventmesh.api.exception.RegistryException;
21+
import org.apache.eventmesh.common.Constants;
22+
import org.apache.eventmesh.registry.etcd.constant.EtcdConstant;
23+
24+
import org.apache.commons.lang3.StringUtils;
25+
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.Properties;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import io.etcd.jetcd.ByteSequence;
35+
import io.etcd.jetcd.Client;
36+
import io.etcd.jetcd.ClientBuilder;
37+
import io.etcd.jetcd.options.LeaseOption;
38+
39+
40+
public class EtcdClientFactory {
41+
42+
private static final Logger logger = LoggerFactory.getLogger(EtcdClientFactory.class);
43+
44+
private static final Map<String, EtcdLeaseId> etcdLeaseIdMap = new ConcurrentHashMap<>();
45+
46+
47+
public static Client createClient(Properties properties) {
48+
String serverAddr = properties.getProperty(EtcdConstant.SERVER_ADDR);
49+
String username = properties.getProperty(EtcdConstant.USERNAME);
50+
String password = properties.getProperty(EtcdConstant.PASSWORD);
51+
52+
EtcdLeaseId etcdLeaseId = etcdLeaseIdMap.get(serverAddr);
53+
if (Objects.nonNull(etcdLeaseId)) {
54+
return etcdLeaseId.getClientWrapper();
55+
}
56+
ClientBuilder clientBuilder = Client.builder();
57+
String[] addresses = serverAddr.split(",");
58+
String[] httpAddress = new String[addresses.length];
59+
for (int i = 0; i < addresses.length; i++) {
60+
if (!addresses[i].startsWith(Constants.HTTP_PROTOCOL_PREFIX)) {
61+
httpAddress[i] = Constants.HTTP_PROTOCOL_PREFIX + addresses[i];
62+
}
63+
}
64+
etcdLeaseId = new EtcdLeaseId();
65+
try {
66+
etcdLeaseId.setUrl(serverAddr);
67+
etcdLeaseId.setClientBuilder(clientBuilder.endpoints(httpAddress));
68+
if (StringUtils.isNoneBlank(username)) {
69+
etcdLeaseId.getClientBuilder().user(ByteSequence.from(username.getBytes()));
70+
}
71+
if (StringUtils.isNoneBlank(password)) {
72+
etcdLeaseId.getClientBuilder().password(ByteSequence.from(password.getBytes()));
73+
}
74+
etcdLeaseId.setClientWrapper(new EtcdClientWrapper(etcdLeaseId.getClientBuilder().build()));
75+
EtcdClientWrapper client = etcdLeaseId.getClientWrapper();
76+
long leaseId = client.getLeaseClient().grant(EtcdConstant.TTL).get().getID();
77+
etcdLeaseId.setLeaseId(leaseId);
78+
EtcdStreamObserver etcdStreamObserver = new EtcdStreamObserver();
79+
etcdStreamObserver.setEtcdLeaseId(etcdLeaseId);
80+
etcdLeaseId.setEtcdStreamObserver(etcdStreamObserver);
81+
client.getLeaseClient().keepAlive(leaseId, etcdStreamObserver);
82+
83+
etcdLeaseIdMap.put(serverAddr, etcdLeaseId);
84+
} catch (Throwable e) {
85+
logger.error("createClient failed, address: {}", serverAddr, e);
86+
throw new RegistryException("createClient failed", e);
87+
}
88+
return etcdLeaseId.getClientWrapper();
89+
}
90+
91+
92+
public static void renewalLeaseId(EtcdLeaseId etcdLeaseId) {
93+
logger.info("renewal of contract. server url: {}", etcdLeaseId.getUrl());
94+
Client client = etcdLeaseId.getClientWrapper();
95+
try {
96+
long ttl = client.getLeaseClient().timeToLive(etcdLeaseId.getLeaseId(), LeaseOption.DEFAULT).get().getTTl();
97+
if (ttl < 1) {
98+
long leaseId = client.getLeaseClient().grant(EtcdConstant.TTL).get().getID();
99+
client.getLeaseClient().keepAlive(leaseId, etcdLeaseId.getEtcdStreamObserver());
100+
etcdLeaseId.setLeaseId(leaseId);
101+
}
102+
} catch (Throwable e) {
103+
logger.error("renewal error, server url: {}", etcdLeaseId.getUrl(), e);
104+
client.getLeaseClient().keepAlive(System.currentTimeMillis(), etcdLeaseId.getEtcdStreamObserver());
105+
}
106+
}
107+
108+
public static Long getLeaseId(String url) {
109+
return getEtcdLeaseId(url).getLeaseId();
110+
}
111+
112+
public static EtcdLeaseId getEtcdLeaseId(String url) {
113+
return etcdLeaseIdMap.get(url);
114+
}
115+
116+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.registry.etcd.factory;
19+
20+
21+
import io.etcd.jetcd.Auth;
22+
import io.etcd.jetcd.Client;
23+
import io.etcd.jetcd.Cluster;
24+
import io.etcd.jetcd.KV;
25+
import io.etcd.jetcd.Lease;
26+
import io.etcd.jetcd.Lock;
27+
import io.etcd.jetcd.Maintenance;
28+
import io.etcd.jetcd.Watch;
29+
30+
31+
class EtcdClientWrapper implements Client {
32+
33+
private volatile Client client;
34+
35+
public EtcdClientWrapper(Client client) {
36+
this.client = client;
37+
}
38+
39+
@Override
40+
public Auth getAuthClient() {
41+
return client.getAuthClient();
42+
}
43+
44+
@Override
45+
public KV getKVClient() {
46+
return client.getKVClient();
47+
}
48+
49+
@Override
50+
public Cluster getClusterClient() {
51+
return client.getClusterClient();
52+
}
53+
54+
@Override
55+
public Maintenance getMaintenanceClient() {
56+
return client.getMaintenanceClient();
57+
}
58+
59+
@Override
60+
public Lease getLeaseClient() {
61+
return client.getLeaseClient();
62+
}
63+
64+
@Override
65+
public Watch getWatchClient() {
66+
return client.getWatchClient();
67+
}
68+
69+
@Override
70+
public Lock getLockClient() {
71+
return client.getLockClient();
72+
}
73+
74+
@Override
75+
public void close() {
76+
client.close();
77+
}
78+
79+
}

0 commit comments

Comments
 (0)