diff --git a/jupiter-registry/jupiter-registry-api/src/main/java/org/jupiter/registry/AbstractRegistryService.java b/jupiter-registry/jupiter-registry-api/src/main/java/org/jupiter/registry/AbstractRegistryService.java index 06cc89e0..c39fe969 100644 --- a/jupiter-registry/jupiter-registry-api/src/main/java/org/jupiter/registry/AbstractRegistryService.java +++ b/jupiter-registry/jupiter-registry-api/src/main/java/org/jupiter/registry/AbstractRegistryService.java @@ -47,6 +47,9 @@ public abstract class AbstractRegistryService implements RegistryService { private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); private final ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("registry.executor")); + private final ExecutorService localRegisterWatchExecutor = + Executors.newSingleThreadExecutor(new NamedThreadFactory("registry.RegNodeWatchExecutor")); + private final AtomicBoolean shutdown = new AtomicBoolean(false); private final ConcurrentMap registries = @@ -85,6 +88,22 @@ public void run() { } } }); + + localRegisterWatchExecutor.execute(new Runnable() { + @Override + public void run() { + while (!shutdown.get()) { + try { + Thread.sleep(3000); + doCheckRegisterNodeStatus(); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("Register check register NodeStatus fail: {}, will try again...", stackTrace(t)); + } + } + } + } + }); } @Override @@ -144,7 +163,8 @@ public void shutdownGracefully() { executor.shutdown(); try { destroy(); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } } } @@ -235,6 +255,8 @@ protected void notify( protected abstract void doUnregister(RegisterMeta meta); + protected abstract void doCheckRegisterNodeStatus(); + private static class RegisterValue { private long version = Long.MIN_VALUE; private final Set metaSet = new HashSet<>(); diff --git a/jupiter-registry/jupiter-registry-zookeeper/src/main/java/org/jupiter/registry/zookeeper/ZookeeperRegistryService.java b/jupiter-registry/jupiter-registry-zookeeper/src/main/java/org/jupiter/registry/zookeeper/ZookeeperRegistryService.java index b23939c7..aa5ff860 100644 --- a/jupiter-registry/jupiter-registry-zookeeper/src/main/java/org/jupiter/registry/zookeeper/ZookeeperRegistryService.java +++ b/jupiter-registry/jupiter-registry-zookeeper/src/main/java/org/jupiter/registry/zookeeper/ZookeeperRegistryService.java @@ -254,6 +254,35 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex } } + @Override + protected void doCheckRegisterNodeStatus() { + + for (RegisterMeta meta : registerMetaSet()) { + + String directory = String.format("/jupiter/provider/%s/%s/%s", + meta.getGroup(), + meta.getServiceProviderName(), + meta.getVersion()); + + String nodePath = String.format("%s/%s:%s:%s:%s", + directory, + meta.getHost(), + String.valueOf(meta.getPort()), + String.valueOf(meta.getWeight()), + String.valueOf(meta.getConnCount())); + + try { + if (configClient.checkExists().forPath(nodePath) == null) { + super.register(meta); + } + } catch (Exception e) { + if (logger.isWarnEnabled()) { + logger.warn("CheckRegisterStatus register meta: {} path failed, {}.", meta, stackTrace(e)); + } + } + } + } + @Override public void connectToRegistryServer(String connectString) { checkNotNull(connectString, "connectString");