Skip to content

Commit

Permalink
Polish apache#6261 : Migration the latest Nacos registry implementati…
Browse files Browse the repository at this point in the history
…on to upstream
  • Loading branch information
mercyblitz committed Jun 2, 2020
1 parent a182bab commit 9a01103
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.dubbo.registry.nacos;


import com.alibaba.nacos.api.common.Constants;
import com.google.common.collect.Lists;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.logger.Logger;
Expand All @@ -36,6 +34,7 @@
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -139,8 +138,7 @@ public List<URL> lookup(final URL url) {
execute(namingService -> {
Set<String> serviceNames = getServiceNames(url, null);
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
List<Instance> instances = namingService.getAllInstances(serviceName);
urls.addAll(buildURLs(url, instances));
}
});
Expand All @@ -151,25 +149,15 @@ public List<URL> lookup(final URL url) {
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
/**
* namingService.registerInstance with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
* default {@link DEFAULT_GROUP}
*
* in https://github.com/apache/dubbo/issues/5978
*/
execute(namingService -> namingService.registerInstance(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), instance));
execute(namingService -> namingService.registerInstance(serviceName, instance));
}

@Override
public void doUnregister(final URL url) {
execute(namingService -> {
String serviceName = getServiceName(url);
Instance instance = createInstance(url);
namingService.deregisterInstance(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
instance.getIp()
, instance.getPort());
namingService.deregisterInstance(serviceName, instance.getIp(), instance.getPort());
});
}

Expand All @@ -178,8 +166,8 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
Set<String> serviceNames = getServiceNames(url, listener);

//Set corresponding serviceNames for easy search later
if (isServiceNamesWithCompatibleMode(url)) {
for (String serviceName : serviceNames) {
if(isServiceNamesWithCompatibleMode(url)){
for(String serviceName:serviceNames){
NacosInstanceManageUtil.setCorrespondingServiceNames(serviceName, serviceNames);
}
}
Expand All @@ -195,15 +183,9 @@ private void doSubscribe(final URL url, final NotifyListener listener, final Set
/**
* Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned
* in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
*
* namingService.getAllInstances with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
* default {@link DEFAULT_GROUP}
*
* in https://github.com/apache/dubbo/issues/5978
*/
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
List<Instance> instances = namingService.getAllInstances(serviceName);
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
allCorrespondingInstanceList.addAll(instances);
}
Expand All @@ -212,10 +194,9 @@ private void doSubscribe(final URL url, final NotifyListener listener, final Set
subscribeEventListener(serviceName, url, listener);
}
} else {
List<Instance> instances = new LinkedList<>();
List<Instance> instances = new LinkedList();
for (String serviceName : serviceNames) {
instances.addAll(namingService.getAllInstances(serviceName
, getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)));
instances.addAll(namingService.getAllInstances(serviceName));
notifySubscriber(url, listener, instances);
subscribeEventListener(serviceName, url, listener);
}
Expand Down Expand Up @@ -294,8 +275,7 @@ private Set<String> filterServiceNames(NacosServiceName serviceName) {

execute(namingService -> {

serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData()
serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData()
.stream()
.map(NacosServiceName::new)
.filter(serviceName::isCompatible)
Expand Down Expand Up @@ -374,8 +354,7 @@ private Set<String> getAllServiceNames() {
execute(namingService -> {

int pageIndex = 1;
ListView<String> listView = namingService.getServicesOfServer(pageIndex, PAGINATION_SIZE,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
ListView<String> listView = namingService.getServicesOfServer(pageIndex, PAGINATION_SIZE);
// First page data
List<String> firstPageData = listView.getData();
// Append first page into list
Expand All @@ -391,8 +370,7 @@ private Set<String> getAllServiceNames() {
}
// If more than 1 page
while (pageIndex < pageNumbers) {
listView = namingService.getServicesOfServer(++pageIndex, PAGINATION_SIZE,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
listView = namingService.getServicesOfServer(++pageIndex, PAGINATION_SIZE);
serviceNames.addAll(listView.getData());
}

Expand Down Expand Up @@ -495,7 +473,7 @@ private void subscribeEventListener(String serviceName, final URL url, final Not
List<Instance> instances = e.getInstances();


if (isServiceNamesWithCompatibleMode(url)) {
if(isServiceNamesWithCompatibleMode(url)){
/**
* Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
* in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
Expand All @@ -507,9 +485,7 @@ private void subscribeEventListener(String serviceName, final URL url, final Not
notifySubscriber(url, listener, instances);
}
};
namingService.subscribe(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
eventListener);
namingService.subscribe(serviceName, eventListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.function.ThrowableFunction;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
Expand Down Expand Up @@ -49,8 +47,6 @@
*/
public class NacosServiceDiscovery implements ServiceDiscovery {

private final Logger logger = LoggerFactory.getLogger(getClass());

private String group;

private NamingService namingService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,30 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.utils.NamingUtils;

import java.lang.reflect.Field;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Stream;

import static com.alibaba.nacos.api.PropertyKeyConst.ACCESS_KEY;
import static com.alibaba.nacos.api.PropertyKeyConst.CLUSTER_NAME;
import static com.alibaba.nacos.api.PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT;
import static com.alibaba.nacos.api.PropertyKeyConst.CONFIG_RETRY_TIME;
import static com.alibaba.nacos.api.PropertyKeyConst.CONTEXT_PATH;
import static com.alibaba.nacos.api.PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG;
import static com.alibaba.nacos.api.PropertyKeyConst.ENCODE;
import static com.alibaba.nacos.api.PropertyKeyConst.ENDPOINT;
import static com.alibaba.nacos.api.PropertyKeyConst.ENDPOINT_PORT;
import static com.alibaba.nacos.api.PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING;
import static com.alibaba.nacos.api.PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE;
import static com.alibaba.nacos.api.PropertyKeyConst.MAX_RETRY;
import static com.alibaba.nacos.api.PropertyKeyConst.NAMESPACE;
import static com.alibaba.nacos.api.PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT;
import static com.alibaba.nacos.api.PropertyKeyConst.NAMING_LOAD_CACHE_AT_START;
import static com.alibaba.nacos.api.PropertyKeyConst.NAMING_POLLING_THREAD_COUNT;
import static com.alibaba.nacos.api.PropertyKeyConst.RAM_ROLE_NAME;
import static com.alibaba.nacos.api.PropertyKeyConst.SECRET_KEY;
import static com.alibaba.nacos.api.PropertyKeyConst.SERVER_ADDR;
import static com.alibaba.nacos.api.common.Constants.DEFAULT_GROUP;
import static com.alibaba.nacos.client.naming.utils.UtilAndComs.NACOS_NAMING_LOG_NAME;
import static java.lang.reflect.Modifier.isFinal;
import static java.lang.reflect.Modifier.isPublic;
import static java.lang.reflect.Modifier.isStatic;
import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY;
import static org.apache.dubbo.common.utils.StringUtils.isEmpty;
import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty;

/**
* The utilities class for {@link NamingService}
Expand All @@ -63,6 +53,33 @@ public class NacosNamingServiceUtils {

private static final Logger logger = LoggerFactory.getLogger(NacosNamingServiceUtils.class);

private static final String[] NACOS_PROPERTY_NAMES;

static {
NACOS_PROPERTY_NAMES = initNacosPropertyNames();
}

private static String[] initNacosPropertyNames() {
return Stream.of(PropertyKeyConst.class.getFields())
.filter(f -> isStatic(f.getModifiers())) // static
.filter(f -> isPublic(f.getModifiers())) // public
.filter(f -> isFinal(f.getModifiers())) // final
.filter(f -> String.class.equals(f.getType())) // String type
.map(NacosNamingServiceUtils::getConstantValue)
.filter(Objects::nonNull)
.map(String::valueOf)
.toArray(String[]::new);
}

private static Object getConstantValue(Field field) {
Object value = null;
try {
value = field.get(null);
} catch (IllegalAccessException e) {
}
return value;
}

/**
* Convert the {@link ServiceInstance} to {@link Instance}
*
Expand Down Expand Up @@ -91,7 +108,7 @@ public static Instance toInstance(ServiceInstance serviceInstance) {
*/
public static ServiceInstance toServiceInstance(Instance instance) {
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getInstanceId(),
NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort());
instance.getServiceName(), instance.getIp(), instance.getPort());
serviceInstance.setMetadata(instance.getMetadata());
serviceInstance.setEnabled(instance.isEnabled());
serviceInstance.setHealthy(instance.isHealthy());
Expand Down Expand Up @@ -154,40 +171,38 @@ private static void setServerAddr(URL url, Properties properties) {
}

private static void setProperties(URL url, Properties properties) {

putPropertyIfAbsent(url, properties, NACOS_NAMING_LOG_NAME);
putPropertyIfAbsent(url, properties, IS_USE_CLOUD_NAMESPACE_PARSING);
putPropertyIfAbsent(url, properties, IS_USE_ENDPOINT_PARSING_RULE);
putPropertyIfAbsent(url, properties, ENDPOINT);
putPropertyIfAbsent(url, properties, ENDPOINT_PORT);
putPropertyIfAbsent(url, properties, NAMESPACE);
putPropertyIfAbsent(url, properties, ACCESS_KEY);
putPropertyIfAbsent(url, properties, SECRET_KEY);
putPropertyIfAbsent(url, properties, RAM_ROLE_NAME);
putPropertyIfAbsent(url, properties, CONTEXT_PATH);
putPropertyIfAbsent(url, properties, CLUSTER_NAME);
putPropertyIfAbsent(url, properties, ENCODE);
putPropertyIfAbsent(url, properties, CONFIG_LONG_POLL_TIMEOUT);
putPropertyIfAbsent(url, properties, CONFIG_RETRY_TIME);
putPropertyIfAbsent(url, properties, MAX_RETRY);
putPropertyIfAbsent(url, properties, ENABLE_REMOTE_SYNC_CONFIG);
putPropertyIfAbsent(url, properties, NAMING_LOAD_CACHE_AT_START, "true");
putPropertyIfAbsent(url, properties, NAMING_CLIENT_BEAT_THREAD_COUNT);
putPropertyIfAbsent(url, properties, NAMING_POLLING_THREAD_COUNT);

for (String propertyName : NACOS_PROPERTY_NAMES) {
putPropertyIfAbsent(url, properties, propertyName);
}
}

private static void putPropertyIfAbsent(URL url, Properties properties, String propertyName) {
String propertyValue = url.getParameter(propertyName);
if (StringUtils.isNotEmpty(propertyValue)) {
properties.setProperty(propertyName, propertyValue);
}
putPropertyIfAbsent(url, properties, propertyName, null);
}

private static void putPropertyIfAbsent(URL url, Properties properties, String propertyName, String defaultValue) {
String propertyValue = url.getParameter(propertyName);
if (StringUtils.isNotEmpty(propertyValue)) {
properties.setProperty(propertyName, propertyValue);
} else {
properties.setProperty(propertyName, defaultValue);
putPropertyIfAbsent(properties, propertyName, propertyValue, defaultValue);
}

private static void putPropertyIfAbsent(Properties properties, String propertyName, String propertyValue) {
putPropertyIfAbsent(properties, propertyName, propertyValue, null);
}

private static void putPropertyIfAbsent(Properties properties, String propertyName, String propertyValue,
String defaultValue) {
if (isEmpty(propertyName) && properties.containsKey(propertyName)) {
return;
}

String value = isEmpty(propertyValue) ? defaultValue : propertyValue;

if (isNotEmpty(value)) {
properties.setProperty(propertyName, value);
}
}
}
}

0 comments on commit 9a01103

Please sign in to comment.