diff --git a/pom.xml b/pom.xml
index 98ecb01261..3de3bf0d9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@
registry
balance
consul
+ portal-registry
zookeeper
cors
cluster
diff --git a/portal-registry/pom.xml b/portal-registry/pom.xml
new file mode 100644
index 0000000000..a7896843b9
--- /dev/null
+++ b/portal-registry/pom.xml
@@ -0,0 +1,74 @@
+
+
+
+ 4.0.0
+
+
+ com.networknt
+ light-4j
+ 2.0.21-SNAPSHOT
+ ..
+
+
+ portal-registry
+ jar
+ A registry and discovery client that integrates with light-portal
+
+
+
+ com.networknt
+ config
+
+
+ com.networknt
+ utility
+
+
+ com.networknt
+ http-string
+
+
+ com.networknt
+ registry
+
+
+ com.networknt
+ client
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+ test
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+
diff --git a/portal-registry/src/integration/java/com/networknt/portal/registry/PortalRegistryTestIT.java b/portal-registry/src/integration/java/com/networknt/portal/registry/PortalRegistryTestIT.java
new file mode 100644
index 0000000000..f1577e2ec0
--- /dev/null
+++ b/portal-registry/src/integration/java/com/networknt/portal/registry/PortalRegistryTestIT.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2016 Network New Technologies Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.networknt.portal.registry;
+
+import com.networknt.registry.Registry;
+import com.networknt.registry.URL;
+import com.networknt.registry.URLImpl;
+import com.networknt.service.SingletonServiceFactory;
+import org.junit.*;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PortalRegistryTestIT {
+ private PortalRegistry registry;
+ private URL serviceUrl;
+ private long sleepTime;
+
+ @Before
+ public void setUp() throws Exception {
+ registry = (PortalRegistry)SingletonServiceFactory.getBean(Registry.class);
+
+
+ serviceUrl = getMockUrl("http", "192.168.1.119",8083, "MockService");
+
+ sleepTime = PortalRegistryConstants.SWITCHER_CHECK_CIRCLE + 500;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ registry = null;
+ }
+
+ @Test
+ @Ignore
+ public void doRegisterAndAvailable() throws Exception {
+ // register
+ registry.doRegister(serviceUrl);
+
+ // unregister
+ registry.doUnregister(serviceUrl);
+ }
+
+ @Test
+ @Ignore
+ public void discoverService() throws Exception {
+ registry.doRegister(serviceUrl);
+ List urls = registry.discoverService(serviceUrl);
+ Assert.assertEquals(urls.size(), 0);
+ Thread.sleep(sleepTime);
+ urls = registry.discoverService(serviceUrl);
+ //Assert.assertTrue(urls.contains(serviceUrl));
+ }
+
+ public static URL getMockUrl(String protocol, String address, int port, String serviceName) {
+ Map params = new HashMap<>();
+ params.put("environment", "test1");
+ URL url = new URLImpl(protocol, address, port, serviceName, params);
+ return url;
+ }
+}
diff --git a/portal-registry/src/integration/resources/config/service.yml b/portal-registry/src/integration/resources/config/service.yml
new file mode 100644
index 0000000000..0dc47759ad
--- /dev/null
+++ b/portal-registry/src/integration/resources/config/service.yml
@@ -0,0 +1,17 @@
+singletons:
+- com.networknt.registry.URL:
+ - com.networknt.registry.URLImpl:
+ protocol: light
+ host: localhost
+ port: 8080
+ path: consul
+ parameters:
+ registryRetryPeriod: '30000'
+- com.networknt.portal.registry.client.PortalRegistryClient:
+ - com.networknt.portal.registry.client.PortalRegistryClientImpl
+- com.networknt.registry.Registry:
+ - com.networknt.portal.registry.PortalRegistry
+- com.networknt.balance.LoadBalance:
+ - com.networknt.balance.RoundRobinLoadBalance
+- com.networknt.cluster.Cluster:
+ - com.networknt.cluster.LightCluster
diff --git a/portal-registry/src/integration/resources/logback-test.xml b/portal-registry/src/integration/resources/logback-test.xml
new file mode 100644
index 0000000000..115515fc10
--- /dev/null
+++ b/portal-registry/src/integration/resources/logback-test.xml
@@ -0,0 +1,49 @@
+
+
+
+
+
+ PROFILER
+
+ NEUTRAL
+
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5marker %-5level %logger{36} - %msg%n
+
+
+
+
+ target/test.log
+ false
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %class{36}:%L %M - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistry.java b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistry.java
new file mode 100644
index 0000000000..69589a881a
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistry.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2009-2016 Weibo, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.networknt.portal.registry;
+
+import com.networknt.config.Config;
+import com.networknt.portal.registry.client.PortalRegistryClient;
+import com.networknt.registry.NotifyListener;
+import com.networknt.registry.URL;
+import com.networknt.registry.URLParamType;
+import com.networknt.registry.support.command.CommandFailbackRegistry;
+import com.networknt.registry.support.command.CommandServiceManager;
+import com.networknt.registry.support.command.ServiceListener;
+import com.networknt.utility.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class PortalRegistry extends CommandFailbackRegistry {
+ private static final Logger logger = LoggerFactory.getLogger(PortalRegistry.class);
+ private static final String CONFIG_PROPERTY_MISSING = "ERR10057";
+
+ private PortalRegistryClient client;
+ private PortalRegistryHeartbeatManager heartbeatManager;
+ private int lookupInterval;
+
+ // service local cache. key: serviceName, value:
+ private ConcurrentHashMap> serviceCache = new ConcurrentHashMap>();
+
+ // record lookup service thread, ensure each serviceName start only one thread,
+ private ConcurrentHashMap lookupServices = new ConcurrentHashMap();
+
+ // TODO: 2016/6/17 clientUrl support multiple listener
+ // record subscribers service callback listeners, listener was called when corresponding service changes
+ private ConcurrentHashMap> serviceListeners = new ConcurrentHashMap>();
+ private ThreadPoolExecutor notifyExecutor;
+
+ public PortalRegistry(URL url, PortalRegistryClient client) {
+ super(url);
+ this.client = client;
+ if(getPortalRegistryConfig().ttlCheck) {
+ heartbeatManager = new PortalRegistryHeartbeatManager(client, getPortalToken());
+ heartbeatManager.start();
+ }
+ lookupInterval = getUrl().getIntParameter(URLParamType.registrySessionTimeout.getName(), PortalRegistryConstants.DEFAULT_LOOKUP_INTERVAL);
+
+ ArrayBlockingQueue workQueue = new ArrayBlockingQueue(20000);
+ notifyExecutor = new ThreadPoolExecutor(10, 30, 30 * 1000, TimeUnit.MILLISECONDS, workQueue);
+ logger.info("ConsulRegistry init finish.");
+ }
+
+ public ConcurrentHashMap> getServiceListeners() {
+ return serviceListeners;
+ }
+
+ @Override
+ protected void doRegister(URL url) {
+ PortalRegistryService service = PortalRegistryUtils.buildService(url);
+ client.registerService(service, getPortalToken());
+ if(getPortalRegistryConfig().ttlCheck) heartbeatManager.addHeartbeatService(service);
+ }
+
+ @Override
+ protected void doUnregister(URL url) {
+ PortalRegistryService service = PortalRegistryUtils.buildService(url);
+ client.unregisterService(service, getPortalToken());
+ if(getPortalRegistryConfig().ttlCheck) heartbeatManager.removeHeartbeatService(service);
+ }
+
+ @Override
+ protected void doAvailable(URL url) {
+ if (url == null) {
+ if(getPortalRegistryConfig().ttlCheck) heartbeatManager.setHeartbeatOpen(true);
+ } else {
+ throw new UnsupportedOperationException("Command consul registry not support available by urls yet");
+ }
+ }
+
+ @Override
+ protected void doUnavailable(URL url) {
+ if (url == null) {
+ if(getPortalRegistryConfig().ttlCheck) heartbeatManager.setHeartbeatOpen(false);
+ } else {
+ throw new UnsupportedOperationException("Command consul registry not support unavailable by urls yet");
+ }
+ }
+
+ @Override
+ protected void subscribeService(URL url, ServiceListener serviceListener) {
+ addServiceListener(url, serviceListener);
+ startListenerThreadIfNewService(url);
+ }
+
+ /**
+ * Override the method in com.networknt.registry.support.commandCommandFailbackRegistry
+ * to skip calling the com.networknt.registry.support.commandCommandFailbackRegistry#doDiscover()
and
+ * com.networknt.registry.support.commandCommandFailbackRegistry#notify()
+ * @param url The subscribed service URL
+ * @param listener The listener to be notified when service registration changed.
+ */
+ @Override
+ protected void doSubscribe(URL url, final NotifyListener listener) {
+ if(logger.isInfoEnabled()) logger.info("CommandFailbackRegistry subscribe. url: " + url.toSimpleString());
+ URL urlCopy = url.createCopy();
+ CommandServiceManager manager = getCommandServiceManager(urlCopy);
+ manager.addNotifyListener(listener);
+
+ subscribeService(urlCopy, manager);
+ }
+
+ /**
+ * if new service registered, start a new lookup thread
+ * each serviceName start a lookup thread to discover service
+ *
+ * @param url
+ */
+ private void startListenerThreadIfNewService(URL url) {
+ String serviceId = url.getPath();
+ String tag = url.getParameter(Constants.TAG_ENVIRONMENT);
+ String key = tag == null ? serviceId : serviceId + "|" + tag;
+ String protocol = url.getProtocol();
+ if (!lookupServices.containsKey(key)) {
+ Long value = lookupServices.putIfAbsent(key, 0L);
+ if (value == null) {
+ ServiceLookupThread lookupThread = new ServiceLookupThread(protocol, serviceId, tag);
+ lookupThread.setDaemon(true);
+ lookupThread.start();
+ }
+ }
+ }
+
+ private void addServiceListener(URL url, ServiceListener serviceListener) {
+ String service = PortalRegistryUtils.getUrlClusterInfo(url);
+ ConcurrentHashMap map = serviceListeners.get(service);
+ if (map == null) {
+ serviceListeners.putIfAbsent(service, new ConcurrentHashMap());
+ map = serviceListeners.get(service);
+ }
+ synchronized (map) {
+ map.put(url, serviceListener);
+ }
+ }
+
+ @Override
+ protected void unsubscribeService(URL url, ServiceListener listener) {
+ ConcurrentHashMap listeners = serviceListeners.get(PortalRegistryUtils.getUrlClusterInfo(url));
+ if (listeners != null) {
+ synchronized (listeners) {
+ listeners.remove(url);
+ }
+ }
+ }
+
+ @Override
+ protected List discoverService(URL url) {
+ String serviceId = url.getPath();
+ String tag = url.getParameter(Constants.TAG_ENVIRONMENT);
+ String key = tag == null ? serviceId : serviceId + "|" + tag;
+ String protocol = url.getProtocol();
+ if(logger.isTraceEnabled()) logger.trace("protocol = " + protocol + " serviceId = " + serviceId + " tag = " + tag);
+ List urls = serviceCache.get(key);
+ if (urls == null || urls .isEmpty()) {
+ synchronized (key.intern()) {
+ urls = serviceCache.get(key);
+ if (urls == null || urls .isEmpty()) {
+ ConcurrentHashMap> serviceUrls = lookupServiceUpdate(protocol, serviceId, tag);
+ updateServiceCache(serviceId, serviceUrls, false);
+ urls = serviceCache.get(key);
+ }
+ }
+ }
+ return urls;
+ }
+
+ private ConcurrentHashMap> lookupServiceUpdate(String protocol, String serviceId, String tag) {
+ if(logger.isTraceEnabled()) logger.trace("protocol = " + protocol + " serviceId = " + serviceId + " tag = " + tag);
+ String key = tag == null ? serviceId : serviceId + "|" + tag;
+ PortalRegistryResponse> response = lookupConsulService(serviceId, tag);
+ if(logger.isTraceEnabled()) {
+ try {
+ logger.trace("response = " + Config.getInstance().getMapper().writeValueAsString(response));
+ } catch (Exception e) {}
+ }
+ ConcurrentHashMap> serviceUrls = new ConcurrentHashMap<>();
+ if (response != null) {
+ List services = response.getValue();
+ if(logger.isDebugEnabled()) try {logger.debug("services = " + Config.getInstance().getMapper().writeValueAsString(services));} catch (Exception e) {}
+ if (services != null && !services.isEmpty()) {
+ for (PortalRegistryService service : services) {
+ try {
+ URL url = PortalRegistryUtils.buildUrl(protocol, service);
+ List urlList = serviceUrls.get(key);
+ if (urlList == null) {
+ urlList = new ArrayList<>();
+ serviceUrls.put(key, urlList);
+ }
+ if(logger.isTraceEnabled()) logger.trace("lookupServiceUpdate url = " + url);
+ urlList.add(url);
+ } catch (Exception e) {
+ logger.error("convert portal registry service to url fail! service:" + service, e);
+ }
+ }
+ return serviceUrls;
+ } else {
+ logger.info(key + " no need update with empty service list");
+ }
+ } else {
+ serviceUrls.put(key, new ArrayList<>());
+ logger.info("no response for service: {}, set urls to empty list", key);
+ }
+ return serviceUrls;
+ }
+
+ /**
+ * directly fetch consul service data.
+ *
+ * @param serviceId service Id
+ * @return PortalRegistryResponse or null
+ */
+ private PortalRegistryResponse> lookupConsulService(String serviceId, String tag) {
+ PortalRegistryResponse> response = client.lookupHealthService(serviceId, tag, getPortalToken());
+ return response;
+ }
+
+ /**
+ * update service cache of the serviceName.
+ * update local cache when service list changed,
+ * if need notify, notify service
+ *
+ * @param serviceName
+ * @param serviceUrls
+ * @param needNotify
+ */
+ private void updateServiceCache(String serviceName, ConcurrentHashMap> serviceUrls, boolean needNotify) {
+ if (serviceUrls != null && !serviceUrls.isEmpty()) {
+ List cachedUrls = serviceCache.get(serviceName);
+ List newUrls = serviceUrls.get(serviceName);
+ try {
+ logger.trace("serviceUrls = {}", Config.getInstance().getMapper().writeValueAsString(serviceUrls));
+ } catch(Exception e) {
+ }
+ boolean change = true;
+ if (PortalRegistryUtils.isSame(newUrls, cachedUrls)) {
+ change = false;
+ } else {
+ serviceCache.put(serviceName, newUrls);
+ }
+ if (change && needNotify) {
+ notifyExecutor.execute(new NotifyService(serviceName, newUrls));
+ logger.info("light service notify-service: " + serviceName);
+ StringBuilder sb = new StringBuilder();
+ for (URL url : newUrls) {
+ sb.append(url.getUri()).append(";");
+ }
+ logger.info("consul notify urls:" + sb.toString());
+ }
+ }
+ }
+
+ private class ServiceLookupThread extends Thread {
+ private String protocol;
+ private String serviceId;
+ private String tag;
+ private String key;
+
+ public ServiceLookupThread(String protocol, String serviceId, String tag) {
+ this.protocol = protocol;
+ this.serviceId = serviceId;
+ this.tag = tag;
+ this.key = this.tag == null ? this.serviceId : this.serviceId + "|" + tag;
+ }
+
+ @Override
+ public void run() {
+ logger.info("start service lookup thread. lookup interval: " + lookupInterval + "ms, service: " + serviceId);
+ while (true) {
+ try {
+ sleep(lookupInterval);
+ ConcurrentHashMap> serviceUrls = lookupServiceUpdate(protocol, serviceId, tag);
+ updateServiceCache(key, serviceUrls, true);
+ } catch (Throwable e) {
+ logger.error("service lookup thread fail!", e);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ }
+ }
+
+ private class NotifyService implements Runnable {
+ private String service;
+ private List urls;
+
+ public NotifyService(String service, List urls) {
+ this.service = service;
+ this.urls = urls;
+ }
+
+ @Override
+ public void run() {
+ ConcurrentHashMap listeners = serviceListeners.get(service);
+ if (listeners != null) {
+ synchronized (listeners) {
+ for (Map.Entry entry : listeners.entrySet()) {
+ ServiceListener serviceListener = entry.getValue();
+ serviceListener.notifyService(entry.getKey(), getUrl(), urls);
+ }
+ }
+ } else {
+ logger.debug("need not notify service:" + service);
+ }
+ }
+ }
+
+ private PortalRegistryConfig getPortalRegistryConfig(){
+ return (PortalRegistryConfig)Config.getInstance().getJsonObjectConfig(PortalRegistryConfig.CONFIG_NAME, PortalRegistryConfig.class);
+ }
+
+ private String getPortalToken() {
+
+ return null;
+ }
+
+}
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryConfig.java b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryConfig.java
new file mode 100644
index 0000000000..a50fe0f7d4
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2016 Network New Technologies Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.networknt.portal.registry;
+
+public class PortalRegistryConfig {
+ public static final String CONFIG_NAME = "portal-registry";
+
+ String portalUrl;
+ int maxReqPerConn;
+ String deregisterAfter;
+ String checkInterval;
+ boolean httpCheck;
+ boolean ttlCheck;
+
+ public String getPortalUrl() {
+ return portalUrl;
+ }
+
+ public void setPortalUrl(String portalUrl) {
+ this.portalUrl = portalUrl;
+ }
+
+ public int getMaxReqPerConn() {
+ return maxReqPerConn;
+ }
+
+ public void setMaxReqPerConn(int maxReqPerConn) {
+ this.maxReqPerConn = maxReqPerConn;
+ }
+
+ public String getDeregisterAfter() {
+ return deregisterAfter;
+ }
+
+ public void setDeregisterAfter(String deregisterAfter) {
+ this.deregisterAfter = deregisterAfter;
+ }
+
+ public String getCheckInterval() {
+ return checkInterval;
+ }
+
+ public void setCheckInterval(String checkInterval) {
+ this.checkInterval = checkInterval;
+ }
+
+ public boolean isHttpCheck() {
+ return httpCheck;
+ }
+
+ public void setHttpCheck(boolean httpCheck) {
+ this.httpCheck = httpCheck;
+ }
+
+ public boolean isTtlCheck() {
+ return ttlCheck;
+ }
+
+ public void setTtlCheck(boolean ttlCheck) {
+ this.ttlCheck = ttlCheck;
+ }
+}
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryConstants.java b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryConstants.java
new file mode 100644
index 0000000000..1ce22d0dae
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryConstants.java
@@ -0,0 +1,34 @@
+package com.networknt.portal.registry;
+
+public class PortalRegistryConstants {
+ /**
+ * Service Check Interval
+ */
+ public static String INTERVAL = PortalRegistryService.config.checkInterval == null ? "10s" : PortalRegistryService.config.checkInterval;
+
+ /**
+ * Service Time To Live in second. If there is no heart beat with TTL, the service
+ * will be marked as unavailable.
+ */
+ public static int TTL = Integer.valueOf(INTERVAL.substring(0,INTERVAL.length() - 1));
+
+ /**
+ * Heart beat circle,2/3 of ttl
+ */
+ public static int HEARTBEAT_CIRCLE = (TTL * 1000 * 2) / 3;
+
+ /**
+ * Maximum continuous switch checks, send heart beat is this number is exceeded.
+ */
+ public static int MAX_SWITCHER_CHECK_TIMES = 10;
+
+ /**
+ * Switcher change rate
+ */
+ public static int SWITCHER_CHECK_CIRCLE = HEARTBEAT_CIRCLE / MAX_SWITCHER_CHECK_TIMES;
+
+ /**
+ * portal controller service lookup interval in millisecond
+ */
+ public static int DEFAULT_LOOKUP_INTERVAL = 30000;
+}
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryHeartbeatManager.java b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryHeartbeatManager.java
new file mode 100644
index 0000000000..427119b3be
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryHeartbeatManager.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2009-2016 Weibo, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.networknt.portal.registry;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.networknt.portal.registry.client.PortalRegistryClient;
+import com.networknt.utility.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * consul heart beat manager. passing status service id is registered here,
+ * and this class will set passing status for serviceId(in fact it is corresponding checkId of serviceId),
+ * then the heart beat process is done.
+ *
+ * Switcher is used to enable heart beat or disable heart beat.
+ *
+ * @author zhanglei
+ *
+ */
+public class PortalRegistryHeartbeatManager {
+ private static final Logger logger = LoggerFactory.getLogger(PortalRegistryHeartbeatManager.class);
+ private PortalRegistryClient client;
+ private String token;
+ // all serviceIds that need heart beats.
+ private ConcurrentHashSet services = new ConcurrentHashSet<>();
+
+ private ThreadPoolExecutor jobExecutor;
+ private ScheduledExecutorService heartbeatExecutor;
+ // last heart beat switcher status
+ private boolean lastHeartBeatSwitcherStatus = false;
+ private volatile boolean currentHeartBeatSwitcherStatus = false;
+ // switcher check times
+ private int switcherCheckTimes = 0;
+
+ public PortalRegistryHeartbeatManager(PortalRegistryClient client, String token) {
+ this.client = client;
+ this.token = token;
+ heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+ ArrayBlockingQueue workQueue = new ArrayBlockingQueue(
+ 10000);
+ jobExecutor = new ThreadPoolExecutor(5, 30, 30 * 1000,
+ TimeUnit.MILLISECONDS, workQueue);
+ }
+
+ public void start() {
+ heartbeatExecutor.scheduleAtFixedRate(
+ new Runnable() {
+ @Override
+ public void run() {
+ // Because consul check set pass triggers consul
+ // server write operation,frequently heart beat will impact consul
+ // performance,so heart beat takes long cycle and switcher check takes short cycle.
+ // multiple check on switcher and then send one heart beat to consul server.
+ // TODO change to switcher listener approach.
+ try {
+ boolean switcherStatus = isHeartbeatOpen();
+ if (isSwitcherChange(switcherStatus)) { // heart beat switcher status changed
+ processHeartbeat(switcherStatus);
+ } else {// heart beat switcher status not changed.
+ if (switcherStatus) {// switcher is on, check MAX_SWITCHER_CHECK_TIMES and then send a heart beat
+ switcherCheckTimes++;
+ if (switcherCheckTimes >= PortalRegistryConstants.MAX_SWITCHER_CHECK_TIMES) {
+ processHeartbeat(true);
+ switcherCheckTimes = 0;
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("consul heartbeat executor err:",
+ e);
+ }
+ }
+ }, PortalRegistryConstants.SWITCHER_CHECK_CIRCLE,
+ PortalRegistryConstants.SWITCHER_CHECK_CIRCLE, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * check heart beat switcher status, if switcher is changed, then change lastHeartBeatSwitcherStatus
+ * to the latest status.
+ *
+ * @param switcherStatus
+ * @return
+ */
+ private boolean isSwitcherChange(boolean switcherStatus) {
+ boolean ret = false;
+ if (switcherStatus != lastHeartBeatSwitcherStatus) {
+ ret = true;
+ lastHeartBeatSwitcherStatus = switcherStatus;
+ logger.info("heartbeat switcher change to " + switcherStatus);
+ }
+ return ret;
+ }
+
+ protected void processHeartbeat(boolean isPass) {
+ for (PortalRegistryService service : services) {
+ try {
+ jobExecutor.execute(new HeartbeatJob(service, isPass));
+ } catch (RejectedExecutionException ree) {
+ logger.error("execute heartbeat job fail! serviceId:"
+ + service.getServiceId() + " is rejected");
+ }
+ }
+ }
+
+ public void close() {
+ heartbeatExecutor.shutdown();
+ jobExecutor.shutdown();
+ logger.info("Consul heartbeatManager closed.");
+ }
+
+ /**
+ * Add consul serviceId,added serviceId will set passing status to keep sending heart beat.
+ *
+ * @param service PortalRegistryService
+ */
+ public void addHeartbeatService(PortalRegistryService service) {
+ services.add(service);
+ }
+
+ /**
+ * remove service,corresponding service won't send heart beat
+ *
+ * @param service PortalRegistryService
+ */
+ public void removeHeartbeatService(PortalRegistryService service) {
+ services.remove(service);
+ }
+
+ // check if heart beat switcher is on
+ private boolean isHeartbeatOpen() {
+ return currentHeartBeatSwitcherStatus;
+ }
+
+ public void setHeartbeatOpen(boolean open) {
+ currentHeartBeatSwitcherStatus = open;
+ }
+
+ class HeartbeatJob implements Runnable {
+ private PortalRegistryService service;
+ private boolean isPass;
+
+ public HeartbeatJob(PortalRegistryService service, boolean isPass) {
+ super();
+ this.service = service;
+ this.isPass = isPass;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (isPass) {
+ client.checkPass(service, token);
+ } else {
+ client.checkFail(service, token);
+ }
+ } catch (Exception e) {
+ logger.error(
+ "portal controller heartbeat-set check pass error!serviceId:"
+ + service.getServiceId(), e);
+ }
+
+ }
+
+ }
+
+ public void setClient(PortalRegistryClient client) {
+ this.client = client;
+ }
+
+
+}
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryResponse.java b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryResponse.java
new file mode 100644
index 0000000000..81014fa3a9
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2016 Weibo, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.networknt.portal.registry;
+
+public class PortalRegistryResponse {
+ /**
+ * portal controller return result
+ */
+ private T value;
+
+ private Long consulIndex;
+
+ public T getValue() {
+ return value;
+ }
+
+ public void setValue(T value) {
+ this.value = value;
+ }
+
+ public Long getConsulIndex() {
+ return consulIndex;
+ }
+
+ public void setConsulIndex(Long consulIndex) {
+ this.consulIndex = consulIndex;
+ }
+}
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryService.java b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryService.java
new file mode 100644
index 0000000000..3d783100a7
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryService.java
@@ -0,0 +1,97 @@
+package com.networknt.portal.registry;
+
+import com.networknt.config.Config;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.networknt.portal.registry.PortalRegistryConfig.CONFIG_NAME;
+
+public class PortalRegistryService {
+ static PortalRegistryConfig config = (PortalRegistryConfig) Config.getInstance().getJsonObjectConfig(CONFIG_NAME, PortalRegistryConfig.class);
+
+ private String serviceId;
+
+ private String name;
+
+ private String tag;
+
+ private String address;
+
+ private int port;
+
+ private String checkString;
+
+ public String getServiceId() {
+ return serviceId;
+ }
+
+ public void setServiceId(String serviceId) {
+ this.serviceId = serviceId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getCheckString() {
+ return checkString;
+ }
+
+ public void setCheckString(String checkString) {
+ this.checkString = checkString;
+ }
+
+ public PortalRegistryService() {
+ if(config.httpCheck) {
+ checkString = ",\"check\":{\"id\":\"%1$s:%2$s:%3$s\",\"deregisterCriticalServiceAfter\":\"" + config.deregisterAfter + "\",\"http\":\"" + "https://%2$s:%3$s/health/%1$s" + "\",\"tlsSkipVerify\":true,\"interval\":\"" + config.checkInterval + "\"}}";
+ } else {
+ checkString = ",\"check\":{\"id\":\"%1$s:%2$s:%3$s\",\"deregisterCriticalServiceAfter\":\"" + config.deregisterAfter + "\",\"ttl\":\"" + config.checkInterval + "\"}}";
+ }
+ }
+
+ /**
+ * Construct a register json payload. Note that deregister internal minimum is 1m.
+ *
+ * @return String
+ */
+ @Override
+ public String toString() {
+ String key = tag == null ? serviceId : serviceId + "|" + tag;
+ return "{\"serviceId\":\"" + serviceId +
+ "\",\"name\":\"" + name
+ + (tag != null ? "\",\"tag\":\"" + tag : "")
+ + "\",\"address\":\"" + address
+ + "\",\"port\":" + port
+ + String.format(checkString, key, address, port);
+ }
+
+}
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryUtils.java b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryUtils.java
new file mode 100644
index 0000000000..e76fab7f69
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/PortalRegistryUtils.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2009-2016 Weibo, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.networknt.portal.registry;
+
+import com.networknt.registry.URL;
+import com.networknt.registry.URLImpl;
+import com.networknt.registry.URLParamType;
+import com.networknt.utility.Constants;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PortalRegistryUtils {
+
+ /**
+ * Check if two lists have the same urls.
+ *
+ * @param urls1 first url list
+ * @param urls2 second url list
+ * @return boolean true when they are the same
+ */
+ public static boolean isSame(List urls1, List urls2) {
+ if(urls1 == null && urls2 == null) {
+ return true;
+ }
+ if (urls1 == null || urls2 == null) {
+ return false;
+ }
+ if (urls1.size() != urls2.size()) {
+ return false;
+ }
+ return urls1.containsAll(urls2);
+ }
+
+ /**
+ * build consul service from url
+ *
+ * @param url a URL object
+ * @return service PortalRegistryService
+ */
+ public static PortalRegistryService buildService(URL url) {
+ PortalRegistryService service = new PortalRegistryService();
+ service.setAddress(url.getHost());
+ service.setServiceId(convertPortalRegistrySerivceId(url));
+ service.setName(url.getPath());
+ service.setPort(url.getPort());
+ String env = url.getParameter(Constants.TAG_ENVIRONMENT);
+ if(env != null) service.setTag(env);
+ return service;
+ }
+
+ /**
+ * build url from service
+ * @param protocol the protocol of the service
+ * @param service PortalRegistryService
+ * @return URL object
+ */
+ public static URL buildUrl(String protocol, PortalRegistryService service) {
+ URL url = null;
+ if (url == null) {
+ Map params = new HashMap<>();
+ if (service.getTag() != null) {
+ params.put(URLParamType.environment.getName(), service.getTag());
+ }
+ url = new URLImpl(protocol, service.getAddress(), service.getPort(), service.getServiceId(), params);
+ }
+ return url;
+ }
+
+ /**
+ * get cluster info from url, cluster info (protocol, path)
+ *
+ * @param url a URL object
+ * @return String url cluster info
+ */
+ public static String getUrlClusterInfo(URL url) {
+ return url.getPath();
+ }
+
+ /**
+ * convert url to consul service id. serviceid includes ip+port+service
+ *
+ * @param url a URL object
+ * @return service id
+ */
+ public static String convertPortalRegistrySerivceId(URL url) {
+ return url.getPath();
+ }
+
+ /**
+ * get path of url from service id in consul
+ *
+ * @param serviceId service id
+ * @return path
+ */
+ public static String getPathFromServiceId(String serviceId) {
+ return serviceId.substring(serviceId.indexOf(":") + 1, serviceId.lastIndexOf(":"));
+ }
+}
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/client/PortalRegistryClient.java b/portal-registry/src/main/java/com/networknt/portal/registry/client/PortalRegistryClient.java
new file mode 100644
index 0000000000..0657c399ef
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/client/PortalRegistryClient.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2020 Network New Technologies Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.networknt.portal.registry.client;
+
+import java.util.List;
+
+import com.networknt.portal.registry.PortalRegistry;
+import com.networknt.portal.registry.PortalRegistryResponse;
+import com.networknt.portal.registry.PortalRegistryService;
+
+public interface PortalRegistryClient {
+
+ /**
+ * Set specific serviceId status as pass
+ *
+ * @param service PortalRegistryService
+ * @param token ACL token for consul
+ */
+ void checkPass(PortalRegistryService service, String token);
+
+ /**
+ * Set specific serviceId status as fail
+ *
+ * @param service PortalRegistryService
+ * @param token ACL token for consul
+ */
+ void checkFail(PortalRegistryService service, String token);
+
+ /**
+ * register a consul service
+ *
+ * @param service service object
+ * @param token A bootstrap JWT token to access portal controller
+ */
+ void registerService(PortalRegistryService service, String token);
+
+ /**
+ * unregister a consul service
+ *
+ * @param service service object
+ * @param token bootstrap JWT token to access portal controller
+ */
+ void unregisterService(PortalRegistryService service, String token);
+
+ /**
+ * get latest service list with a tag filter and a security token
+ *
+ * @param serviceId serviceId
+ * @param tag tag that is used for filtering
+ * @param token consul token for security
+ * @return T
+ */
+ PortalRegistryResponse> lookupHealthService(String serviceId, String tag, String token);
+
+}
diff --git a/portal-registry/src/main/java/com/networknt/portal/registry/client/PortalRegistryClientImpl.java b/portal-registry/src/main/java/com/networknt/portal/registry/client/PortalRegistryClientImpl.java
new file mode 100644
index 0000000000..146836d7f1
--- /dev/null
+++ b/portal-registry/src/main/java/com/networknt/portal/registry/client/PortalRegistryClientImpl.java
@@ -0,0 +1,239 @@
+package com.networknt.portal.registry.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.networknt.client.Http2Client;
+import com.networknt.config.Config;
+import com.networknt.config.JsonMapper;
+import com.networknt.httpstring.HttpStringConstants;
+import com.networknt.portal.registry.PortalRegistry;
+import com.networknt.portal.registry.PortalRegistryConfig;
+import com.networknt.portal.registry.PortalRegistryResponse;
+import com.networknt.portal.registry.PortalRegistryService;
+import com.networknt.utility.StringUtils;
+import io.undertow.UndertowOptions;
+import io.undertow.client.ClientConnection;
+import io.undertow.client.ClientRequest;
+import io.undertow.client.ClientResponse;
+import io.undertow.util.Headers;
+import io.undertow.util.HttpString;
+import io.undertow.util.Methods;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xnio.OptionMap;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.networknt.portal.registry.PortalRegistryConfig.CONFIG_NAME;
+
+public class PortalRegistryClientImpl implements PortalRegistryClient {
+ private static final Logger logger = LoggerFactory.getLogger(PortalRegistryClientImpl.class);
+ private static final PortalRegistryConfig config = (PortalRegistryConfig) Config.getInstance().getJsonObjectConfig(CONFIG_NAME, PortalRegistryConfig.class);
+ private static final int UNUSUAL_STATUS_CODE = 300;
+ private Http2Client client = Http2Client.getInstance();
+
+ private OptionMap optionMap;
+ private URI uri;
+
+ /**
+ * Construct PortalRegistryClient with all parameters from portal-registry.yml config file. The other two constructors are
+ * just for backward compatibility.
+ */
+ public PortalRegistryClientImpl() {
+ String portalUrl = config.getPortalUrl().toLowerCase();
+ optionMap = OptionMap.create(UndertowOptions.ENABLE_HTTP2, true);
+ logger.debug("url = {}", portalUrl);
+ try {
+ uri = new URI(portalUrl);
+ } catch (URISyntaxException e) {
+ logger.error("Invalid URI " + portalUrl, e);
+ throw new RuntimeException("Invalid URI " + portalUrl, e);
+ }
+ }
+
+ @Override
+ public void checkPass(PortalRegistryService service, String token) {
+ String key = service.getTag() == null ? service.getServiceId() : service.getServiceId() + "|" + service.getTag();
+ String checkId = String.format("%s:%s:%s", key, service.getAddress(), service.getPort());
+ if(logger.isTraceEnabled()) logger.trace("checkPass id = {}", checkId);
+ Map map = new HashMap<>();
+ map.put("id", checkId);
+ map.put("pass", true);
+ String path = "/services/check";
+ ClientConnection connection = null;
+ try {
+ connection = client.borrowConnection(uri, Http2Client.WORKER, Http2Client.SSL, Http2Client.BUFFER_POOL, optionMap).get();
+ AtomicReference reference = send(connection, Methods.PUT, path, token, JsonMapper.toJson(map));
+ int statusCode = reference.get().getResponseCode();
+ if (statusCode >= UNUSUAL_STATUS_CODE) {
+ logger.error("checkPass error: {} : {}", statusCode, reference.get().getAttachment(Http2Client.RESPONSE_BODY));
+ }
+ } catch (Exception e) {
+ logger.error("CheckPass request exception", e);
+ } finally {
+ client.returnConnection(connection);
+ }
+ }
+
+ @Override
+ public void checkFail(PortalRegistryService service, String token) {
+ String key = service.getTag() == null ? service.getServiceId() : service.getServiceId() + "|" + service.getTag();
+ String checkId = String.format("%s:%s:%s", key, service.getAddress(), service.getPort());
+ if(logger.isTraceEnabled()) logger.trace("checkFail id = {}", checkId);
+ Map map = new HashMap<>();
+ map.put("id", checkId);
+ map.put("pass", false);
+ String path = "/services/check";
+ ClientConnection connection = null;
+ try {
+ connection = client.borrowConnection(uri, Http2Client.WORKER, Http2Client.SSL, Http2Client.BUFFER_POOL, optionMap).get();
+ AtomicReference reference = send(connection, Methods.PUT, path, token, JsonMapper.toJson(map));
+ int statusCode = reference.get().getResponseCode();
+ if (statusCode >= UNUSUAL_STATUS_CODE) {
+ logger.error("checkFail error: {} : {}", statusCode, reference.get().getAttachment(Http2Client.RESPONSE_BODY));
+ }
+ } catch (Exception e) {
+ logger.error("CheckFail request exception", e);
+ } finally {
+ client.returnConnection(connection);
+ }
+ }
+
+ @Override
+ public void registerService(PortalRegistryService service, String token) {
+ String json = service.toString();
+ String path = "/services";
+ ClientConnection connection = null;
+ try {
+ connection = client.borrowConnection(uri, Http2Client.WORKER, Http2Client.SSL, Http2Client.BUFFER_POOL, optionMap).get();
+ AtomicReference reference = send(connection, Methods.POST, path, token, json);
+ int statusCode = reference.get().getResponseCode();
+ if (statusCode >= UNUSUAL_STATUS_CODE) {
+ logger.error("Failed to register on portal controller: {} : {}", statusCode, reference.get().getAttachment(Http2Client.RESPONSE_BODY));
+ throw new Exception("Failed to register on portal controller: " + statusCode);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to register on portal controller, Exception:", e);
+ throw new RuntimeException(e.getMessage());
+ } finally {
+ client.returnConnection(connection);
+ }
+ }
+
+ @Override
+ public void unregisterService(PortalRegistryService service, String token) {
+ String path = "/services?serviceId=" + service.getServiceId() + "&address=" + service.getAddress() + "&port=" + service.getPort();
+ if(service.getTag() != null) path = path + "&tag=" + service.getTag();
+ ClientConnection connection = null;
+ try {
+ connection = client.borrowConnection(uri, Http2Client.WORKER, Http2Client.SSL, Http2Client.BUFFER_POOL, optionMap).get();
+ final AtomicReference reference = send(connection, Methods.DELETE, path, token, null);
+ int statusCode = reference.get().getResponseCode();
+ if (statusCode >= UNUSUAL_STATUS_CODE) {
+ logger.error("Failed to unregister on portal controller, body = {}", reference.get().getAttachment(Http2Client.RESPONSE_BODY));
+ // there is no reason to throw an exception here as the server is down.
+ }
+ } catch (Exception e) {
+ logger.error("Failed to unregister on portal controller, Exception:", e);
+ } finally {
+ client.returnConnection(connection);
+ }
+ }
+
+ /**
+ * to lookup health services based on serviceId and optional tag,
+ *
+ * @param serviceId serviceId
+ * @param tag tag that is used for filtering
+ * @param token jwt token for security
+ * @return null if serviceId is blank
+ */
+ @Override
+ public PortalRegistryResponse> lookupHealthService(String serviceId, String tag, String token) {
+
+ PortalRegistryResponse> newResponse = null;
+
+ if (StringUtils.isBlank(serviceId)) {
+ return null;
+ }
+ ClientConnection connection = null;
+ String path = "/services" + "?passing&serviceId=" + serviceId;
+ if (tag != null) {
+ path = path + "&tag=" + tag;
+ }
+ logger.trace("path = {}", path);
+ try {
+ connection = client.borrowConnection(uri, Http2Client.WORKER, Http2Client.SSL, Http2Client.BUFFER_POOL, optionMap).get();
+ AtomicReference reference = send(connection, Methods.GET, path, token, null);
+ int statusCode = reference.get().getResponseCode();
+ if (statusCode >= UNUSUAL_STATUS_CODE) {
+ throw new Exception("Failed to unregister on Consul: " + statusCode);
+ } else {
+ String body = reference.get().getAttachment(Http2Client.RESPONSE_BODY);
+ List