diff --git a/pom.xml b/pom.xml index 6c2f6d1672..cff702d472 100755 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,11 @@ sentinel-datasource-nacos ${project.version} + + com.alibaba.csp + sentinel-datasource-zookeeper + ${project.version} + com.alibaba.csp sentinel-adapter diff --git a/sentinel-demo/pom.xml b/sentinel-demo/pom.xml index 8accce646c..d168be48cb 100755 --- a/sentinel-demo/pom.xml +++ b/sentinel-demo/pom.xml @@ -18,6 +18,7 @@ sentinel-demo-rocketmq sentinel-demo-dubbo sentinel-demo-nacos-datasource + sentinel-demo-zookeeper-datasource diff --git a/sentinel-demo/sentinel-demo-zookeeper-datasource/pom.xml b/sentinel-demo/sentinel-demo-zookeeper-datasource/pom.xml new file mode 100644 index 0000000000..2915c1d118 --- /dev/null +++ b/sentinel-demo/sentinel-demo-zookeeper-datasource/pom.xml @@ -0,0 +1,73 @@ + + + + sentinel-demo + com.alibaba.csp + 0.1.1-SNAPSHOT + + 4.0.0 + + sentinel-demo-zookeeper-datasource + + + 3.4.13 + 4.0.1 + 2.12.0 + + + + + com.alibaba.csp + sentinel-core + + + com.alibaba.csp + sentinel-datasource-extension + + + com.alibaba.csp + sentinel-datasource-zookeeper + + + + com.alibaba + fastjson + + + + org.apache.zookeeper + zookeeper + ${zookeeper.version} + + + + org.apache.curator + curator-test + ${curator-test.version} + + + org.apache.zookeeper + zookeeper + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven.compiler.version} + + 1.8 + 1.8 + ${java.encoding} + + + + + + \ No newline at end of file diff --git a/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperConfigSender.java b/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperConfigSender.java new file mode 100644 index 0000000000..1b635e61fe --- /dev/null +++ b/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperConfigSender.java @@ -0,0 +1,75 @@ +package com.alibaba.csp.sentinel.demo.datasource.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; + +/** + * Zookeeper config sender for demo + * + * @author guonanjun + */ +public class ZookeeperConfigSender { + + private static final int RETRY_TIMES = 3; + private static final int SLEEP_TIME = 1000; + + public static void main(String[] args) throws Exception { + + // 启动Zookeeper服务 + TestingServer server = new TestingServer(2181); + + final String remoteAddress = server.getConnectString(); + final String groupId = "Sentinel-Demo"; + final String dataId = "SYSTEM-CODE-DEMO-FLOW"; + final String rule = "[\n" + + " {\n" + + " \"resource\": \"TestResource\",\n" + + " \"controlBehavior\": 0,\n" + + " \"count\": 10.0,\n" + + " \"grade\": 1,\n" + + " \"limitApp\": \"default\",\n" + + " \"strategy\": 0\n" + + " }\n" + + "]"; + + CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)); + zkClient.start(); + String path = getPath(groupId, dataId); + Stat stat = zkClient.checkExists().forPath(path); + if (stat == null) { + zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null); + } + zkClient.setData().forPath(path, rule.getBytes()); + // zkClient.delete().forPath(path); + + try { + Thread.sleep(30000L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + zkClient.close(); + + //停止zookeeper服务 + server.stop(); + } + + private static String getPath(String groupId, String dataId) { + String path = ""; + if (groupId.startsWith("/")) { + path += groupId; + } else { + path += "/" + groupId; + } + if (dataId.startsWith("/")) { + path += dataId; + } else { + path += "/" + dataId; + } + return path; + } +} diff --git a/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperDataSourceDemo.java b/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperDataSourceDemo.java new file mode 100644 index 0000000000..2077b7d834 --- /dev/null +++ b/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperDataSourceDemo.java @@ -0,0 +1,65 @@ +package com.alibaba.csp.sentinel.demo.datasource.zookeeper; + +import java.util.List; + +import com.alibaba.csp.sentinel.datasource.DataSource; +import com.alibaba.csp.sentinel.datasource.zookeeper.ZookeeperDataSource; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; + +/** + * Zookeeper DataSource Demo + * + * @author guonanjun + */ +public class ZookeeperDataSourceDemo { + + public static void main(String[] args) { + // 使用zookeeper的场景 + loadRules(); + + // 方便扩展的场景 + //loadRules2(); + } + + private static void loadRules() { + + final String remoteAddress = "127.0.0.1:2181"; + final String path = "/Sentinel-Demo/SYSTEM-CODE-DEMO-FLOW"; + + DataSource> flowRuleDataSource = new ZookeeperDataSource<>(remoteAddress, path, + source -> JSON.parseObject(source, new TypeReference>() {})); + FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); + + + } + + private static void loadRules2() { + + final String remoteAddress = "127.0.0.1:2181"; + // 引入groupId和dataId的概念,是为了方便和Nacos进行切换 + final String groupId = "Sentinel-Demo"; + final String flowDataId = "SYSTEM-CODE-DEMO-FLOW"; + // final String degradeDataId = "SYSTEM-CODE-DEMO-DEGRADE"; + // final String systemDataId = "SYSTEM-CODE-DEMO-SYSTEM"; + + + // 规则会持久化到zk的/groupId/flowDataId节点 + // groupId和和flowDataId可以用/开头也可以不用 + // 建议不用以/开头,目的是为了如果从Zookeeper切换到Nacos的话,只需要改数据源类名就可以 + DataSource> flowRuleDataSource = new ZookeeperDataSource<>(remoteAddress, groupId, flowDataId, + source -> JSON.parseObject(source, new TypeReference>() {})); + FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); + + // DataSource> degradeRuleDataSource = new ZookeeperDataSource<>(remoteAddress, groupId, degradeDataId, + // source -> JSON.parseObject(source, new TypeReference>() {})); + // DegradeRuleManager.register2Property(degradeRuleDataSource.getProperty()); + // + // DataSource> systemRuleDataSource = new ZookeeperDataSource<>(remoteAddress, groupId, systemDataId, + // source -> JSON.parseObject(source, new TypeReference>() {})); + // SystemRuleManager.register2Property(systemRuleDataSource.getProperty()); + + } +} diff --git a/sentinel-extension/pom.xml b/sentinel-extension/pom.xml index ba9b729fa5..e72add499a 100755 --- a/sentinel-extension/pom.xml +++ b/sentinel-extension/pom.xml @@ -14,6 +14,7 @@ sentinel-datasource-extension sentinel-datasource-nacos + sentinel-datasource-zookeeper diff --git a/sentinel-extension/sentinel-datasource-zookeeper/pom.xml b/sentinel-extension/sentinel-datasource-zookeeper/pom.xml new file mode 100644 index 0000000000..6c7b2e21f3 --- /dev/null +++ b/sentinel-extension/sentinel-datasource-zookeeper/pom.xml @@ -0,0 +1,43 @@ + + + + sentinel-extension + com.alibaba.csp + 0.1.1-SNAPSHOT + + 4.0.0 + + sentinel-datasource-zookeeper + jar + + + 3.4.13 + 4.0.1 + + + + + com.alibaba.csp + sentinel-datasource-extension + + + + org.apache.zookeeper + zookeeper + ${zookeeper.version} + + + org.apache.curator + curator-recipes + ${curator.version} + + + org.apache.zookeeper + zookeeper + + + + + \ No newline at end of file diff --git a/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java b/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java new file mode 100644 index 0000000000..b8c2663d3f --- /dev/null +++ b/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java @@ -0,0 +1,162 @@ +package com.alibaba.csp.sentinel.datasource.zookeeper; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; +import com.alibaba.csp.sentinel.datasource.AbstractDataSource; +import com.alibaba.csp.sentinel.datasource.ConfigParser; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.util.StringUtil; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; + +/** + * Zookeeper DataSource + * + * @author guonanjun + */ +public class ZookeeperDataSource extends AbstractDataSource { + + private static final int RETRY_TIMES = 3; + private static final int SLEEP_TIME = 1000; + + private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"), + new ThreadPoolExecutor.DiscardOldestPolicy()); + + private NodeCacheListener listener; + private final String groupId; + private final String dataId; + private final String path; + + private CuratorFramework zkClient = null; + private NodeCache nodeCache = null; + + public ZookeeperDataSource(final String serverAddr, final String groupId, final String dataId, + ConfigParser parser) { + super(parser); + if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { + throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", + serverAddr, groupId, dataId)); + } + this.groupId = groupId; + this.dataId = dataId; + this.path = getPath(groupId, dataId); + + init(serverAddr); + } + + public ZookeeperDataSource(final String serverAddr, final String path, ConfigParser parser) { + super(parser); + if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) { + throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]", + serverAddr, path)); + } + this.path = path; + this.groupId = null; + this.dataId = null; + + init(serverAddr); + } + + private void init(final String serverAddr) { + initZookeeperListener(serverAddr); + loadInitialConfig(); + } + + private void loadInitialConfig() { + try { + T newValue = loadConfig(); + if (newValue == null) { + RecordLog.info("[ZookeeperDataSource] WARN: initial config is null, you may have to check your data source"); + } + getProperty().updateValue(newValue); + } catch (Exception ex) { + RecordLog.info("[ZookeeperDataSource] Error when loading initial config", ex); + } + } + + private void initZookeeperListener(final String serverAddr) { + try { + + this.listener = new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + String configInfo = null; + ChildData childData = nodeCache.getCurrentData(); + if (null != childData && childData.getData() != null) { + + configInfo = new String(childData.getData()); + } + RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s", + serverAddr, path, configInfo)); + T newValue = ZookeeperDataSource.this.parser.parse(configInfo); + // Update the new value to the property. + getProperty().updateValue(newValue); + } + }; + + this.zkClient = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)); + this.zkClient.start(); + Stat stat = this.zkClient.checkExists().forPath(this.path); + if (stat == null) { + this.zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(this.path, null); + } + + this.nodeCache = new NodeCache(this.zkClient, this.path); + this.nodeCache.getListenable().addListener(this.listener, this.pool); + this.nodeCache.start(); + } catch (Exception e) { + RecordLog.info("[ZookeeperDataSource] Error occurred when initializing Zookeeper data source", e); + e.printStackTrace(); + } + } + + @Override + public String readSource() throws Exception { + if (this.zkClient == null) { + throw new IllegalStateException("Zookeeper has not been initialized or error occurred"); + } + byte[] data = this.zkClient.getData().forPath(this.path); + if (data != null) { + return new String(data); + } + return null; + } + + @Override + public void close() throws Exception { + if (this.nodeCache != null) { + this.nodeCache.getListenable().removeListener(listener); + this.nodeCache.close(); + } + if (this.zkClient != null) { + this.zkClient.close(); + } + pool.shutdown(); + } + + private String getPath(String groupId, String dataId) { + String path = ""; + if (groupId.startsWith("/")) { + path += groupId; + } else { + path += "/" + groupId; + } + if (dataId.startsWith("/")) { + path += dataId; + } else { + path += "/" + dataId; + } + return path; + } +}