Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions sentinel-extension/sentinel-datasource-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
<packaging>jar</packaging>

<properties>
<zookeeper.version>3.4.14</zookeeper.version>
<curator.version>4.0.1</curator.version>
<curator.test.version>2.12.0</curator.test.version>
<curator.version>5.1.0</curator.version>
</properties>

<dependencies>
Expand All @@ -24,21 +22,10 @@
<artifactId>sentinel-datasource-extension</artifactId>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -54,14 +41,8 @@
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${curator.test.version}</version>
<version>${curator.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.Arrays;
Expand Down Expand Up @@ -40,11 +40,11 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-zookeeper-ds-update", true),
new ThreadPoolExecutor.DiscardOldestPolicy());

private NodeCacheListener listener;
private CuratorCacheListener listener;
private final String path;

private CuratorFramework zkClient = null;
private NodeCache nodeCache = null;
private CuratorCache nodeCache = null;

public ZookeeperDataSource(final String serverAddr, final String path, Converter<String, T> parser) {
super(parser);
Expand Down Expand Up @@ -104,21 +104,17 @@ private void loadInitialConfig() {
private void initZookeeperListener(final String serverAddr, final List<AuthInfo> authInfos) {
try {

this.listener = new NodeCacheListener() {
@Override
public void nodeChanged() {

try {
T newValue = loadConfig();
RecordLog.info("[ZookeeperDataSource] New property value received for ({}, {}): {}",
this.listener = CuratorCacheListener.builder().forNodeCache(() -> {
try {
T newValue = loadConfig();
RecordLog.info("[ZookeeperDataSource] New property value received for ({}, {}): {}",
serverAddr, path, newValue);
// Update the new value to the property.
getProperty().updateValue(newValue);
} catch (Exception ex) {
RecordLog.warn("[ZookeeperDataSource] loadConfig exception", ex);
}
// Update the new value to the property.
getProperty().updateValue(newValue);
} catch (Exception ex) {
RecordLog.warn("[ZookeeperDataSource] loadConfig exception", ex);
}
};
}).build();

String zkKey = getZkKey(serverAddr, authInfos);
if (zkClientMap.containsKey(zkKey)) {
Expand Down Expand Up @@ -148,8 +144,8 @@ public void nodeChanged() {
}
}

this.nodeCache = new NodeCache(this.zkClient, this.path);
this.nodeCache.getListenable().addListener(this.listener, this.pool);
this.nodeCache = CuratorCache.build(this.zkClient, this.path);
this.nodeCache.listenable().addListener(this.listener, this.pool);
this.nodeCache.start();
} catch (Exception e) {
RecordLog.warn("[ZookeeperDataSource] Error occurred when initializing Zookeeper data source", e);
Expand All @@ -163,7 +159,7 @@ public String readSource() throws Exception {
throw new IllegalStateException("Zookeeper has not been initialized or error occurred");
}
String configInfo = null;
ChildData childData = nodeCache.getCurrentData();
ChildData childData = nodeCache.get(path).orElse(null);
if (null != childData && childData.getData() != null) {

configInfo = new String(childData.getData());
Expand All @@ -174,7 +170,7 @@ public String readSource() throws Exception {
@Override
public void close() throws Exception {
if (this.nodeCache != null) {
this.nodeCache.getListenable().removeListener(listener);
this.nodeCache.listenable().removeListener(listener);
this.nodeCache.close();
}
if (this.zkClient != null) {
Expand Down