Skip to content

Commit

Permalink
Merge pull request #32 from Coneboy-k/master
Browse files Browse the repository at this point in the history
FIX: zk server闪断导致服务信息丢失, 重新建立连接后无法再次发布该服务 #31
  • Loading branch information
fengjiachun authored Jul 22, 2017
2 parents 0349ce8 + 681deef commit ab91b51
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public abstract class AbstractRegistryService implements RegistryService {
private final LinkedBlockingQueue<RegisterMeta> queue = new LinkedBlockingQueue<>();
private final ExecutorService executor =
Executors.newSingleThreadExecutor(new NamedThreadFactory("registry.executor"));
private final ExecutorService localRegisterWatchExecutor =
Executors.newSingleThreadExecutor(new NamedThreadFactory("registry.RegNodeWatchExecutor"));

private final AtomicBoolean shutdown = new AtomicBoolean(false);

private final ConcurrentMap<RegisterMeta.ServiceMeta, RegisterValue> registries =
Expand Down Expand Up @@ -85,6 +88,22 @@ public void run() {
}
}
});

localRegisterWatchExecutor.execute(new Runnable() {
@Override
public void run() {
while (!shutdown.get()) {
try {
Thread.sleep(3000);
doCheckRegisterNodeStatus();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Register check register NodeStatus fail: {}, will try again...", stackTrace(t));
}
}
}
}
});
}

@Override
Expand Down Expand Up @@ -144,7 +163,8 @@ public void shutdownGracefully() {
executor.shutdown();
try {
destroy();
} catch (Exception ignored) {}
} catch (Exception ignored) {
}
}
}

Expand Down Expand Up @@ -235,6 +255,8 @@ protected void notify(

protected abstract void doUnregister(RegisterMeta meta);

protected abstract void doCheckRegisterNodeStatus();

private static class RegisterValue {
private long version = Long.MIN_VALUE;
private final Set<RegisterMeta> metaSet = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,35 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}
}

@Override
protected void doCheckRegisterNodeStatus() {

for (RegisterMeta meta : registerMetaSet()) {

String directory = String.format("/jupiter/provider/%s/%s/%s",
meta.getGroup(),
meta.getServiceProviderName(),
meta.getVersion());

String nodePath = String.format("%s/%s:%s:%s:%s",
directory,
meta.getHost(),
String.valueOf(meta.getPort()),
String.valueOf(meta.getWeight()),
String.valueOf(meta.getConnCount()));

try {
if (configClient.checkExists().forPath(nodePath) == null) {
super.register(meta);
}
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("CheckRegisterStatus register meta: {} path failed, {}.", meta, stackTrace(e));
}
}
}
}

@Override
public void connectToRegistryServer(String connectString) {
checkNotNull(connectString, "connectString");
Expand Down

0 comments on commit ab91b51

Please sign in to comment.