diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/AbstractDynamicConfiguration.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/AbstractDynamicConfiguration.java index 61fd3974d7e..bf1fa22df80 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/AbstractDynamicConfiguration.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/AbstractDynamicConfiguration.java @@ -301,6 +301,6 @@ protected static String getGroup(URL url) { * @since 2.7.8 */ protected static long getTimeout(URL url) { - return getParameter(url, TIMEOUT_PARAM_NAME, TimeUnit.SECONDS.toMillis(1L)); + return getParameter(url, TIMEOUT_PARAM_NAME, -1L); } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/TreePathDynamicConfiguration.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/TreePathDynamicConfiguration.java index f4eb1f05f84..cbcb3a2f4e1 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/TreePathDynamicConfiguration.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/TreePathDynamicConfiguration.java @@ -17,6 +17,7 @@ package org.apache.dubbo.common.config.configcenter; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.configcenter.file.FileSystemDynamicConfiguration; import org.apache.dubbo.common.utils.PathUtils; import org.apache.dubbo.common.utils.StringUtils; @@ -34,9 +35,9 @@ /** * An abstract implementation of {@link DynamicConfiguration} is like "tree-structure" path : * * * @see DynamicConfiguration @@ -81,7 +82,7 @@ protected final String doGetConfig(String key, String group) throws Exception { @Override public final boolean publishConfig(String key, String group, String content) { String pathKey = buildPathKey(group, key); - return execute(() -> doPublishConfig(pathKey, content), -1L); + return execute(() -> doPublishConfig(pathKey, content), getDefaultTimeout()); } @Override diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java index 646496b26d7..5bf8abed371 100644 --- a/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java @@ -21,12 +21,10 @@ import org.apache.dubbo.common.config.configcenter.ConfigChangeType; import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent; import org.apache.dubbo.common.config.configcenter.ConfigurationListener; -import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; +import org.apache.dubbo.common.config.configcenter.TreePathDynamicConfiguration; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.CollectionUtils; -import org.apache.dubbo.common.utils.PathUtils; -import org.apache.dubbo.common.utils.StringUtils; import com.google.common.base.Charsets; import com.google.common.net.HostAndPort; @@ -35,39 +33,36 @@ import com.orbitz.consul.cache.KVCache; import com.orbitz.consul.model.kv.Value; +import java.util.Collection; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import static org.apache.dubbo.common.config.configcenter.Constants.CONFIG_NAMESPACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR; -import static org.apache.dubbo.common.utils.StringUtils.EMPTY_STRING; /** * config center implementation for consul */ -public class ConsulDynamicConfiguration implements DynamicConfiguration { +public class ConsulDynamicConfiguration extends TreePathDynamicConfiguration { private static final Logger logger = LoggerFactory.getLogger(ConsulDynamicConfiguration.class); private static final int DEFAULT_PORT = 8500; private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000; private static final String WATCH_TIMEOUT = "consul-watch-timeout"; - private URL url; - private String rootPath; private Consul client; + private KeyValueClient kvClient; + private ConcurrentMap watchers = new ConcurrentHashMap<>(); public ConsulDynamicConfiguration(URL url) { - this.url = url; - this.rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + "config"; + super(url); String host = url.getHost(); int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT; client = Consul.builder().withHostAndPort(HostAndPort.fromParts(host, port)).build(); @@ -75,81 +70,69 @@ public ConsulDynamicConfiguration(URL url) { } @Override - public void addListener(String key, String group, ConfigurationListener listener) { - logger.info("register listener " + listener.getClass() + " for config with key: " + key + ", group: " + group); - String normalizedKey = convertKey(group, key); - ConsulListener watcher = watchers.computeIfAbsent(normalizedKey, k -> new ConsulListener(key, group)); - watcher.addListener(listener); + public String getInternalProperty(String key) { + logger.info("getting config from: " + key); + return kvClient.getValueAsString(key, Charsets.UTF_8).orElse(null); } @Override - public void removeListener(String key, String group, ConfigurationListener listener) { - logger.info("unregister listener " + listener.getClass() + " for config with key: " + key + ", group: " + group); - ConsulListener watcher = watchers.get(convertKey(group, key)); - if (watcher != null) { - watcher.removeListener(listener); - } + protected boolean doPublishConfig(String pathKey, String content) throws Exception { + return kvClient.putValue(pathKey, content); } @Override - public String getConfig(String key, String group, long timeout) throws IllegalStateException { - return (String) getInternalProperty(convertKey(group, key)); + protected String doGetConfig(String pathKey) throws Exception { + return getInternalProperty(pathKey); } @Override - public SortedSet getConfigKeys(String group) throws UnsupportedOperationException { - SortedSet configKeys = new TreeSet<>(); - String normalizedKey = convertKey(group, EMPTY_STRING); - List keys = kvClient.getKeys(normalizedKey); + protected boolean doRemoveConfig(String pathKey) throws Exception { + kvClient.deleteKey(pathKey); + return true; + } + + @Override + protected Collection doGetConfigKeys(String groupPath) { + List keys = kvClient.getKeys(groupPath); + List configKeys = new LinkedList<>(); if (CollectionUtils.isNotEmpty(keys)) { keys.stream() - .filter(k -> !k.equals(normalizedKey)) + .filter(k -> !k.equals(groupPath)) .map(k -> k.substring(k.lastIndexOf(PATH_SEPARATOR) + 1)) .forEach(configKeys::add); } return configKeys; } - /** - * @param key the key to represent a configuration - * @param group the group where the key belongs to - * @param content the content of configuration - * @return - */ @Override - public boolean publishConfig(String key, String group, String content) { - String normalizedKey = convertKey(group, key); - return kvClient.putValue(normalizedKey, content); + protected void doAddListener(String pathKey, ConfigurationListener listener) { + logger.info("register listener " + listener.getClass() + " for config with key: " + pathKey); + ConsulListener watcher = watchers.computeIfAbsent(pathKey, k -> new ConsulListener(pathKey)); + watcher.addListener(listener); } @Override - public Object getInternalProperty(String key) { - logger.info("getting config from: " + key); - return kvClient.getValueAsString(key, Charsets.UTF_8).orElse(null); + protected void doRemoveListener(String pathKey, ConfigurationListener listener) { + logger.info("unregister listener " + listener.getClass() + " for config with key: " + pathKey); + ConsulListener watcher = watchers.get(pathKey); + if (watcher != null) { + watcher.removeListener(listener); + } } @Override - public void close() throws Exception { + protected void doClose() throws Exception { client.destroy(); } - private String convertKey(String group, String key) { - String actualGroup = StringUtils.isBlank(group) ? DEFAULT_GROUP : group; - return PathUtils.buildPath(rootPath, actualGroup, key); - } - private class ConsulListener implements KVCache.Listener { private KVCache kvCache; private Set listeners = new LinkedHashSet<>(); - private String key; - private String group; private String normalizedKey; - public ConsulListener(String key, String group) { - this.key = key; - this.group = group; - this.normalizedKey = convertKey(group, key); + public ConsulListener(String normalizedKey) { + this.normalizedKey = normalizedKey; initKVCache(); } @@ -171,7 +154,7 @@ public void notify(Map newValues) { // Values are encoded in key/value store, decode it if needed Optional decodedValue = newValue.get().getValueAsString(); decodedValue.ifPresent(v -> listeners.forEach(l -> { - ConfigChangedEvent event = new ConfigChangedEvent(key, group, v, ConfigChangeType.MODIFIED); + ConfigChangedEvent event = new ConfigChangedEvent(normalizedKey, getGroup(), v, ConfigChangeType.MODIFIED); l.process(event); })); }); diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/test/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-consul/src/test/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationTest.java index 8ada5fbdc09..c54d1034143 100644 --- a/dubbo-configcenter/dubbo-configcenter-consul/src/test/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationTest.java +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/test/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationTest.java @@ -30,7 +30,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Optional; +import java.util.TreeSet; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -51,10 +53,10 @@ public static void setUp() throws Exception { consul = ConsulStarterBuilder.consulStarter() .build() .start(); - configCenterUrl = URL.valueOf("consul://localhost:" + consul.getHttpPort()); + configCenterUrl = URL.valueOf("consul://127.0.0.1:" + consul.getHttpPort()); configuration = new ConsulDynamicConfiguration(configCenterUrl); - client = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", consul.getHttpPort())).build(); + client = Consul.builder().withHostAndPort(HostAndPort.fromParts("127.0.0.1", consul.getHttpPort())).build(); kvClient = client.keyValueClient(); } @@ -74,6 +76,14 @@ public void testGetConfig() { Assertions.assertNull(configuration.getConfig("not-exist", "dubbo")); } + @Test + public void testPublishConfig() { + configuration.publishConfig("value", "metadata", "1"); + // test equals + assertEquals("1", configuration.getConfig("value", "/metadata")); + assertEquals("1", kvClient.getValueAsString("/dubbo/config/metadata/value").get()); + } + @Test public void testAddListener() { KVCache cache = KVCache.newCache(kvClient, "/dubbo/config/dubbo/foo"); @@ -104,6 +114,10 @@ public void testAddListener() { @Test public void testGetConfigKeys() { - + configuration.publishConfig("v1", "metadata", "1"); + configuration.publishConfig("v2", "metadata", "2"); + configuration.publishConfig("v3", "metadata", "3"); + // test equals + assertEquals(new TreeSet(Arrays.asList("v1", "v2", "v3")), configuration.getConfigKeys("metadata")); } }