Skip to content

Commit

Permalink
Polish apache#6315 : Refactoring ConsulDynamicConfiguration by TreePa…
Browse files Browse the repository at this point in the history
…thDynamicConfiguration
  • Loading branch information
mercyblitz committed Jun 12, 2020
1 parent 454cea6 commit 2bdb538
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,6 @@ protected static String getGroup(URL url) {
* @since 2.7.8
*/
protected static long getTimeout(URL url) {
return getParameter(url, TIMEOUT_PARAM_NAME, TimeUnit.SECONDS.toMillis(1L));
return getParameter(url, TIMEOUT_PARAM_NAME, -1L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.common.config.configcenter;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.file.FileSystemDynamicConfiguration;
import org.apache.dubbo.common.utils.PathUtils;
import org.apache.dubbo.common.utils.StringUtils;

Expand All @@ -34,9 +35,9 @@
/**
* An abstract implementation of {@link DynamicConfiguration} is like "tree-structure" path :
* <ul>
* <li>"zookeeper"</li>
* <li>"consul"</li>
* <li>"etcd"</li>
* <li>{@link FileSystemDynamicConfiguration "file"}</li>
* <li>{@link org.apache.dubbo.configcenter.support.zookeeper.ZookeeperDynamicConfiguration "zookeeper"}</li>
* <li>{@link org.apache.dubbo.configcenter.consul.ConsulDynamicConfiguration "consul"}</li>
* </ul>
*
* @see DynamicConfiguration
Expand Down Expand Up @@ -81,7 +82,7 @@ protected final String doGetConfig(String key, String group) throws Exception {
@Override
public final boolean publishConfig(String key, String group, String content) {
String pathKey = buildPathKey(group, key);
return execute(() -> doPublishConfig(pathKey, content), -1L);
return execute(() -> doPublishConfig(pathKey, content), getDefaultTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.config.configcenter.TreePathDynamicConfiguration;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.PathUtils;
import org.apache.dubbo.common.utils.StringUtils;

import com.google.common.base.Charsets;
import com.google.common.net.HostAndPort;
Expand All @@ -35,121 +33,106 @@
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;

import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.dubbo.common.config.configcenter.Constants.CONFIG_NAMESPACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
import static org.apache.dubbo.common.utils.StringUtils.EMPTY_STRING;

/**
* config center implementation for consul
*/
public class ConsulDynamicConfiguration implements DynamicConfiguration {
public class ConsulDynamicConfiguration extends TreePathDynamicConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ConsulDynamicConfiguration.class);

private static final int DEFAULT_PORT = 8500;
private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000;
private static final String WATCH_TIMEOUT = "consul-watch-timeout";

private URL url;
private String rootPath;
private Consul client;

private KeyValueClient kvClient;

private ConcurrentMap<String, ConsulListener> watchers = new ConcurrentHashMap<>();

public ConsulDynamicConfiguration(URL url) {
this.url = url;
this.rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + "config";
super(url);
String host = url.getHost();
int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
client = Consul.builder().withHostAndPort(HostAndPort.fromParts(host, port)).build();
this.kvClient = client.keyValueClient();
}

@Override
public void addListener(String key, String group, ConfigurationListener listener) {
logger.info("register listener " + listener.getClass() + " for config with key: " + key + ", group: " + group);
String normalizedKey = convertKey(group, key);
ConsulListener watcher = watchers.computeIfAbsent(normalizedKey, k -> new ConsulListener(key, group));
watcher.addListener(listener);
public String getInternalProperty(String key) {
logger.info("getting config from: " + key);
return kvClient.getValueAsString(key, Charsets.UTF_8).orElse(null);
}

@Override
public void removeListener(String key, String group, ConfigurationListener listener) {
logger.info("unregister listener " + listener.getClass() + " for config with key: " + key + ", group: " + group);
ConsulListener watcher = watchers.get(convertKey(group, key));
if (watcher != null) {
watcher.removeListener(listener);
}
protected boolean doPublishConfig(String pathKey, String content) throws Exception {
return kvClient.putValue(pathKey, content);
}

@Override
public String getConfig(String key, String group, long timeout) throws IllegalStateException {
return (String) getInternalProperty(convertKey(group, key));
protected String doGetConfig(String pathKey) throws Exception {
return getInternalProperty(pathKey);
}

@Override
public SortedSet<String> getConfigKeys(String group) throws UnsupportedOperationException {
SortedSet<String> configKeys = new TreeSet<>();
String normalizedKey = convertKey(group, EMPTY_STRING);
List<String> keys = kvClient.getKeys(normalizedKey);
protected boolean doRemoveConfig(String pathKey) throws Exception {
kvClient.deleteKey(pathKey);
return true;
}

@Override
protected Collection<String> doGetConfigKeys(String groupPath) {
List<String> keys = kvClient.getKeys(groupPath);
List<String> configKeys = new LinkedList<>();
if (CollectionUtils.isNotEmpty(keys)) {
keys.stream()
.filter(k -> !k.equals(normalizedKey))
.filter(k -> !k.equals(groupPath))
.map(k -> k.substring(k.lastIndexOf(PATH_SEPARATOR) + 1))
.forEach(configKeys::add);
}
return configKeys;
}

/**
* @param key the key to represent a configuration
* @param group the group where the key belongs to
* @param content the content of configuration
* @return
*/
@Override
public boolean publishConfig(String key, String group, String content) {
String normalizedKey = convertKey(group, key);
return kvClient.putValue(normalizedKey, content);
protected void doAddListener(String pathKey, ConfigurationListener listener) {
logger.info("register listener " + listener.getClass() + " for config with key: " + pathKey);
ConsulListener watcher = watchers.computeIfAbsent(pathKey, k -> new ConsulListener(pathKey));
watcher.addListener(listener);
}

@Override
public Object getInternalProperty(String key) {
logger.info("getting config from: " + key);
return kvClient.getValueAsString(key, Charsets.UTF_8).orElse(null);
protected void doRemoveListener(String pathKey, ConfigurationListener listener) {
logger.info("unregister listener " + listener.getClass() + " for config with key: " + pathKey);
ConsulListener watcher = watchers.get(pathKey);
if (watcher != null) {
watcher.removeListener(listener);
}
}

@Override
public void close() throws Exception {
protected void doClose() throws Exception {
client.destroy();
}

private String convertKey(String group, String key) {
String actualGroup = StringUtils.isBlank(group) ? DEFAULT_GROUP : group;
return PathUtils.buildPath(rootPath, actualGroup, key);
}

private class ConsulListener implements KVCache.Listener<String, Value> {

private KVCache kvCache;
private Set<ConfigurationListener> listeners = new LinkedHashSet<>();
private String key;
private String group;
private String normalizedKey;

public ConsulListener(String key, String group) {
this.key = key;
this.group = group;
this.normalizedKey = convertKey(group, key);
public ConsulListener(String normalizedKey) {
this.normalizedKey = normalizedKey;
initKVCache();
}

Expand All @@ -171,7 +154,7 @@ public void notify(Map<String, Value> newValues) {
// Values are encoded in key/value store, decode it if needed
Optional<String> decodedValue = newValue.get().getValueAsString();
decodedValue.ifPresent(v -> listeners.forEach(l -> {
ConfigChangedEvent event = new ConfigChangedEvent(key, group, v, ConfigChangeType.MODIFIED);
ConfigChangedEvent event = new ConfigChangedEvent(normalizedKey, getGroup(), v, ConfigChangeType.MODIFIED);
l.process(event);
}));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Optional;
import java.util.TreeSet;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand All @@ -51,10 +53,10 @@ public static void setUp() throws Exception {
consul = ConsulStarterBuilder.consulStarter()
.build()
.start();
configCenterUrl = URL.valueOf("consul://localhost:" + consul.getHttpPort());
configCenterUrl = URL.valueOf("consul://127.0.0.1:" + consul.getHttpPort());

configuration = new ConsulDynamicConfiguration(configCenterUrl);
client = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", consul.getHttpPort())).build();
client = Consul.builder().withHostAndPort(HostAndPort.fromParts("127.0.0.1", consul.getHttpPort())).build();
kvClient = client.keyValueClient();
}

Expand All @@ -74,6 +76,14 @@ public void testGetConfig() {
Assertions.assertNull(configuration.getConfig("not-exist", "dubbo"));
}

@Test
public void testPublishConfig() {
configuration.publishConfig("value", "metadata", "1");
// test equals
assertEquals("1", configuration.getConfig("value", "/metadata"));
assertEquals("1", kvClient.getValueAsString("/dubbo/config/metadata/value").get());
}

@Test
public void testAddListener() {
KVCache cache = KVCache.newCache(kvClient, "/dubbo/config/dubbo/foo");
Expand Down Expand Up @@ -104,6 +114,10 @@ public void testAddListener() {

@Test
public void testGetConfigKeys() {

configuration.publishConfig("v1", "metadata", "1");
configuration.publishConfig("v2", "metadata", "2");
configuration.publishConfig("v3", "metadata", "3");
// test equals
assertEquals(new TreeSet(Arrays.asList("v1", "v2", "v3")), configuration.getConfigKeys("metadata"));
}
}

0 comments on commit 2bdb538

Please sign in to comment.