Skip to content

Commit

Permalink
refactor registry for support setting service available
Browse files Browse the repository at this point in the history
  • Loading branch information
qdaxb committed Apr 25, 2016
1 parent cfe21bb commit 946ee4d
Show file tree
Hide file tree
Showing 26 changed files with 344 additions and 182 deletions.
13 changes: 9 additions & 4 deletions docs/wiki/en_quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,10 @@ UI backend [http://localhost:8500/ui](http://localhost:8500/ui)
<motan:referer id="remoteService" interface="quickstart.FooService" registry="my_consul"/>
```


4. After the server starts, you need to call the hearbeat switch explicitly in order to register in Consul.
4. After the server starts, you SHOULD call hearbeat switcher explicitly in order to start heartbeat for Consul.

```java
MotanSwitcherUtil.setSwitcher(ConsulConstants.NAMING_PROCESS_HEARTBEAT_SWITCHER, true)
MotanSwitcherUtil.setSwitcher(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, true)
```

5. Go to [UI backend](http://localhost:8500/ui). Verify whether the service is normal.
Expand Down Expand Up @@ -263,7 +262,13 @@ Install and start ZooKeeper:
<motan:referer id="remoteService" interface="quickstart.FooService" registry="my_zookeeper"/>
```

4. Start client, call service.
4. After the server starts, you SHOULD call hearbeat switcher explicitly in order to start heartbeat for Zookeeper.

```java
MotanSwitcherUtil.setSwitcher(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, true)
```

5. Start client, call service.


[maven]:https://maven.apache.org
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ public class MotanConstants {

public static final String ZOOKEEPER_REGISTRY_NAMESPACE = "/motan";
public static final String ZOOKEEPER_REGISTRY_COMMAND = "/command";

public static final String REGISTRY_HEARTBEAT_SWITCHER = "feature.configserver.heartbeat";

/**
* 默认的consistent的hash的数量
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class SwitcherFilter implements Filter {
@Override
public Response filter(Caller<?> caller, Request request) {
// 检查接口或方法降级开关状态
if (MotanSwitcherUtil.switcherIsOpen(request.getInterfaceName())
|| MotanSwitcherUtil.switcherIsOpen(MotanFrameworkUtil.getFullMethodString(request))) {
if (MotanSwitcherUtil.isOpen(request.getInterfaceName())
|| MotanSwitcherUtil.isOpen(MotanFrameworkUtil.getFullMethodString(request))) {
// 返回的reponse需要设置exception,这样invocationhandler会在throwException为false时,构建默认值返回
return mockDefaultResponse(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public byte[] encode(Channel channel, Object message) throws IOException {
// v1降级开关打开、心跳请求、client端使用v1版本时,需要使用v1编码
private boolean needEncodeV1(Object message) {

if (MotanSwitcherUtil.switcherIsOpen(CODEC_VERSION_SWITCHER)) {
if (MotanSwitcherUtil.isOpen(CODEC_VERSION_SWITCHER)) {
return true;
}
if (message instanceof Request) {
Expand Down Expand Up @@ -142,7 +142,7 @@ private boolean needEncodeV1(Object message) {
*/
@Override
public Object decode(Channel channel, String remoteIp, byte[] data) throws IOException {
if (MotanSwitcherUtil.switcherIsOpen(CODEC_VERSION_SWITCHER)) {
if (MotanSwitcherUtil.isOpen(CODEC_VERSION_SWITCHER)) {
// 降级开关打开时,使用v1版本codec
return v1Codec.decode(channel, remoteIp, data);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.weibo.api.motan.rpc.URL;

import java.util.Collection;


/**
*
Expand All @@ -29,7 +31,33 @@

public interface RegistryService {

/**
* register service to registry
*
* @param url
*/
void register(URL url);

/**
* unregister service to registry
*
* @param url
*/
void unregister(URL url);

/**
* set service status to available, so clients could use it
*
* @param url service url to be available, <b>null</b> means all services
*/
void available(URL url);

/**
* set service status to unavailable, client should not discover services of unavailable state
*
* @param url service url to be unavailable, <b>null</b> means all services
*/
void unavailable(URL url);

Collection<URL> getRegisteredServiceUrls();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@

package com.weibo.api.motan.registry.support;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.weibo.api.motan.common.MotanConstants;
import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.registry.NotifyListener;
import com.weibo.api.motan.registry.Registry;
import com.weibo.api.motan.rpc.URL;
import com.weibo.api.motan.switcher.SwitcherListener;
import com.weibo.api.motan.util.ConcurrentHashSet;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanSwitcherUtil;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* <pre>
Expand All @@ -47,10 +47,26 @@ public abstract class AbstractRegistry implements Registry {
new ConcurrentHashMap<URL, Map<String, List<URL>>>();

private URL registryUrl;
private Set<URL> registeredServiceUrls = new ConcurrentHashSet<URL>();
protected String registryClassName = this.getClass().getSimpleName();

public AbstractRegistry(URL url) {
this.registryUrl = url.createCopy();
// register a heartbeat switcher to perceive service state change and change available state
MotanSwitcherUtil.registerSwitcherListener(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, new SwitcherListener() {

@Override
public void onValueChanged(String key, Boolean value) {
if (key != null && value != null) {
if (value) {
available(null);
} else {
unavailable(null);
}
}

}
});
}

@Override
Expand All @@ -61,6 +77,7 @@ public void register(URL url) {
}
LoggerUtil.info("[{}] Url ({}) will register to Registry [{}]", registryClassName, url, registryUrl.getIdentity());
doRegister(removeUnnecessaryParmas(url.createCopy()));
registeredServiceUrls.add(url);
}

@Override
Expand All @@ -71,6 +88,7 @@ public void unregister(URL url) {
}
LoggerUtil.info("[{}] Url ({}) will unregister to Registry [{}]", registryClassName, url, registryUrl.getIdentity());
doUnregister(removeUnnecessaryParmas(url.createCopy()));
registeredServiceUrls.remove(url);
}

@Override
Expand Down Expand Up @@ -128,6 +146,34 @@ public URL getUrl() {
return registryUrl;
}

@Override
public Collection<URL> getRegisteredServiceUrls() {
return registeredServiceUrls;
}

@Override
public void available(URL url) {
LoggerUtil.info("[{}] Url ({}) will set to available to Registry [{}]", registryClassName, url, registryUrl.getIdentity());
if(url != null) {
doAvailable(removeUnnecessaryParmas(url.createCopy()));
} else {
doAvailable(null);
}
}



@Override
public void unavailable(URL url) {
LoggerUtil.info("[{}] Url ({}) will set to unavailable to Registry [{}]", registryClassName, url, registryUrl.getIdentity());
if(url == null) {
doUnavailable(removeUnnecessaryParmas(url.createCopy()));
} else {
doUnregister(null);
}

}

protected List<URL> getCachedUrls(URL url) {
Map<String, List<URL>> rsUrls = subscribedCategoryResponses.get(url);
if (rsUrls == null || rsUrls.size() == 0) {
Expand Down Expand Up @@ -194,4 +240,9 @@ private URL removeUnnecessaryParmas(URL url) {

protected abstract List<URL> doDiscover(URL url);

protected abstract void doAvailable(URL url);

protected abstract void doUnavailable(URL url);


}
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ public void run() {
}

@Override
public void doRegister(URL url) {
public void register(URL url) {
failedRegistered.remove(url);
failedUnregistered.remove(url);

try {
concreteRegister(url);
super.register(url);
} catch (Exception e) {
if (isCheckingUrls(getUrl(), url)) {
throw new MotanFrameworkException(String.format("[%s] false to registery %s to %s", registryClassName, url, getUrl()), e);
Expand All @@ -85,12 +85,12 @@ public void doRegister(URL url) {
}

@Override
public void doUnregister(URL url) {
public void unregister(URL url) {
failedRegistered.remove(url);
failedUnregistered.remove(url);

try {
concreteUnregister(url);
super.unregister(url);
} catch (Exception e) {
if (isCheckingUrls(getUrl(), url)) {
throw new MotanFrameworkException(String.format("[%s] false to unregistery %s to %s", registryClassName, url, getUrl()), e);
Expand All @@ -100,11 +100,11 @@ public void doUnregister(URL url) {
}

@Override
public void doSubscribe(URL url, NotifyListener listener) {
public void subscribe(URL url, NotifyListener listener) {
removeForFailedSubAndUnsub(url, listener);

try {
concreteSubscribe(url, listener);
super.subscribe(url, listener);
} catch (Exception e) {
List<URL> cachedUrls = getCachedUrls(url);
if (cachedUrls != null && cachedUrls.size() > 0) {
Expand All @@ -118,11 +118,11 @@ public void doSubscribe(URL url, NotifyListener listener) {
}

@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
public void unsubscribe(URL url, NotifyListener listener) {
removeForFailedSubAndUnsub(url, listener);

try {
concreteUnsubscribe(url, listener);
super.unsubscribe(url, listener);
} catch (Exception e) {
if (isCheckingUrls(getUrl(), url)) {
throw new MotanFrameworkException(String.format("[%s] false to unsubscribe %s from %s", registryClassName, url, getUrl()),
Expand All @@ -134,9 +134,9 @@ public void doUnsubscribe(URL url, NotifyListener listener) {

@Override
@SuppressWarnings("unchecked")
protected List<URL> doDiscover(URL url) {
public List<URL> discover(URL url) {
try {
return concreteDiscover(url);
return super.discover(url);
} catch (Exception e) {
// 如果discover失败,返回一个empty list吧,毕竟是个下行动作,
LoggerUtil.warn(String.format("Failed to discover url:%s in registry (%s)", url, getUrl()), e);
Expand Down Expand Up @@ -179,7 +179,7 @@ private void retry() {
LoggerUtil.info("[{}] Retry register {}", registryClassName, failed);
try {
for (URL url : failed) {
concreteRegister(url);
super.register(url);
failedRegistered.remove(url);
}
} catch (Exception e) {
Expand All @@ -193,7 +193,7 @@ private void retry() {
LoggerUtil.info("[{}] Retry unregister {}", registryClassName, failed);
try {
for (URL url : failed) {
concreteUnregister(url);
super.unregister(url);
failedUnregistered.remove(url);
}
} catch (Exception e) {
Expand All @@ -216,7 +216,7 @@ private void retry() {
URL url = entry.getKey();
Set<NotifyListener> listeners = entry.getValue();
for (NotifyListener listener : listeners) {
concreteSubscribe(url, listener);
super.subscribe(url, listener);
listeners.remove(listener);
}
}
Expand All @@ -240,7 +240,7 @@ private void retry() {
URL url = entry.getKey();
Set<NotifyListener> listeners = entry.getValue();
for (NotifyListener listener : listeners) {
concreteUnsubscribe(url, listener);
super.unsubscribe(url, listener);
listeners.remove(listener);
}
}
Expand All @@ -253,13 +253,4 @@ private void retry() {

}

protected abstract void concreteRegister(URL url);

protected abstract void concreteUnregister(URL url);

protected abstract void concreteSubscribe(URL url, NotifyListener listener);

protected abstract void concreteUnsubscribe(URL url, NotifyListener listener);

protected abstract List<URL> concreteDiscover(URL url);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ public List<URL> doDiscover(URL url) {
return registeredServices.get(getRegistryKey(url));
}

@Override
protected void doAvailable(URL url) {
//do nothing
}

@Override
protected void doUnavailable(URL url) {
//do nothing
}

@Override
public void doRegister(URL url) {
String registryKey = getRegistryKey(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public CommandFailbackRegistry(URL url) {
}

@Override
protected void concreteSubscribe(URL url, final NotifyListener listener) {
protected void doSubscribe(URL url, final NotifyListener listener) {
LoggerUtil.info("CommandFailbackRegistry subscribe. url: " + url.toSimpleString());
URL urlCopy = url.createCopy();
CommandServiceManager manager = getCommandServiceManager(urlCopy);
Expand All @@ -47,14 +47,14 @@ protected void concreteSubscribe(URL url, final NotifyListener listener) {
subscribeService(urlCopy, manager);
subscribeCommand(urlCopy, manager);

List<URL> urls = concreteDiscover(urlCopy);
List<URL> urls = doDiscover(urlCopy);
if (urls != null && urls.size() > 0) {
this.notify(urlCopy, listener, urls);
}
}

@Override
protected void concreteUnsubscribe(URL url, NotifyListener listener) {
protected void doUnsubscribe(URL url, NotifyListener listener) {
LoggerUtil.info("CommandFailbackRegistry unsubscribe. url: " + url.toSimpleString());
URL urlCopy = url.createCopy();
CommandServiceManager manager = commandManagerMap.get(urlCopy);
Expand All @@ -66,7 +66,7 @@ protected void concreteUnsubscribe(URL url, NotifyListener listener) {
}

@Override
protected List<URL> concreteDiscover(URL url) {
protected List<URL> doDiscover(URL url) {
LoggerUtil.info("CommandFailbackRegistry discover. url: " + url.toSimpleString());
List<URL> finalResult;

Expand Down
Loading

0 comments on commit 946ee4d

Please sign in to comment.