From 6ca1148bf231c04b0a21d5268b2beb50e44c158b Mon Sep 17 00:00:00 2001 From: "qi.yin" Date: Wed, 18 Jan 2017 13:52:15 +0800 Subject: [PATCH 1/9] update pigeoncall requestsize --- .../filter/RemoteCallMonitorInvokeFilter.java | 232 +++++++++--------- 1 file changed, 118 insertions(+), 114 deletions(-) diff --git a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/RemoteCallMonitorInvokeFilter.java b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/RemoteCallMonitorInvokeFilter.java index 34d87524..d61bea31 100644 --- a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/RemoteCallMonitorInvokeFilter.java +++ b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/RemoteCallMonitorInvokeFilter.java @@ -28,125 +28,129 @@ public class RemoteCallMonitorInvokeFilter extends InvocationInvokeFilter { - private static final Logger logger = LoggerLoader.getLogger(RemoteCallMonitorInvokeFilter.class); + private static final Logger logger = LoggerLoader.getLogger(RemoteCallMonitorInvokeFilter.class); - private final Monitor monitor = MonitorLoader.getMonitor(); + private final Monitor monitor = MonitorLoader.getMonitor(); - public RemoteCallMonitorInvokeFilter() { - } + public RemoteCallMonitorInvokeFilter() { + } + + @Override + public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) + throws Throwable { + invocationContext.getTimeline().add(new TimePoint(TimePhase.O)); + MonitorTransaction transaction = null; + InvocationRequest request = invocationContext.getRequest(); + String targetApp = null; + String callInterface = null; + InvokerConfig invokerConfig = invocationContext.getInvokerConfig(); + if (monitor != null) { + try { + callInterface = InvocationUtils.getRemoteCallFullName(invokerConfig.getUrl(), + invocationContext.getMethodName(), invocationContext.getParameterTypes()); + transaction = monitor.createTransaction("PigeonCall", callInterface, invocationContext); + if (transaction != null) { + monitor.setCurrentCallTransaction(transaction); + transaction.setStatusOk(); + transaction.logEvent("PigeonCall.callType", + invokerConfig.getCallType(invocationContext.getMethodName()), ""); + transaction.logEvent("PigeonCall.serialize", invokerConfig.getSerialize() + "", ""); + transaction.logEvent("PigeonCall.timeout", + invokerConfig.getTimeout(invocationContext.getMethodName()) + "", ""); + transaction.logEvent("PigeonCall.QPS", "S" + Calendar.getInstance().get(Calendar.SECOND), ""); + } + } catch (Throwable e) { + monitor.logMonitorError(e); + } + } + boolean error = false; + try { + InvocationResponse response = handler.handle(invocationContext); + if (transaction != null) { + if (invocationContext.isDegraded()) { + transaction.logEvent("PigeonCall.degrade", callInterface, ""); + } + Client client = invocationContext.getClient(); + if (client != null) { + targetApp = RegistryManager.getInstance().getReferencedAppFromCache(client.getAddress()); + transaction.logEvent("PigeonCall.app", targetApp, ""); + String parameters = ""; + if (Constants.LOG_PARAMETERS) { + parameters = InvocationUtils.toJsonString(request.getParameters(), 1000, 50); + } + transaction.logEvent("PigeonCall.server", client.getAddress(), parameters); + } + + request = invocationContext.getRequest(); - @Override - public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) - throws Throwable { - invocationContext.getTimeline().add(new TimePoint(TimePhase.O)); - MonitorTransaction transaction = null; - InvocationRequest request = invocationContext.getRequest(); - String targetApp = null; - String callInterface = null; - InvokerConfig invokerConfig = invocationContext.getInvokerConfig(); - if (monitor != null) { - try { - callInterface = InvocationUtils.getRemoteCallFullName(invokerConfig.getUrl(), - invocationContext.getMethodName(), invocationContext.getParameterTypes()); - transaction = monitor.createTransaction("PigeonCall", callInterface, invocationContext); - if (transaction != null) { - monitor.setCurrentCallTransaction(transaction); - transaction.setStatusOk(); - transaction.logEvent("PigeonCall.callType", - invokerConfig.getCallType(invocationContext.getMethodName()), ""); - transaction.logEvent("PigeonCall.serialize", invokerConfig.getSerialize() + "", ""); - transaction.logEvent("PigeonCall.timeout", - invokerConfig.getTimeout(invocationContext.getMethodName()) + "", ""); - transaction.logEvent("PigeonCall.QPS", "S" + Calendar.getInstance().get(Calendar.SECOND), ""); - } - } catch (Throwable e) { - monitor.logMonitorError(e); - } - } - boolean error = false; - try { - InvocationResponse response = handler.handle(invocationContext); - if (transaction != null) { - if (invocationContext.isDegraded()) { - transaction.logEvent("PigeonCall.degrade", callInterface, ""); - } - Client client = invocationContext.getClient(); - if (client != null) { - targetApp = RegistryManager.getInstance().getReferencedAppFromCache(client.getAddress()); - transaction.logEvent("PigeonCall.app", targetApp, ""); - String parameters = ""; - if (Constants.LOG_PARAMETERS) { - parameters = InvocationUtils.toJsonString(request.getParameters(), 1000, 50); - } - transaction.logEvent("PigeonCall.server", client.getAddress(), parameters); - } - if (request != null) { - String reqSize = SizeMonitor.getInstance().getLogSize(request.getSize()); - if (reqSize != null) { - monitor.logEvent("PigeonCall.requestSize", reqSize, "" + request.getSize()); - } - } - if (response != null && response.getSize() > 0) { - String respSize = SizeMonitor.getInstance().getLogSize(response.getSize()); - if (respSize != null) { - monitor.logEvent("PigeonCall.responseSize", respSize, "" + response.getSize()); - } - invocationContext.getTimeline().add(new TimePoint(TimePhase.R, response.getCreateMillisTime())); - invocationContext.getTimeline().add(new TimePoint(TimePhase.R)); - } - } - return response; - } catch (Throwable e) { - if (transaction != null) { - if (invocationContext.isDegraded()) { - transaction.logEvent("PigeonCall.degrade", callInterface, ""); - } - Client client = invocationContext.getClient(); - String remoteAddress = null; - if (client != null) { - remoteAddress = client.getAddress(); - targetApp = RegistryManager.getInstance().getReferencedAppFromCache(remoteAddress); - transaction.logEvent("PigeonCall.app", targetApp, ""); - String parameters = ""; - if (request != null && Constants.LOG_PARAMETERS) { - parameters = InvocationUtils.toJsonString(request.getParameters(), 1000, 50); - } - transaction.logEvent("PigeonCall.server", client.getAddress(), parameters); - } - if (request != null) { - String reqSize = SizeMonitor.getInstance().getLogSize(request.getSize()); - if (reqSize != null) { - monitor.logEvent("PigeonCall.requestSize", reqSize, "" + request.getSize()); - } - } + if (request != null) { + String reqSize = SizeMonitor.getInstance().getLogSize(request.getSize()); + if (reqSize != null) { + monitor.logEvent("PigeonCall.requestSize", reqSize, "" + request.getSize()); + } + } + if (response != null && response.getSize() > 0) { + String respSize = SizeMonitor.getInstance().getLogSize(response.getSize()); + if (respSize != null) { + monitor.logEvent("PigeonCall.responseSize", respSize, "" + response.getSize()); + } + invocationContext.getTimeline().add(new TimePoint(TimePhase.R, response.getCreateMillisTime())); + invocationContext.getTimeline().add(new TimePoint(TimePhase.R)); + } + } + return response; + } catch (Throwable e) { + if (transaction != null) { + if (invocationContext.isDegraded()) { + transaction.logEvent("PigeonCall.degrade", callInterface, ""); + } + Client client = invocationContext.getClient(); + String remoteAddress = null; + if (client != null) { + remoteAddress = client.getAddress(); + targetApp = RegistryManager.getInstance().getReferencedAppFromCache(remoteAddress); + transaction.logEvent("PigeonCall.app", targetApp, ""); + String parameters = ""; + if (request != null && Constants.LOG_PARAMETERS) { + parameters = InvocationUtils.toJsonString(request.getParameters(), 1000, 50); + } + transaction.logEvent("PigeonCall.server", client.getAddress(), parameters); + } + request = invocationContext.getRequest(); + if (request != null) { + String reqSize = SizeMonitor.getInstance().getLogSize(request.getSize()); + if (reqSize != null) { + monitor.logEvent("PigeonCall.requestSize", reqSize, "" + request.getSize()); + } + } - ExceptionManager.INSTANCE.logRpcException(remoteAddress, invokerConfig.getUrl(), - invocationContext.getMethodName(), "", e, request, null, transaction); - } - throw e; - } finally { + ExceptionManager.INSTANCE.logRpcException(remoteAddress, invokerConfig.getUrl(), + invocationContext.getMethodName(), "", e, request, null, transaction); + } + throw e; + } finally { - if (transaction != null) { - try { - if (invocationContext.getRequest() != null) { - InvocationRequest _request = invocationContext.getRequest(); - InvokerConfig config = invocationContext.getInvokerConfig(); - if (_request.getSerialize() != config.getSerialize()) { - transaction.addData("CurrentSerialize", _request.getSerialize()); - } - monitorProtocal(invocationContext, _request, targetApp); - } - invocationContext.getTimeline().add(new TimePoint(TimePhase.E, System.currentTimeMillis())); - transaction.complete(); - } catch (Throwable e) { - monitor.logMonitorError(e); - } - if (monitor != null) { - monitor.clearCallTransaction(); - } - } - } - } + if (transaction != null) { + try { + if (invocationContext.getRequest() != null) { + InvocationRequest _request = invocationContext.getRequest(); + InvokerConfig config = invocationContext.getInvokerConfig(); + if (_request.getSerialize() != config.getSerialize()) { + transaction.addData("CurrentSerialize", _request.getSerialize()); + } + monitorProtocal(invocationContext, _request, targetApp); + } + invocationContext.getTimeline().add(new TimePoint(TimePhase.E, System.currentTimeMillis())); + transaction.complete(); + } catch (Throwable e) { + monitor.logMonitorError(e); + } + if (monitor != null) { + monitor.clearCallTransaction(); + } + } + } + } private void monitorProtocal(InvokerContext invokerContext, InvocationRequest request, String targetApp) { From bc4e39fa89ace30603c43c253cdbe724ca8a5f14 Mon Sep 17 00:00:00 2001 From: "qi.yin" Date: Tue, 24 Jan 2017 19:58:36 +0800 Subject: [PATCH 2/9] update config get --- .../common/domain/generic/ThriftMapper.java | 4 +- .../filter/ContextPrepareInvokeFilter.java | 492 +++++++++--------- .../RequestThreadPoolProcessor.java | 14 +- 3 files changed, 264 insertions(+), 246 deletions(-) diff --git a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/common/domain/generic/ThriftMapper.java b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/common/domain/generic/ThriftMapper.java index 9143ed99..6f70bd5e 100644 --- a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/common/domain/generic/ThriftMapper.java +++ b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/common/domain/generic/ThriftMapper.java @@ -25,6 +25,8 @@ */ public class ThriftMapper { + private static final String appName = ConfigManagerLoader.getConfigManager().getAppName(); + public static Header convertRequestToHeader(GenericRequest request) { Header header = new Header(); @@ -95,7 +97,7 @@ public static Header convertResponseToHeader(GenericResponse response) { header.setMessageType(MessageType.ScannerHeartbeat.getCode()); // 响应心跳信息 HeartbeatInfo heartbeatInfo = new HeartbeatInfo(); - heartbeatInfo.setAppkey(ConfigManagerLoader.getConfigManager().getAppName()); + heartbeatInfo.setAppkey(appName); heartbeatInfo.setSendTime(response.getCreateMillisTime()); ProviderSystemInfoCollector providerSystemInfoCollector = ProviderSystemInfoCollector.INSTANCE; heartbeatInfo.setStatus(providerSystemInfoCollector.getStatus(response.getPort())); diff --git a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/ContextPrepareInvokeFilter.java b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/ContextPrepareInvokeFilter.java index 87a4a96d..2c039604 100644 --- a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/ContextPrepareInvokeFilter.java +++ b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/ContextPrepareInvokeFilter.java @@ -43,245 +43,257 @@ public class ContextPrepareInvokeFilter extends InvocationInvokeFilter { - private static final Logger logger = LoggerLoader.getLogger(ContextPrepareInvokeFilter.class); - private ConcurrentHashMap protoVersionMap = new ConcurrentHashMap(); - private ConcurrentHashMap compactVersionMap = new ConcurrentHashMap(); - private static AtomicLong requestSequenceMaker = new AtomicLong(); - private static final String KEY_COMPACT = "pigeon.invoker.request.compact"; - private static final String KEY_TIMEOUT_RESET = "pigeon.timeout.reset"; - private static final InvokerContextProcessor contextProcessor = ExtensionLoader - .getExtension(InvokerContextProcessor.class); - private static final ConfigManager configManager = ConfigManagerLoader.getConfigManager(); - private static volatile boolean isCompact = configManager.getBooleanValue(KEY_COMPACT, true); - private static volatile boolean isTimeoutReset = configManager.getBooleanValue(KEY_TIMEOUT_RESET, true); - - public ContextPrepareInvokeFilter() { - ConfigManagerLoader.getConfigManager().registerConfigChangeListener(new InnerConfigChangeListener()); - } - - private static class InnerConfigChangeListener implements ConfigChangeListener { - - @Override - public void onKeyUpdated(String key, String value) { - if (key.endsWith(KEY_COMPACT)) { - try { - isCompact = Boolean.valueOf(value); - } catch (RuntimeException e) { - logger.warn("invalid value for key " + key, e); - } - } else if (key.endsWith(KEY_TIMEOUT_RESET)) { - try { - isTimeoutReset = Boolean.valueOf(value); - } catch (RuntimeException e) { - logger.warn("invalid value for key " + key, e); - } - } - } - - @Override - public void onKeyAdded(String key, String value) { - // TODO Auto-generated method stub - - } - - @Override - public void onKeyRemoved(String key) { - // TODO Auto-generated method stub - - } - - } - - @Override - public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) - throws Throwable { - invocationContext.getTimeline().add(new TimePoint(TimePhase.C)); - - readMonitorContext(invocationContext); - - initRequest(invocationContext); - transferContextValueToRequest(invocationContext, invocationContext.getRequest()); - try { - return handler.handle(invocationContext); - } finally { - ContextUtils.clearRequestContext(); - } - } - - private void readMonitorContext(InvokerContext invocationContext) { - MonitorTransaction transaction = MonitorLoader.getMonitor().getCurrentCallTransaction(); - - if (transaction != null) { - Client client = invocationContext.getClient(); - String targetApp = RegistryManager.getInstance().getReferencedAppFromCache(client.getAddress()); - - transaction.readMonitorContext(targetApp); - } - } - - // 初始化Request的createTime和timeout,以便统一这两个值 - private void initRequest(InvokerContext invokerContext) { - InvocationRequest request = invokerContext.getRequest(); - - if (!(request instanceof UnifiedRequest)) { - compactRequest(invokerContext); - } else { - UnifiedRequest _request = (UnifiedRequest) request; - _request.setServiceInterface(invokerContext.getInvokerConfig().getServiceInterface()); - _request.setParameterTypes(invokerContext.getParameterTypes()); - } - - checkSerialize(invokerContext); - request = invokerContext.getRequest(); - request.setSequence(requestSequenceMaker.incrementAndGet() * -1); - request.setCreateMillisTime(System.currentTimeMillis()); - request.setMessageType(Constants.MESSAGE_TYPE_SERVICE); - - InvokerConfig invokerConfig = invokerContext.getInvokerConfig(); - if (invokerConfig != null) { - request.setTimeout(invokerConfig.getTimeout(invokerContext.getMethodName())); - - if (isTimeoutReset) { - Object timeout = ContextUtils.getLocalContext(Constants.REQUEST_TIMEOUT); - if (timeout != null) { - int timeout_ = Integer.parseInt(String.valueOf(timeout)); - if (timeout_ > 0 && timeout_ < request.getTimeout()) { - request.setTimeout(timeout_); - } - } - } - if (CallMethod.isOneway(invokerConfig.getCallType())) { - request.setCallType(CallType.NOREPLY.getCode()); - } else { - request.setCallType(CallType.REPLY.getCode()); - } - } - } - - private void checkSerialize(InvokerContext invokerContext) { - InvocationRequest request = invokerContext.getRequest(); - - if (SerializerType.isProto(request.getSerialize()) || SerializerType.isFst(request.getSerialize())) { - checkVersion(invokerContext); - } else if (SerializerType.isThrift(request.getSerialize())) { - checkProtocol(invokerContext); - } - - } - - // 缺服务是否支持判断 - private void checkVersion(InvokerContext invokerContext) { - Client client = invokerContext.getClient(); - InvocationRequest request = invokerContext.getRequest(); - - String version = RegistryManager.getInstance().getReferencedVersionFromCache(client.getAddress()); - - boolean supported = true; - if (StringUtils.isBlank(version)) { - supported = false; - } else if (protoVersionMap.containsKey(version)) { - supported = protoVersionMap.get(version); - } else { - supported = VersionUtils.isProtoFstSupported(version); - protoVersionMap.putIfAbsent(version, supported); - } - - if (!supported) { - request.setSerialize(SerializerType.HESSIAN.getCode()); - invokerContext.getInvokerConfig().setSerialize(SerializerType.HESSIAN.getName()); - } - } - - private void checkProtocol(InvokerContext invokerContext) { - Client client = invokerContext.getClient(); - InvocationRequest request = invokerContext.getRequest(); - boolean supported = false; - try { - supported = RegistryManager.getInstance().isSupportNewProtocol(client.getAddress(), - request.getServiceName()); - } catch (RegistryException e) { - supported = false; - } - if (!supported) { - InvocationRequest _request = InvocationUtils.newRequest(invokerContext); - _request.setSerialize(SerializerType.HESSIAN.getCode()); - invokerContext.setRequest(_request); - } - } - - private void compactRequest(InvokerContext invokerContext) { - boolean isCompactReq = false; - if (isCompact) { - Client client = invokerContext.getClient(); - String version = RegistryManager.getInstance().getReferencedVersionFromCache(client.getAddress()); - if (StringUtils.isBlank(version)) { - isCompactReq = false; - } else if (compactVersionMap.containsKey(version)) { - isCompactReq = compactVersionMap.get(version); - } else { - isCompactReq = VersionUtils.isCompactSupported(version); - compactVersionMap.putIfAbsent(version, isCompactReq); - } - } - if (isCompactReq) { - invokerContext.setRequest(new CompactRequest(invokerContext)); - } - } - - private void transferContextValueToRequest(final InvokerContext invocationContext, - final InvocationRequest request) { - if (request instanceof UnifiedRequest) { - UnifiedRequest _request = (UnifiedRequest) request; - _request.setParameterTypes(invocationContext.getParameterTypes()); - transferContextValueToRequest0(_request); - } else { - transferContextValueToRequest0(invocationContext, request); - } - } - - private void transferContextValueToRequest0(final InvokerContext invocationContext, - final InvocationRequest request) { - if (contextProcessor != null) { - contextProcessor.preInvoke(invocationContext); - } - - if (ContextUtils.getGlobalContext(Constants.CONTEXT_KEY_SOURCE_APP) == null) { - ContextUtils.putGlobalContext(Constants.CONTEXT_KEY_SOURCE_APP, - ConfigManagerLoader.getConfigManager().getAppName()); - ContextUtils.putGlobalContext(Constants.CONTEXT_KEY_SOURCE_IP, - ConfigManagerLoader.getConfigManager().getLocalIp()); - } - request.setGlobalValues(ContextUtils.getGlobalContext()); - ContextUtils.initRequestContext(); - request.setRequestValues(ContextUtils.getRequestContext()); - } - - private void transferContextValueToRequest0(final UnifiedRequest request) { - - if (ContextUtils.getGlobalContext(Constants.CONTEXT_KEY_SOURCE_APP) == null) { - ContextUtils.putGlobalContext(Constants.CONTEXT_KEY_SOURCE_APP, - ConfigManagerLoader.getConfigManager().getAppName()); - ContextUtils.putGlobalContext(Constants.CONTEXT_KEY_SOURCE_IP, - ConfigManagerLoader.getConfigManager().getLocalIp()); - } - - Map _globalContext = request.getGlobalContext(); - if (_globalContext == null) { - _globalContext = new HashMap(); - request.setGlobalContext(_globalContext); - } - - Map globalContext = ContextUtils.getGlobalContext(); - - ContextUtils.convertContext(globalContext, _globalContext); - Map _localContext = request.getLocalContext(); - if (_localContext == null) { - _localContext = new HashMap(); - request.setLocalContext(_localContext); - } - - Map localContext = ContextUtils.getRequestContext(); - ContextUtils.convertContext(localContext, _localContext); - } + private static final Logger logger = LoggerLoader.getLogger(ContextPrepareInvokeFilter.class); + private ConcurrentHashMap protoVersionMap = new ConcurrentHashMap(); + private ConcurrentHashMap compactVersionMap = new ConcurrentHashMap(); + private static AtomicLong requestSequenceMaker = new AtomicLong(); + private static final String KEY_COMPACT = "pigeon.invoker.request.compact"; + private static final String KEY_TIMEOUT_RESET = "pigeon.timeout.reset"; + private static final String KEY_CONTEXT_ENABLE = "pigeon.invoker.context.enable"; + private static final InvokerContextProcessor contextProcessor = ExtensionLoader + .getExtension(InvokerContextProcessor.class); + private static final ConfigManager configManager = ConfigManagerLoader.getConfigManager(); + private static volatile boolean isCompact = configManager.getBooleanValue(KEY_COMPACT, true); + private static volatile boolean isTimeoutReset = configManager.getBooleanValue(KEY_TIMEOUT_RESET, true); + private static volatile boolean contextEnable = configManager.getBooleanValue(KEY_CONTEXT_ENABLE, true); + + public ContextPrepareInvokeFilter() { + ConfigManagerLoader.getConfigManager().registerConfigChangeListener(new InnerConfigChangeListener()); + } + + private static class InnerConfigChangeListener implements ConfigChangeListener { + + @Override + public void onKeyUpdated(String key, String value) { + if (key.endsWith(KEY_COMPACT)) { + try { + isCompact = Boolean.valueOf(value); + } catch (RuntimeException e) { + logger.warn("invalid value for key " + key, e); + } + } else if (key.endsWith(KEY_TIMEOUT_RESET)) { + try { + isTimeoutReset = Boolean.valueOf(value); + } catch (RuntimeException e) { + logger.warn("invalid value for key " + key, e); + } + } else if (key.endsWith(KEY_CONTEXT_ENABLE)) { + try { + contextEnable = Boolean.valueOf(value); + } catch (RuntimeException e) { + logger.warn("invalid value for key " + key, e); + } + } + } + + @Override + public void onKeyAdded(String key, String value) { + // TODO Auto-generated method stub + + } + + @Override + public void onKeyRemoved(String key) { + // TODO Auto-generated method stub + + } + + } + + @Override + public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) + throws Throwable { + invocationContext.getTimeline().add(new TimePoint(TimePhase.C)); + + readMonitorContext(invocationContext); + + initRequest(invocationContext); + + if(contextEnable) { + transferContextValueToRequest(invocationContext, invocationContext.getRequest()); + } + + try { + return handler.handle(invocationContext); + } finally { + ContextUtils.clearRequestContext(); + } + } + + private void readMonitorContext(InvokerContext invocationContext) { + MonitorTransaction transaction = MonitorLoader.getMonitor().getCurrentCallTransaction(); + + if (transaction != null) { + Client client = invocationContext.getClient(); + String targetApp = RegistryManager.getInstance().getReferencedAppFromCache(client.getAddress()); + + transaction.readMonitorContext(targetApp); + } + } + + // 初始化Request的createTime和timeout,以便统一这两个值 + private void initRequest(InvokerContext invokerContext) { + InvocationRequest request = invokerContext.getRequest(); + + if (!(request instanceof UnifiedRequest)) { + compactRequest(invokerContext); + } else { + UnifiedRequest _request = (UnifiedRequest) request; + _request.setServiceInterface(invokerContext.getInvokerConfig().getServiceInterface()); + _request.setParameterTypes(invokerContext.getParameterTypes()); + } + + checkSerialize(invokerContext); + request = invokerContext.getRequest(); + request.setSequence(requestSequenceMaker.incrementAndGet() * -1); + request.setCreateMillisTime(System.currentTimeMillis()); + request.setMessageType(Constants.MESSAGE_TYPE_SERVICE); + + InvokerConfig invokerConfig = invokerContext.getInvokerConfig(); + if (invokerConfig != null) { + request.setTimeout(invokerConfig.getTimeout(invokerContext.getMethodName())); + + if (isTimeoutReset) { + Object timeout = ContextUtils.getLocalContext(Constants.REQUEST_TIMEOUT); + if (timeout != null) { + int timeout_ = Integer.parseInt(String.valueOf(timeout)); + if (timeout_ > 0 && timeout_ < request.getTimeout()) { + request.setTimeout(timeout_); + } + } + } + if (CallMethod.isOneway(invokerConfig.getCallType())) { + request.setCallType(CallType.NOREPLY.getCode()); + } else { + request.setCallType(CallType.REPLY.getCode()); + } + } + } + + private void checkSerialize(InvokerContext invokerContext) { + InvocationRequest request = invokerContext.getRequest(); + + if (SerializerType.isProto(request.getSerialize()) || SerializerType.isFst(request.getSerialize())) { + checkVersion(invokerContext); + } else if (SerializerType.isThrift(request.getSerialize())) { + checkProtocol(invokerContext); + } + + } + + // 缺服务是否支持判断 + private void checkVersion(InvokerContext invokerContext) { + Client client = invokerContext.getClient(); + InvocationRequest request = invokerContext.getRequest(); + + String version = RegistryManager.getInstance().getReferencedVersionFromCache(client.getAddress()); + + boolean supported = true; + if (StringUtils.isBlank(version)) { + supported = false; + } else if (protoVersionMap.containsKey(version)) { + supported = protoVersionMap.get(version); + } else { + supported = VersionUtils.isProtoFstSupported(version); + protoVersionMap.putIfAbsent(version, supported); + } + + if (!supported) { + request.setSerialize(SerializerType.HESSIAN.getCode()); + invokerContext.getInvokerConfig().setSerialize(SerializerType.HESSIAN.getName()); + } + } + + private void checkProtocol(InvokerContext invokerContext) { + Client client = invokerContext.getClient(); + InvocationRequest request = invokerContext.getRequest(); + boolean supported = false; + try { + supported = RegistryManager.getInstance().isSupportNewProtocol(client.getAddress(), + request.getServiceName()); + } catch (RegistryException e) { + supported = false; + } + if (!supported) { + InvocationRequest _request = InvocationUtils.newRequest(invokerContext); + _request.setSerialize(SerializerType.HESSIAN.getCode()); + invokerContext.setRequest(_request); + } + } + + private void compactRequest(InvokerContext invokerContext) { + boolean isCompactReq = false; + if (isCompact) { + Client client = invokerContext.getClient(); + String version = RegistryManager.getInstance().getReferencedVersionFromCache(client.getAddress()); + if (StringUtils.isBlank(version)) { + isCompactReq = false; + } else if (compactVersionMap.containsKey(version)) { + isCompactReq = compactVersionMap.get(version); + } else { + isCompactReq = VersionUtils.isCompactSupported(version); + compactVersionMap.putIfAbsent(version, isCompactReq); + } + } + if (isCompactReq) { + invokerContext.setRequest(new CompactRequest(invokerContext)); + } + } + + private void transferContextValueToRequest(final InvokerContext invocationContext, + final InvocationRequest request) { + if (request instanceof UnifiedRequest) { + UnifiedRequest _request = (UnifiedRequest) request; + _request.setParameterTypes(invocationContext.getParameterTypes()); + transferContextValueToRequest0(_request); + } else { + transferContextValueToRequest0(invocationContext, request); + } + } + + private void transferContextValueToRequest0(final InvokerContext invocationContext, + final InvocationRequest request) { + if (contextProcessor != null) { + contextProcessor.preInvoke(invocationContext); + } + + if (ContextUtils.getGlobalContext(Constants.CONTEXT_KEY_SOURCE_APP) == null) { + ContextUtils.putGlobalContext(Constants.CONTEXT_KEY_SOURCE_APP, + ConfigManagerLoader.getConfigManager().getAppName()); + ContextUtils.putGlobalContext(Constants.CONTEXT_KEY_SOURCE_IP, + ConfigManagerLoader.getConfigManager().getLocalIp()); + } + request.setGlobalValues(ContextUtils.getGlobalContext()); + ContextUtils.initRequestContext(); + request.setRequestValues(ContextUtils.getRequestContext()); + } + + private void transferContextValueToRequest0(final UnifiedRequest request) { + + if (ContextUtils.getGlobalContext(Constants.CONTEXT_KEY_SOURCE_APP) == null) { + ContextUtils.putGlobalContext(Constants.CONTEXT_KEY_SOURCE_APP, + ConfigManagerLoader.getConfigManager().getAppName()); + ContextUtils.putGlobalContext(Constants.CONTEXT_KEY_SOURCE_IP, + ConfigManagerLoader.getConfigManager().getLocalIp()); + } + + Map _globalContext = request.getGlobalContext(); + if (_globalContext == null) { + _globalContext = new HashMap(); + request.setGlobalContext(_globalContext); + } + + Map globalContext = ContextUtils.getGlobalContext(); + + ContextUtils.convertContext(globalContext, _globalContext); + Map _localContext = request.getLocalContext(); + if (_localContext == null) { + _localContext = new HashMap(); + request.setLocalContext(_localContext); + } + + Map localContext = ContextUtils.getRequestContext(); + ContextUtils.convertContext(localContext, _localContext); + } } diff --git a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/provider/process/threadpool/RequestThreadPoolProcessor.java b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/provider/process/threadpool/RequestThreadPoolProcessor.java index 18e7b3ae..2ad4513f 100755 --- a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/provider/process/threadpool/RequestThreadPoolProcessor.java +++ b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/provider/process/threadpool/RequestThreadPoolProcessor.java @@ -50,6 +50,8 @@ public class RequestThreadPoolProcessor extends AbstractRequestProcessor { private static volatile boolean isTrace = true; + private volatile boolean poolConfigSwitchable = false; + private static DynamicThreadPool sharedRequestProcessThreadPool = null; private static final int SLOW_POOL_CORESIZE = configManager.getIntValue( @@ -113,6 +115,7 @@ public class RequestThreadPoolProcessor extends AbstractRequestProcessor { public RequestThreadPoolProcessor() { isTrace = configManager.getBooleanValue(Constants.KEY_PROVIDER_TRACE_ENABLE, Constants.DEFAULT_PROVIDER_TRACE_ENABLE); + poolConfigSwitchable = configManager.getBooleanValue(KEY_PROVIDER_POOL_CONFIG_ENABLE, false); configManager.registerConfigChangeListener(new InnerConfigChangeListener()); } @@ -179,7 +182,7 @@ public void doStop() { public Future doProcessRequest(final InvocationRequest request, final ProviderContext providerContext) { - requestContextMap.put(request, providerContext); +// requestContextMap.put(request, providerContext); doMonitorData(request, providerContext); @@ -198,7 +201,7 @@ public InvocationResponse call() throws Exception { } catch (Throwable t) { logger.error("Process request failed with invocation handler, you should never be here.", t); } finally { - requestContextMap.remove(request); +// requestContextMap.remove(request); } return null; } @@ -210,7 +213,7 @@ public InvocationResponse call() throws Exception { providerContext.getTimeline().add(new TimePoint(TimePhase.T)); return pool.submit(requestExecutor); } catch (RejectedExecutionException e) { - requestContextMap.remove(request); +// requestContextMap.remove(request); throw new RejectedException(getProcessorStatistics(pool), e); } @@ -252,8 +255,7 @@ private ThreadPool selectThreadPool(final InvocationRequest request) { } // lion poolConfig - if (pool == null && configManager.getBooleanValue(KEY_PROVIDER_POOL_CONFIG_ENABLE, false) - && !CollectionUtils.isEmpty(apiPoolNameMapping)) { + if (pool == null && poolConfigSwitchable && !CollectionUtils.isEmpty(apiPoolNameMapping)) { PoolConfig poolConfig = null; String poolName = apiPoolNameMapping.get(methodKey); if (StringUtils.isNotBlank(poolName)) { // 方法级别 @@ -635,6 +637,8 @@ public void onKeyUpdated(String key, String value) { isTrace = Boolean.valueOf(value); } catch (RuntimeException e) { } + } else if (key.endsWith(KEY_PROVIDER_POOL_CONFIG_ENABLE)) { + poolConfigSwitchable = Boolean.valueOf(value); } else { for (String k : methodPoolConfigKeys.keySet()) { String v = methodPoolConfigKeys.get(k); From 009cf593e6d0a9fcbf8c50c4f3212728e6fc713f Mon Sep 17 00:00:00 2001 From: xiangwu Date: Fri, 3 Feb 2017 16:36:06 +0800 Subject: [PATCH 3/9] fix bug with http invoker --- USER_GUIDE.md | 4 +- pigeon-build/dependency-reduced-pom.xml | 12 - .../http/invoker/HttpInvokerClient.java | 256 ++++++------- .../invoker/concurrent/ServiceFutureImpl.java | 340 +++++++++--------- .../filter/ContextPrepareInvokeFilter.java | 2 +- 5 files changed, 305 insertions(+), 309 deletions(-) diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 1fe85bd2..c7e0b82e 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -163,13 +163,13 @@ pigeon.provider.applimit.enable=true 如果使用了点评内部的lion配置中心,相比本地配置管理上更加方便,在lion管理端进行配置的统一管理,无需在每台机器上的properties文件里进行配置 lion配置需要按前面依赖里提到的引入以下依赖: - +```xml com.dianping pigeon-config-lion 1.0.0-SNAPSHOT - +``` 如果要设置某个应用级的pigeon配置,需要在lion里增加相应的pigeon配置,如pigeon内部有一个全局默认配置pigeon.provider.applimit.enable,值为false 如果某个应用xxx-service(这个应用名就是app.properties里的app.name)想修改这个默认配置,那么可以在lion里增加一个key:xxx-service.pigeon.provider.applimit.enable,设置为true diff --git a/pigeon-build/dependency-reduced-pom.xml b/pigeon-build/dependency-reduced-pom.xml index f73a52b7..7352a3f8 100644 --- a/pigeon-build/dependency-reduced-pom.xml +++ b/pigeon-build/dependency-reduced-pom.xml @@ -310,18 +310,6 @@ 1.7.12 compile - - org.msgpack - msgpack-core - 0.8.7 - compile - - - org.msgpack - jackson-dataformat-msgpack - 0.8.7 - compile - org.apache.curator curator-framework diff --git a/pigeon-extensions/pigeon-remoting-http/src/main/java/com/dianping/pigeon/remoting/http/invoker/HttpInvokerClient.java b/pigeon-extensions/pigeon-remoting-http/src/main/java/com/dianping/pigeon/remoting/http/invoker/HttpInvokerClient.java index 318a8253..99cbaca0 100644 --- a/pigeon-extensions/pigeon-remoting-http/src/main/java/com/dianping/pigeon/remoting/http/invoker/HttpInvokerClient.java +++ b/pigeon-extensions/pigeon-remoting-http/src/main/java/com/dianping/pigeon/remoting/http/invoker/HttpInvokerClient.java @@ -26,133 +26,133 @@ public class HttpInvokerClient extends AbstractClient { - private ConnectInfo connectInfo; - private HttpInvokerExecutor httpInvokerExecutor; - private String serviceUrlPrefix = null; - private String defaultServiceUrl = null; - private boolean isConnected = false; - public static final String CONTENT_TYPE_SERIALIZED_OBJECT = "application/x-java-serialized-object"; - - public HttpInvokerClient(ClientConfig clientConfig, - ConnectInfo connectInfo, - ResponseProcessor responseProcessor) { - super(clientConfig, responseProcessor); - - this.connectInfo = connectInfo; - if (logger.isInfoEnabled()) { - logger.info("http client:" + connectInfo); - } - serviceUrlPrefix = "http://" + connectInfo.getHost() + ":" + connectInfo.getPort() + "/"; - defaultServiceUrl = serviceUrlPrefix + "service"; - httpInvokerExecutor = new HttpInvokerExecutor(); - HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); - HttpConnectionManagerParams params = new HttpConnectionManagerParams(); - params.setMaxTotalConnections(300); - params.setDefaultMaxConnectionsPerHost(50); - params.setConnectionTimeout(1000); - params.setTcpNoDelay(true); - params.setSoTimeout(3000); - params.setStaleCheckingEnabled(true); - connectionManager.setParams(params); - HttpClient httpClient = new HttpClient(); - httpClient.setHttpConnectionManager(connectionManager); - httpInvokerExecutor.setHttpClient(httpClient); - } - - @Override - public ConnectInfo getConnectInfo() { - return connectInfo; - } - - @Override - public void doOpen() { - InvocationRequest request = InvocationUtils.newRequest(Constants.HEART_TASK_SERVICE, - Constants.HEART_TASK_METHOD, null, SerializerType.HESSIAN.getCode(), - Constants.MESSAGE_TYPE_HEART, 5000, null); - request.setSequence(0); - request.setCreateMillisTime(System.currentTimeMillis()); - request.setCallType(Constants.CALLTYPE_REPLY); - InvocationResponse response = null; - try { - response = this.write(request); - if (response != null && response.getSequence() == 0) { - isConnected = true; - } - } catch (Throwable e) { - close(); - isConnected = false; - } - } - - @Override - public InvocationResponse doWrite(InvocationRequest invocationRequest) throws NetworkException { - return write(defaultServiceUrl, invocationRequest); - } - - public InvocationResponse write(String url, InvocationRequest request) throws NetworkException { - final int timeout = request.getTimeout(); - httpInvokerExecutor.setReadTimeout(timeout); - try { - InvocationResponse invocationResponse = httpInvokerExecutor.executeRequest(url, request); - this.isConnected = true; - return invocationResponse; - } catch (ConnectException e) { - this.isConnected = false; - throw new NetworkException("remote call failed:" + request, e); - } catch (Throwable e) { - throw new NetworkException("remote call failed:" + request, e); - } - } - - @Override - public String getHost() { - return connectInfo.getHost(); - } - - @Override - public String getAddress() { - return connectInfo.getHost() + ":" + connectInfo.getPort(); - } - - @Override - public int getPort() { - return connectInfo.getPort(); - } - - @Override - public void doClose() { - } - - @Override - public List getChannels() { - return null; - } - - - @Override - public String toString() { - return this.getAddress(); - } - - - public boolean equals(Object obj) { - if (obj instanceof HttpInvokerClient) { - HttpInvokerClient nc = (HttpInvokerClient) obj; - return this.getAddress().equals(nc.getAddress()); - } else { - return super.equals(obj); - } - } - - @Override - public int hashCode() { - return getAddress().hashCode(); - } - - @Override - public String getProtocol() { - return Constants.PROTOCOL_HTTP; - } - + private ConnectInfo connectInfo; + private HttpInvokerExecutor httpInvokerExecutor; + private String serviceUrlPrefix = null; + private String defaultServiceUrl = null; + private boolean isConnected = false; + public static final String CONTENT_TYPE_SERIALIZED_OBJECT = "application/x-java-serialized-object"; + + public HttpInvokerClient(ClientConfig clientConfig, ConnectInfo connectInfo, ResponseProcessor responseProcessor) { + super(clientConfig, responseProcessor); + + this.connectInfo = connectInfo; + if (logger.isInfoEnabled()) { + logger.info("http client:" + connectInfo); + } + serviceUrlPrefix = "http://" + connectInfo.getHost() + ":" + connectInfo.getPort() + "/"; + defaultServiceUrl = serviceUrlPrefix + "service"; + httpInvokerExecutor = new HttpInvokerExecutor(); + HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); + HttpConnectionManagerParams params = new HttpConnectionManagerParams(); + params.setMaxTotalConnections(300); + params.setDefaultMaxConnectionsPerHost(50); + params.setConnectionTimeout(1000); + params.setTcpNoDelay(true); + params.setSoTimeout(3000); + params.setStaleCheckingEnabled(true); + connectionManager.setParams(params); + HttpClient httpClient = new HttpClient(); + httpClient.setHttpConnectionManager(connectionManager); + httpInvokerExecutor.setHttpClient(httpClient); + } + + @Override + public ConnectInfo getConnectInfo() { + return connectInfo; + } + + @Override + public void doOpen() { + InvocationRequest request = InvocationUtils.newRequest(Constants.HEART_TASK_SERVICE, + Constants.HEART_TASK_METHOD, null, SerializerType.HESSIAN.getCode(), Constants.MESSAGE_TYPE_HEART, 5000, + null); + request.setSequence(0); + request.setCreateMillisTime(System.currentTimeMillis()); + request.setCallType(Constants.CALLTYPE_REPLY); + InvocationResponse response = null; + try { + response = this.write(request); + if (response != null && response.getSequence() == 0) { + isConnected = true; + } + } catch (Throwable e) { + close(); + isConnected = false; + } + } + + @Override + public InvocationResponse doWrite(InvocationRequest invocationRequest) throws NetworkException { + return write(defaultServiceUrl, invocationRequest); + } + + public InvocationResponse write(String url, InvocationRequest request) throws NetworkException { + final int timeout = request.getTimeout(); + httpInvokerExecutor.setReadTimeout(timeout); + try { + InvocationResponse invocationResponse = httpInvokerExecutor.executeRequest(url, request); + this.isConnected = true; + return invocationResponse; + } catch (ConnectException e) { + this.isConnected = false; + throw new NetworkException("remote call failed:" + url + ", request:" + request, e); + } catch (Throwable e) { + throw new NetworkException("remote call failed:" + url + ", request:" + request, e); + } + } + + @Override + public String getHost() { + return connectInfo.getHost(); + } + + @Override + public String getAddress() { + return connectInfo.getHost() + ":" + connectInfo.getPort(); + } + + @Override + public int getPort() { + return connectInfo.getPort(); + } + + @Override + public boolean isActive() { + return super.isActive() && isConnected; + } + + @Override + public void doClose() { + } + + @Override + public List getChannels() { + return null; + } + + @Override + public String toString() { + return this.getAddress(); + } + + public boolean equals(Object obj) { + if (obj instanceof HttpInvokerClient) { + HttpInvokerClient nc = (HttpInvokerClient) obj; + return this.getAddress().equals(nc.getAddress()); + } else { + return super.equals(obj); + } + } + + @Override + public int hashCode() { + return getAddress().hashCode(); + } + + @Override + public String getProtocol() { + return Constants.PROTOCOL_HTTP; + } } diff --git a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/concurrent/ServiceFutureImpl.java b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/concurrent/ServiceFutureImpl.java index c7582ddb..c4cc51d3 100755 --- a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/concurrent/ServiceFutureImpl.java +++ b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/concurrent/ServiceFutureImpl.java @@ -11,6 +11,7 @@ import com.dianping.pigeon.log.Logger; import com.dianping.pigeon.log.LoggerLoader; +import com.dianping.pigeon.registry.RegistryManager; import com.dianping.pigeon.remoting.common.monitor.trace.InvokerMonitorData; import com.dianping.pigeon.remoting.common.domain.InvocationContext.TimePhase; import com.dianping.pigeon.remoting.common.domain.InvocationContext.TimePoint; @@ -21,6 +22,7 @@ import com.dianping.pigeon.remoting.common.monitor.SizeMonitor; import com.dianping.pigeon.remoting.common.util.Constants; import com.dianping.pigeon.remoting.common.util.InvocationUtils; +import com.dianping.pigeon.remoting.invoker.Client; import com.dianping.pigeon.remoting.invoker.domain.InvokerContext; import com.dianping.pigeon.remoting.invoker.exception.RequestTimeoutException; import com.dianping.pigeon.remoting.invoker.process.DegradationManager; @@ -29,170 +31,176 @@ public class ServiceFutureImpl extends CallbackFuture implements Future { - private static final Logger logger = LoggerLoader.getLogger(ServiceFutureImpl.class); - - private long timeout = Long.MAX_VALUE; - - protected Thread callerThread; - - protected InvokerContext invocationContext; - - public ServiceFutureImpl(InvokerContext invocationContext, long timeout) { - super(); - this.invocationContext = invocationContext; - this.timeout = timeout; - callerThread = Thread.currentThread(); - } - - @Override - public Object get() throws InterruptedException, ExecutionException { - return get(this.timeout); - } - - public Object get(long timeoutMillis) throws InterruptedException, ExecutionException { - InvocationResponse response = null; - String addr = null; - if (client != null) { - addr = client.getAddress(); - } - String callInterface = InvocationUtils.getRemoteCallFullName(invocationContext.getInvokerConfig().getUrl(), - invocationContext.getMethodName(), invocationContext.getParameterTypes()); - transaction = monitor.createTransaction("PigeonFuture", callInterface, invocationContext); - if (transaction != null) { - transaction.setStatusOk(); - transaction.logEvent("PigeonCall.callType", invocationContext.getInvokerConfig().getCallType(), ""); - transaction.logEvent("PigeonCall.serialize", "" - + (request == null ? invocationContext.getInvokerConfig().getSerialize() : request.getSerialize()), - ""); - transaction.logEvent("PigeonCall.timeout", timeoutMillis + "", - invocationContext.getInvokerConfig().getTimeout() + ""); - invocationContext.getTimeline().add(new TimePoint(TimePhase.F, System.currentTimeMillis())); - } - boolean isSuccess = false; - try { - try { - response = super.waitResponse(timeoutMillis); - if (transaction != null && response != null) { - String size = SizeMonitor.getInstance().getLogSize(response.getSize()); - if (size != null) { - transaction.logEvent("PigeonCall.responseSize", size, "" + response.getSize()); - } - invocationContext.getTimeline().add(new TimePoint(TimePhase.R, response.getCreateMillisTime())); - invocationContext.getTimeline().add(new TimePoint(TimePhase.F, System.currentTimeMillis())); - } - } catch (RuntimeException e) { - // failure degrade condition - InvocationResponse degradedResponse = null; - if (DegradationManager.INSTANCE.needFailureDegrade(invocationContext)) { - try { - degradedResponse = DegradationFilter.degradeCall(invocationContext, true); - } catch (Throwable t) { - // won't happen - logger.warn("failure degrade in future call type error: " + t.toString()); - } - } - if (degradedResponse != null) {// 返回同步调用模式的失败降级结果 - return degradedResponse.getReturn(); - } - // not failure degrade - DegradationManager.INSTANCE.addFailedRequest(invocationContext, e); - ExceptionManager.INSTANCE.logRpcException(addr, invocationContext.getInvokerConfig().getUrl(), - invocationContext.getMethodName(), "error with future call", e, request, response, transaction); - throw e; - } - - setResponseContext(response); - - if (response.getMessageType() == Constants.MESSAGE_TYPE_SERVICE) { - isSuccess = true; - return response.getReturn(); - } else if (response.getMessageType() == Constants.MESSAGE_TYPE_EXCEPTION) { - // failure degrade condition - InvocationResponse degradedResponse = null; - if (DegradationManager.INSTANCE.needFailureDegrade(invocationContext)) { - try { - degradedResponse = DegradationFilter.degradeCall(invocationContext, true); - } catch (Throwable t) { - // won't happen - logger.warn("failure degrade in future call type error: " + t.toString()); - } - } - if (degradedResponse != null) {// 返回同步调用模式的失败降级结果 - return degradedResponse.getReturn(); - } - // not failure degrade - RpcException e = ExceptionManager.INSTANCE.logRemoteCallException(addr, - invocationContext.getInvokerConfig().getUrl(), invocationContext.getMethodName(), - "remote call error with future call", request, response, transaction); - if (e != null) { - DegradationManager.INSTANCE.addFailedRequest(invocationContext, e); - throw e; - } - } else if (response.getMessageType() == Constants.MESSAGE_TYPE_SERVICE_EXCEPTION) { - Throwable e = ExceptionManager.INSTANCE - .logRemoteServiceException("remote service biz error with future call", request, response); - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } else if (e != null) { - throw new ApplicationException(e); - } - } - RpcException e = new BadResponseException(response.toString()); - throw e; - } finally { - if (transaction != null) { - if (invocationContext.isDegraded()) { - transaction.logEvent("PigeonCall.degrade", callInterface, ""); - } - invocationContext.getTimeline().add(new TimePoint(TimePhase.E, System.currentTimeMillis())); - try { - transaction.complete(); - } catch (RuntimeException e) { - monitor.logMonitorError(e); - } - } - - InvokerMonitorData monitorData = (InvokerMonitorData) invocationContext.getMonitorData(); - if (monitorData != null) { - monitorData.setIsSuccess(isSuccess); - monitorData.complete(); - } - } - } - - @Override - public Object get(long timeout, TimeUnit unit) throws java.lang.InterruptedException, - java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException { - long timeoutMs = unit.toMillis(timeout); - try { - return get(timeoutMs); - } catch (RequestTimeoutException e) { - throw new TimeoutException(timeoutMs + "ms timeout:" + e.getMessage()); - } catch (InterruptedException e) { - throw e; - } - } - - protected void processContext() { - Thread currentThread = Thread.currentThread(); - if (currentThread == callerThread) { - super.processContext(); - } - } - - @Override - public void dispose() { - super.dispose(); - if (transaction != null) { - try { - transaction.complete(); - } catch (RuntimeException e) { - } - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return cancel(); - } + private static final Logger logger = LoggerLoader.getLogger(ServiceFutureImpl.class); + + private long timeout = Long.MAX_VALUE; + + protected Thread callerThread; + + protected InvokerContext invocationContext; + + public ServiceFutureImpl(InvokerContext invocationContext, long timeout) { + super(); + this.invocationContext = invocationContext; + this.timeout = timeout; + callerThread = Thread.currentThread(); + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + return get(this.timeout); + } + + public Object get(long timeoutMillis) throws InterruptedException, ExecutionException { + InvocationResponse response = null; + String addr = null; + if (client != null) { + addr = client.getAddress(); + } + String callInterface = InvocationUtils.getRemoteCallFullName(invocationContext.getInvokerConfig().getUrl(), + invocationContext.getMethodName(), invocationContext.getParameterTypes()); + transaction = monitor.createTransaction("PigeonFuture", callInterface, invocationContext); + if (transaction != null) { + transaction.setStatusOk(); + transaction.logEvent("PigeonCall.callType", invocationContext.getInvokerConfig().getCallType(), ""); + transaction.logEvent("PigeonCall.serialize", "" + + (request == null ? invocationContext.getInvokerConfig().getSerialize() : request.getSerialize()), + ""); + transaction.logEvent("PigeonCall.timeout", timeoutMillis + "", + invocationContext.getInvokerConfig().getTimeout() + ""); + Client client = invocationContext.getClient(); + if (client != null) { + String targetApp = RegistryManager.getInstance().getReferencedAppFromCache(client.getAddress()); + transaction.logEvent("PigeonCall.app", targetApp, ""); + transaction.logEvent("PigeonCall.server", client.getAddress(), ""); + } + invocationContext.getTimeline().add(new TimePoint(TimePhase.F, System.currentTimeMillis())); + } + boolean isSuccess = false; + try { + try { + response = super.waitResponse(timeoutMillis); + if (transaction != null && response != null) { + String size = SizeMonitor.getInstance().getLogSize(response.getSize()); + if (size != null) { + transaction.logEvent("PigeonCall.responseSize", size, "" + response.getSize()); + } + invocationContext.getTimeline().add(new TimePoint(TimePhase.R, response.getCreateMillisTime())); + invocationContext.getTimeline().add(new TimePoint(TimePhase.F, System.currentTimeMillis())); + } + } catch (RuntimeException e) { + // failure degrade condition + InvocationResponse degradedResponse = null; + if (DegradationManager.INSTANCE.needFailureDegrade(invocationContext)) { + try { + degradedResponse = DegradationFilter.degradeCall(invocationContext, true); + } catch (Throwable t) { + // won't happen + logger.warn("failure degrade in future call type error: " + t.toString()); + } + } + if (degradedResponse != null) {// 返回同步调用模式的失败降级结果 + return degradedResponse.getReturn(); + } + // not failure degrade + DegradationManager.INSTANCE.addFailedRequest(invocationContext, e); + ExceptionManager.INSTANCE.logRpcException(addr, invocationContext.getInvokerConfig().getUrl(), + invocationContext.getMethodName(), "error with future call", e, request, response, transaction); + throw e; + } + + setResponseContext(response); + + if (response.getMessageType() == Constants.MESSAGE_TYPE_SERVICE) { + isSuccess = true; + return response.getReturn(); + } else if (response.getMessageType() == Constants.MESSAGE_TYPE_EXCEPTION) { + // failure degrade condition + InvocationResponse degradedResponse = null; + if (DegradationManager.INSTANCE.needFailureDegrade(invocationContext)) { + try { + degradedResponse = DegradationFilter.degradeCall(invocationContext, true); + } catch (Throwable t) { + // won't happen + logger.warn("failure degrade in future call type error: " + t.toString()); + } + } + if (degradedResponse != null) {// 返回同步调用模式的失败降级结果 + return degradedResponse.getReturn(); + } + // not failure degrade + RpcException e = ExceptionManager.INSTANCE.logRemoteCallException(addr, + invocationContext.getInvokerConfig().getUrl(), invocationContext.getMethodName(), + "remote call error with future call", request, response, transaction); + if (e != null) { + DegradationManager.INSTANCE.addFailedRequest(invocationContext, e); + throw e; + } + } else if (response.getMessageType() == Constants.MESSAGE_TYPE_SERVICE_EXCEPTION) { + Throwable e = ExceptionManager.INSTANCE + .logRemoteServiceException("remote service biz error with future call", request, response); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else if (e != null) { + throw new ApplicationException(e); + } + } + RpcException e = new BadResponseException(response.toString()); + throw e; + } finally { + if (transaction != null) { + if (invocationContext.isDegraded()) { + transaction.logEvent("PigeonCall.degrade", callInterface, ""); + } + invocationContext.getTimeline().add(new TimePoint(TimePhase.E, System.currentTimeMillis())); + try { + transaction.complete(); + } catch (RuntimeException e) { + monitor.logMonitorError(e); + } + } + + InvokerMonitorData monitorData = (InvokerMonitorData) invocationContext.getMonitorData(); + if (monitorData != null) { + monitorData.setIsSuccess(isSuccess); + monitorData.complete(); + } + } + } + + @Override + public Object get(long timeout, TimeUnit unit) throws java.lang.InterruptedException, + java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException { + long timeoutMs = unit.toMillis(timeout); + try { + return get(timeoutMs); + } catch (RequestTimeoutException e) { + throw new TimeoutException(timeoutMs + "ms timeout:" + e.getMessage()); + } catch (InterruptedException e) { + throw e; + } + } + + protected void processContext() { + Thread currentThread = Thread.currentThread(); + if (currentThread == callerThread) { + super.processContext(); + } + } + + @Override + public void dispose() { + super.dispose(); + if (transaction != null) { + try { + transaction.complete(); + } catch (RuntimeException e) { + } + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return cancel(); + } } diff --git a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/ContextPrepareInvokeFilter.java b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/ContextPrepareInvokeFilter.java index 87a4a96d..98bc02d8 100644 --- a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/ContextPrepareInvokeFilter.java +++ b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/ContextPrepareInvokeFilter.java @@ -211,7 +211,7 @@ private void checkProtocol(InvokerContext invokerContext) { private void compactRequest(InvokerContext invokerContext) { boolean isCompactReq = false; - if (isCompact) { + if (isCompact && InvokerConfig.PROTOCOL_DEFAULT.equals(invokerContext.getInvokerConfig().getProtocol())) { Client client = invokerContext.getClient(); String version = RegistryManager.getInstance().getReferencedVersionFromCache(client.getAddress()); if (StringUtils.isBlank(version)) { From 5d3664909090f8280e18aca60c3496dd55e1b169 Mon Sep 17 00:00:00 2001 From: xiangwu Date: Fri, 3 Feb 2017 17:24:36 +0800 Subject: [PATCH 4/9] add pigeon-demo remark to readme --- USER_GUIDE.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/USER_GUIDE.md b/USER_GUIDE.md index c7e0b82e..4f5d55c9 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -191,6 +191,8 @@ com.xxx....XXXConfigManager ## 快速入门 +本文档相关代码示例也可以参考[pigeon-demo](https://github.com/dianping/pigeon-demo)项目 + ### 定义服务 定义服务接口: (该接口需单独打包,在服务提供方和调用方共享) From 8ffae623219789bb5a4762a415d8aae829e88a82 Mon Sep 17 00:00:00 2001 From: xiangwu Date: Fri, 3 Feb 2017 18:20:08 +0800 Subject: [PATCH 5/9] add zk status log if unable to connect to zk --- .../com/dianping/pigeon/registry/zookeeper/CuratorClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorClient.java b/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorClient.java index 23c306db..55a174a9 100644 --- a/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorClient.java +++ b/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorClient.java @@ -89,11 +89,12 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) { CuratorFramework oldClient = this.client; this.client = client; close(oldClient); - logger.info("succeed to create zookeeper client, connected:" + isConnected); if (isConnected) { + logger.info("succeed to connect to zookeeper"); monitor.logEvent(EVENT_NAME, "zookeeper:rebuild_success", ""); } else { + logger.warn("unable to connect to zookeeper:" + address); monitor.logEvent(EVENT_NAME, "zookeeper:rebuild_failure", ""); } From b0c0d8b862876ac3d7fd1e2f8e54574623e87b31 Mon Sep 17 00:00:00 2001 From: xiangwu Date: Sat, 4 Feb 2017 15:39:11 +0800 Subject: [PATCH 6/9] fix bug with governor url --- .../console/servlet/ServiceServlet.java | 21 +++++++++++-------- pigeon-console/src/main/resources/Service.ftl | 9 +++----- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pigeon-console/src/main/java/com/dianping/pigeon/console/servlet/ServiceServlet.java b/pigeon-console/src/main/java/com/dianping/pigeon/console/servlet/ServiceServlet.java index b97e15ea..2355a4f4 100644 --- a/pigeon-console/src/main/java/com/dianping/pigeon/console/servlet/ServiceServlet.java +++ b/pigeon-console/src/main/java/com/dianping/pigeon/console/servlet/ServiceServlet.java @@ -18,9 +18,8 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import com.dianping.pigeon.remoting.invoker.config.InvokerConfig; import org.apache.commons.lang.StringUtils; -import com.dianping.pigeon.log.Logger; +import org.springframework.aop.support.AopUtils; import com.dianping.pigeon.config.ConfigManager; import com.dianping.pigeon.config.ConfigManagerLoader; @@ -30,10 +29,12 @@ import com.dianping.pigeon.console.status.checker.GlobalStatusChecker; import com.dianping.pigeon.console.status.checker.ProviderStatusChecker; import com.dianping.pigeon.console.status.checker.StatusChecker; +import com.dianping.pigeon.log.Logger; import com.dianping.pigeon.log.LoggerLoader; import com.dianping.pigeon.registry.RegistryManager; import com.dianping.pigeon.remoting.ServiceFactory; import com.dianping.pigeon.remoting.common.util.ServiceConfigUtils; +import com.dianping.pigeon.remoting.invoker.config.InvokerConfig; import com.dianping.pigeon.remoting.provider.ProviderBootStrap; import com.dianping.pigeon.remoting.provider.Server; import com.dianping.pigeon.remoting.provider.config.ProviderConfig; @@ -46,7 +47,6 @@ import freemarker.template.DefaultObjectWrapper; import freemarker.template.Template; import freemarker.template.TemplateException; -import org.springframework.aop.support.AopUtils; /** * @author sean.wang @@ -114,7 +114,7 @@ public Map> getServiceProviders() { return ServiceFactory.getAllServiceProviders(); } - public Map, Object> getInvokerConfigs(){ + public Map, Object> getInvokerConfigs() { return ServiceFactory.getAllServiceInvokers(); } @@ -174,8 +174,10 @@ protected boolean initServicePage(HttpServletRequest request, HttpServletRespons page.setAppName(configManager.getAppName()); page.setStartTime(ProviderBootStrap.getStartTime() + ""); page.setValidate("" + isValidate); - page.setGovernorUrl(configManager.getStringValue("pigeon.governor.address") - + "/services/" + configManager.getAppName()); + String governorAddr = configManager.getStringValue("pigeon.governor.address"); + if (StringUtils.isNotBlank(governorAddr)) { + page.setGovernorUrl(governorAddr + "/services/" + configManager.getAppName()); + } this.model = page; return true; } @@ -220,7 +222,8 @@ public String getContentType() { return "text/html; charset=UTF-8"; } - protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { response.setContentType(getContentType()); response.setStatus(HttpServletResponse.SC_OK); @@ -232,8 +235,8 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S doGet(req, resp); } - protected void generateView(HttpServletRequest request, HttpServletResponse response) throws IOException, - ServletException { + protected void generateView(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { Template temp = cfg.getTemplate(getView()); boolean result = initServicePage(request, response); if (result) { diff --git a/pigeon-console/src/main/resources/Service.ftl b/pigeon-console/src/main/resources/Service.ftl index f21f3495..88de55d9 100644 --- a/pigeon-console/src/main/resources/Service.ftl +++ b/pigeon-console/src/main/resources/Service.ftl @@ -98,15 +98,12 @@ -
-
- admin portal: ${governorUrl} -
-
-
+ <#if governorUrl != ""> +

admin portal: ${governorUrl}

+

pigeon services registered at port ${port}

version: ${version}

env: ${environment}

From 5ef8fa004fbc75514897a2ed1735ab3266bffe43 Mon Sep 17 00:00:00 2001 From: xiangwu Date: Mon, 6 Feb 2017 16:52:18 +0800 Subject: [PATCH 7/9] dynamic update zk address --- .../registry/zookeeper/CuratorClient.java | 78 +- .../registry/zookeeper/CuratorRegistry.java | 1109 ++++++++--------- .../zookeeper/CuratorRegistryTest.java | 3 +- .../pigeon/registry/util/Constants.java | 5 - 4 files changed, 613 insertions(+), 582 deletions(-) diff --git a/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorClient.java b/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorClient.java index 55a174a9..70a78411 100644 --- a/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorClient.java +++ b/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorClient.java @@ -6,13 +6,13 @@ import java.util.concurrent.Executors; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.RandomUtils; import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; -import com.dianping.pigeon.log.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -20,6 +20,7 @@ import com.dianping.pigeon.config.ConfigChangeListener; import com.dianping.pigeon.config.ConfigManager; import com.dianping.pigeon.config.ConfigManagerLoader; +import com.dianping.pigeon.log.Logger; import com.dianping.pigeon.log.LoggerLoader; import com.dianping.pigeon.monitor.Monitor; import com.dianping.pigeon.monitor.MonitorLoader; @@ -36,11 +37,11 @@ public class CuratorClient { private CuratorFramework client; - private int retries = configManager.getIntValue("pigeon.registry.curator.retries", Integer.MAX_VALUE); + private volatile int retries = configManager.getIntValue("pigeon.registry.curator.retries", Integer.MAX_VALUE); - private int retryInterval = configManager.getIntValue("pigeon.registry.curator.retryinterval", 3000); + private volatile int retryInterval = configManager.getIntValue("pigeon.registry.curator.retryinterval", 3000); - private int retryLimit = configManager.getIntValue("pigeon.registry.curator.retrylimit", 500); + private volatile int retryLimit = configManager.getIntValue("pigeon.registry.curator.retrylimit", 50); private int sessionTimeout = configManager.getIntValue("pigeon.registry.curator.sessiontimeout", 30 * 1000); @@ -54,19 +55,35 @@ public class CuratorClient { private static Monitor monitor = MonitorLoader.getMonitor(); - private String address; + private static final String KEY_REGISTRY_ADDRESS = "pigeon.registry.address"; + + private volatile String address; private final String EVENT_NAME = "Pigeon.registry"; public CuratorClient(String zkAddress) throws Exception { + newCuratorClient(zkAddress); + } + + public CuratorClient() throws Exception { + String zkAddress = configManager.getStringValue(KEY_REGISTRY_ADDRESS); + newCuratorClient(zkAddress); + } + + private void newCuratorClient(String zkAddress) throws Exception { + if (StringUtils.isBlank(zkAddress)) { + throw new IllegalArgumentException("zookeeper address is required"); + } + logger.info("start to initialize zookeeper client:" + zkAddress); this.address = zkAddress; newCuratorClient(); curatorStateListenerThreadPool.execute(new CuratorStateListener()); configManager.registerConfigChangeListener(new InnerConfigChangeListener()); + logger.info("succeed to initialize zookeeper client:" + zkAddress); } private boolean newCuratorClient() throws InterruptedException { - logger.info("begin to create zookeeper client"); + logger.info("begin to create zookeeper client:" + address); // CuratorFramework client = CuratorFrameworkFactory.newClient(address, // sessionTimeout, connectionTimeout, // new MyRetryPolicy(retries, retryInterval)); @@ -91,7 +108,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) { close(oldClient); if (isConnected) { - logger.info("succeed to connect to zookeeper"); + logger.info("succeed to connect to zookeeper:" + address); monitor.logEvent(EVENT_NAME, "zookeeper:rebuild_success", ""); } else { logger.warn("unable to connect to zookeeper:" + address); @@ -105,6 +122,19 @@ public CuratorFramework getClient() { return client; } + public boolean isConnected() { + final CuratorFramework cf = getClient(); + if (cf != null) { + try { + return cf.getZookeeperClient().getZooKeeper().getState().isConnected() + && cf.getZookeeperClient().isConnected(); + } catch (Exception e) { + return false; + } + } + return false; + } + private class CuratorStateListener implements Runnable { private final Logger logger = LoggerLoader.getLogger(CuratorStateListener.class); @@ -115,14 +145,13 @@ public void run() { boolean isSuccess = true; while (!Thread.currentThread().isInterrupted()) { try { - Thread.sleep(sleepTime); + Thread.sleep(sleepTime * (1 + RandomUtils.nextInt(20))); final CuratorFramework cf = getClient(); if (cf != null) { int retryCount = ((MyRetryPolicy) cf.getZookeeperClient().getRetryPolicy()).getRetryCount(); boolean isConnected = false; try { - isConnected = cf.getZookeeperClient().getZooKeeper().getState().isConnected() - && cf.getZookeeperClient().isConnected(); + isConnected = isConnected(); } catch (Exception e) { logger.info("error with zookeeper client's connection:" + e.toString()); } @@ -136,7 +165,8 @@ public void run() { } else { failCount++; if (retryCount > 0) { - logger.info("zookeeper client's retries:" + retryCount); + logger.info("zookeeper client's retries:" + retryCount + ", fails:" + failCount + + ", limit:" + retryLimit); } } if (failCount > retryLimit) { @@ -153,13 +183,14 @@ public void run() { } } - private boolean rebuildCuratorClient() throws InterruptedException { - boolean isSuccess = newCuratorClient(); - if (isSuccess) { - RegistryEventListener.connectionReconnected(); - } - return isSuccess; + } + + private boolean rebuildCuratorClient() throws InterruptedException { + boolean isSuccess = newCuratorClient(); + if (isSuccess) { + RegistryEventListener.connectionReconnected(); } + return isSuccess; } private static class MyRetryPolicy extends RetryNTimes { @@ -174,7 +205,7 @@ public MyRetryPolicy(int n, int sleepMsBetweenRetries) { @Override protected int getSleepTimeMs(int retryCount, long elapsedTimeMs) { this.retryCount = retryCount; - return sleepMsBetweenRetries; + return sleepMsBetweenRetries * (1 + RandomUtils.nextInt(20)); } public int getRetryCount() { @@ -386,6 +417,15 @@ public void onKeyUpdated(String key, String value) { retryLimit = Integer.valueOf(value); } catch (RuntimeException e) { } + } else if (key.endsWith(KEY_REGISTRY_ADDRESS)) { + address = value; + logger.info("registry address changed:" + address); + try { + Thread.sleep(RandomUtils.nextInt(180) * 1000); + rebuildCuratorClient(); + } catch (Exception e) { + logger.warn("rebuild curator client failed:", e); + } } } @@ -401,7 +441,7 @@ public void onKeyRemoved(String key) { public String getStatistics() { CuratorZookeeperClient client = getClient().getZookeeperClient(); - return new StringBuilder().append("connected:").append(client.isConnected()).append(", retries:") + return new StringBuilder().append("address:").append(client.getCurrentConnectionString()).append(", connected:").append(isConnected()).append(", retries:") .append(((MyRetryPolicy) client.getRetryPolicy()).getRetryCount()).toString(); } diff --git a/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorRegistry.java b/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorRegistry.java index 5c9e797e..84cde717 100644 --- a/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorRegistry.java +++ b/pigeon-extensions/pigeon-registry-zookeeper/src/main/java/com/dianping/pigeon/registry/zookeeper/CuratorRegistry.java @@ -26,560 +26,557 @@ public class CuratorRegistry implements Registry { - private static Logger logger = LoggerLoader.getLogger(CuratorRegistry.class); - - private ConfigManager configManager = ConfigManagerLoader.getConfigManager(); - - private CuratorClient client; - - private volatile boolean inited = false; - - private final boolean delEmptyNode = configManager.getBooleanValue("pigeon.registry.delemptynode", true); - - @Override - public void init() { - if (!inited) { - synchronized (this) { - if (!inited) { - try { - String zkAddress = configManager.getStringValue(Constants.KEY_REGISTRY_ADDRESS); - if (StringUtils.isBlank(zkAddress)) { - throw new IllegalArgumentException("zookeeper address is required"); - } - logger.info("start to initialize zookeeper client:" + zkAddress); - client = new CuratorClient(zkAddress); - logger.info("succeed to initialize zookeeper client:" + zkAddress); - inited = true; - } catch (Exception ex) { - logger.error("failed to initialize zookeeper client", ex); - throw new RuntimeException(ex); - } - } - } - } - } - - @Override - public boolean isEnable() { - return inited; - } - - @Override - public String getName() { - return Constants.REGISTRY_CURATOR_NAME; - } - - @Override - public String getServiceAddress(String serviceName) throws RegistryException { - return getServiceAddress(serviceName, Constants.DEFAULT_GROUP); - } - - public String getServiceAddress(String serviceName, String group) throws RegistryException { - return getServiceAddress(serviceName, group, true); - } - - public String getServiceAddress(String serviceName, String group, boolean fallbackDefaultGroup) - throws RegistryException { - return getServiceAddress(serviceName, group, fallbackDefaultGroup, true); - } - - @Override - public String getServiceAddress(String remoteAppkey, String serviceName, String group, boolean fallbackDefaultGroup) - throws RegistryException { - // mtthrift service, pigeon zk registry do nothing - return ""; - } - - @Override - public void registerService(String serviceName, String group, String serviceAddress, int weight) - throws RegistryException { - registerPersistentNode(serviceName, group, serviceAddress, weight); - } - - void registerPersistentNode(String serviceName, String group, String serviceAddress, int weight) - throws RegistryException { - String weightPath = Utils.getWeightPath(serviceAddress); - String servicePath = Utils.getServicePath(serviceName, group); - try { - if (client.exists(servicePath, false)) { - Stat stat = new Stat(); - String addressValue = client.getWithNodeExistsEx(servicePath, stat); - String[] addressArray = addressValue.split(","); - List addressList = new ArrayList(); - for (String addr : addressArray) { - addr = addr.trim(); - if (addr.length() > 0 && !addressList.contains(addr)) { - addressList.add(addr.trim()); - } - } - if (!addressList.contains(serviceAddress)) { - addressList.add(serviceAddress); - Collections.sort(addressList); - client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion()); - } - } else { - client.create(servicePath, serviceAddress); - } - if (weight >= 0) { - client.set(weightPath, "" + weight); - } - if (logger.isInfoEnabled()) { - logger.info("registered service to persistent node: " + servicePath); - } - } catch (Throwable e) { - if (e instanceof BadVersionException || e instanceof NodeExistsException) { - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - // ignore - } - registerPersistentNode(serviceName, group, serviceAddress, weight); - } else { - logger.info("failed to register service to " + servicePath, e); - throw new RegistryException(e); - } - - } - } - - @Override - public void unregisterService(String serviceName, String serviceAddress) throws RegistryException { - unregisterService(serviceName, Constants.DEFAULT_GROUP, serviceAddress); - } - - @Override - public void unregisterService(String serviceName, String group, String serviceAddress) throws RegistryException { - unregisterPersistentNode(serviceName, group, serviceAddress); - } - - public void unregisterPersistentNode(String serviceName, String group, String serviceAddress) - throws RegistryException { - String servicePath = Utils.getServicePath(serviceName, group); - try { - if (client.exists(servicePath, false)) { - Stat stat = new Stat(); - String addressValue = client.getWithNodeExistsEx(servicePath, stat); - String[] addressArray = addressValue.split(","); - List addressList = new ArrayList(); - for (String addr : addressArray) { - addr = addr.trim(); - if (addr.length() > 0 && !addressList.contains(addr)) { - addressList.add(addr); - } - } - if (addressList.contains(serviceAddress)) { - addressList.remove(serviceAddress); - if (!addressList.isEmpty()) { - Collections.sort(addressList); - client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion()); - } else { - List children = client.getChildren(servicePath, false); - if (CollectionUtils.isEmpty(children)) { - if (delEmptyNode) { - try { - client.delete(servicePath); - } catch (NoNodeException e) { - logger.warn("Already deleted path:" + servicePath + ":" + e.getMessage()); - } - } else { - client.set(servicePath, "", stat.getVersion()); - } - } else { - logger.warn("Existing children [" + children + "] under path:" + servicePath); - client.set(servicePath, "", stat.getVersion()); - } - } - } - if (logger.isInfoEnabled()) { - logger.info("unregistered service from " + servicePath); - } - } - } catch (Throwable e) { - if (e instanceof BadVersionException) { - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - // ignore - } - unregisterPersistentNode(serviceName, group, serviceAddress); - } else { - logger.info("failed to unregister service from " + servicePath, e); - throw new RegistryException(e); - } - } - } - - @Override - public int getServerWeight(String serverAddress) throws RegistryException { - String path = Utils.getWeightPath(serverAddress); - String strWeight; - try { - strWeight = client.get(path); - int result = Constants.DEFAULT_WEIGHT; - if (strWeight != null) { - try { - result = Integer.parseInt(strWeight); - } catch (NumberFormatException e) { - logger.warn("invalid weight for " + serverAddress + ": " + strWeight); - } - } - return result; - } catch (Throwable e) { - logger.info("failed to get weight for " + serverAddress); - throw new RegistryException(e); - } - } - - @Override - public void setServerWeight(String serverAddress, int weight) throws RegistryException { - String path = Utils.getWeightPath(serverAddress); - try { - client.set(path, weight); - } catch (Throwable e) { - logger.info("failed to set weight of " + serverAddress + " to " + weight); - throw new RegistryException(e); - } - } - - @Override - public List getChildren(String path) throws RegistryException { - try { - List children = client.getChildren(path); - return children; - } catch (Throwable e) { - logger.info("failed to get children of node: " + path, e); - throw new RegistryException(e); - } - } - - public void close() { - client.close(); - } - - public CuratorClient getCuratorClient() { - return client; - } - - @Override - public String getServerApp(String serverAddress) throws RegistryException { - String path = Utils.getAppPath(serverAddress); - try { - return client.get(path); - } catch (Throwable e) { - logger.info("failed to get app for " + serverAddress); - throw new RegistryException(e); - } - } - - @Override - public void setServerApp(String serverAddress, String app) { - String path = Utils.getAppPath(serverAddress); - if (StringUtils.isNotBlank(app)) { - try { - client.set(path, app); - } catch (Throwable e) { - logger.info("failed to set app of " + serverAddress + " to " + app); - } - } - } - - public void unregisterServerApp(String serverAddress) { - String path = Utils.getAppPath(serverAddress); - try { - if (client.exists(path, false)) { - client.delete(path); - } - } catch (Throwable e) { - logger.info("failed to delete app:" + path + ", caused by:" + e.getMessage()); - } - } - - @Override - public void setServerVersion(String serverAddress, String version) { - String path = Utils.getVersionPath(serverAddress); - if (StringUtils.isNotBlank(version)) { - try { - client.set(path, version); - } catch (Throwable e) { - logger.info("failed to set version of " + serverAddress + " to " + version); - } - } - } - - @Override - public String getServerVersion(String serverAddress) throws RegistryException { - String path = Utils.getVersionPath(serverAddress); - try { - return client.get(path); - } catch (Throwable e) { - logger.info("failed to get version for " + serverAddress); - throw new RegistryException(e); - } - } - - public void unregisterServerVersion(String serverAddress) { - String path = Utils.getVersionPath(serverAddress); - try { - if (client.exists(path, false)) { - client.delete(path); - } - } catch (Throwable e) { - logger.info("failed to delete version:" + path + ", caused by:" + e.getMessage()); - } - } - - @Override - public String getStatistics() { - return getName() + ":" + client.getStatistics(); - } - - @Override - public byte getServerHeartBeatSupport(String serviceAddress) throws RegistryException { - if (isSupportNewProtocol(serviceAddress)) { - return HeartBeatSupport.BothSupport.getValue(); - } else { - return HeartBeatSupport.P2POnly.getValue(); - } - } - - @Override - public void setServerService(String serviceName, String group, String hosts) throws RegistryException { - String servicePath = Utils.getServicePath(serviceName, group); - - try { - client.set(servicePath, hosts); - } catch (Throwable e) { - logger.info("failed to set service hosts of " + serviceName + " to " + hosts); - throw new RegistryException(e); - } - } - - @Override - public void delServerService(String serviceName, String group) throws RegistryException { - String servicePath = Utils.getServicePath(serviceName, group); - - try { - List children = client.getChildren(servicePath); - - if (children != null && children.size() > 0) { - client.set(servicePath, ""); - } else { - client.delete(servicePath); - } - } catch (Throwable e) { - logger.info("failed to delete service hosts of " + serviceName); - throw new RegistryException(e); - } - } - - @Override - public void setHostsWeight(String serviceName, String group, String hosts, int weight) throws RegistryException { - - for (String host : hosts.split(",")) { - setServerWeight(host, weight); - } - } - - @Override - public String getServiceAddress(String remoteAppkey, String serviceName, String group, boolean fallbackDefaultGroup, - boolean needListener) throws RegistryException { - // blank - return ""; - } - - @Override - public String getServiceAddress(String serviceName, String group, boolean fallbackDefaultGroup, - boolean needListener) throws RegistryException { - try { - String path = Utils.getServicePath(serviceName, group); - String address = client.get(path, needListener); - if (!StringUtils.isBlank(group)) { - boolean needFallback = false; - if (StringUtils.isBlank(address)) { - needFallback = true; - } else { - String[] addressArray = address.split(","); - int weightCount = 0; - for (String addr : addressArray) { - addr = addr.trim(); - if (addr.length() > 0) { - int weight = RegistryManager.getInstance().getServiceWeight(addr); - if (weight > 0) { - weightCount += weight; - } - } - } - if (weightCount == 0) { - needFallback = true; - logger.info("weight is 0 with address:" + address); - } - } - if (fallbackDefaultGroup && needFallback) { - logger.info("node " + path + " does not exist, fallback to default group"); - path = Utils.getServicePath(serviceName, Constants.DEFAULT_GROUP); - address = client.get(path, needListener); - } - } - return address; - } catch (Exception e) { - logger.info("failed to get service address for " + serviceName + "/" + group, e); - throw new RegistryException(e); - } - } - - @Override - public void updateHeartBeat(String serviceAddress, Long heartBeatTimeMillis) { - try { - String heartBeatPath = Utils.getHeartBeatPath(serviceAddress); - client.set(heartBeatPath, heartBeatTimeMillis); - } catch (Throwable e) { - logger.info("failed to update heartbeat", e); - } - } - - @Override - public void deleteHeartBeat(String serviceAddress) { - try { - String heartBeatPath = Utils.getHeartBeatPath(serviceAddress); - client.delete(heartBeatPath); - } catch (Throwable e) { - logger.info("failed to delete heartbeat", e); - } - } - - @Override - public boolean isSupportNewProtocol(String serviceAddress) throws RegistryException { - String version = getServerVersion(serviceAddress); - - if (StringUtils.isBlank(version)) { - throw new RegistryException("version is blank"); - } - - return VersionUtils.isThriftSupported(version); - } - - @Override - public boolean isSupportNewProtocol(String serviceAddress, String serviceName) throws RegistryException { - try { - String protocolPath = Utils.getProtocolPath(serviceAddress); - String info = client.get(protocolPath); - - if (info != null) { - Map infoMap = Utils.getProtocolInfoMap(info); - Boolean support = infoMap.get(serviceName); - if (support != null) { - return support; - } - } - - return false; - } catch (Throwable e) { - logger.info("failed to get protocol:" + serviceName + "of host:" + serviceAddress + ", caused by:" - + e.getMessage()); - throw new RegistryException(e); - } - } - - @Override - public void setSupportNewProtocol(String serviceAddress, String serviceName, boolean support) - throws RegistryException { - try { - String protocolPath = Utils.getProtocolPath(serviceAddress); - if (client.exists(protocolPath, false)) { - Stat stat = new Stat(); - String info = client.getWithNodeExistsEx(protocolPath, stat); - Map infoMap = Utils.getProtocolInfoMap(info); - infoMap.put(serviceName, support); - client.set(protocolPath, Utils.getProtocolInfo(infoMap), stat.getVersion()); - } else { - Map infoMap = ImmutableMap.of(serviceName, support); - client.create(protocolPath, Utils.getProtocolInfo(infoMap)); - } - - } catch (Throwable e) { - if (e instanceof BadVersionException || e instanceof NodeExistsException) { - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - // ignore - } - setSupportNewProtocol(serviceAddress, serviceName, support); - } else { - logger.info("failed to set protocol:" + serviceName + "of host:" + serviceAddress + " to:" + support - + ", caused by:" + e.getMessage()); - throw new RegistryException(e); - } - - } - } - - @Override - public void unregisterSupportNewProtocol(String serviceAddress, String serviceName, boolean support) - throws RegistryException { - try { - String protocolPath = Utils.getProtocolPath(serviceAddress); - if (client.exists(protocolPath, false)) { - Stat stat = new Stat(); - String info = client.getWithNodeExistsEx(protocolPath, stat); - Map infoMap = Utils.getProtocolInfoMap(info); - infoMap.remove(serviceName); - - if (infoMap.size() == 0 && delEmptyNode) { - client.delete(protocolPath); - } else { - client.set(protocolPath, Utils.getProtocolInfo(infoMap), stat.getVersion()); - } - } - - } catch (Throwable e) { - if (e instanceof BadVersionException || e instanceof NodeExistsException) { - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - // ignore - } - unregisterSupportNewProtocol(serviceAddress, serviceName, support); - } else { - logger.info("failed to del protocol:" + serviceName + "of host:" + serviceAddress + ", caused by:" - + e.getMessage()); - throw new RegistryException(e); - } - - } - } - - @Override - public void setConsoleAddress(String consoleAddress) { - String clientPath = Utils.getConsolePath(consoleAddress); - try { - client.set(clientPath, null); - } catch (Throwable t) { - logger.info("failed to set consolePath " + clientPath, t); - } - } - - @Override - public void unregisterConsoleAddress(String consoleAddress) { - String clientPath = Utils.getConsolePath(consoleAddress); - try { - client.delete(clientPath); - } catch (Throwable t) { - logger.info("failed to delete consolePath " + clientPath, t); - } - - } - - @Override - public List getConsoleAddresses() { - List consoleAddresses = null; - String consoleRootPath = Utils.getConsoleRootPath(); - - try { - consoleAddresses = client.getChildren(consoleRootPath); - } catch (Throwable t) { - logger.info("failed to get consoleRootPath " + consoleRootPath, t); - } - - return consoleAddresses; - } + private static Logger logger = LoggerLoader.getLogger(CuratorRegistry.class); + + private ConfigManager configManager = ConfigManagerLoader.getConfigManager(); + + private CuratorClient client; + + private volatile boolean inited = false; + + private final boolean delEmptyNode = configManager.getBooleanValue("pigeon.registry.delemptynode", true); + + @Override + public void init() { + if (!inited) { + synchronized (this) { + if (!inited) { + try { + client = new CuratorClient(); + if (!client.isConnected()) { + throw new IllegalStateException("unable to connect to zookeeper"); + } + inited = true; + } catch (Exception ex) { + logger.error("failed to initialize zookeeper client", ex); + throw new RuntimeException(ex); + } + } + } + } + } + + @Override + public boolean isEnable() { + return inited; + } + + @Override + public String getName() { + return Constants.REGISTRY_CURATOR_NAME; + } + + @Override + public String getServiceAddress(String serviceName) throws RegistryException { + return getServiceAddress(serviceName, Constants.DEFAULT_GROUP); + } + + public String getServiceAddress(String serviceName, String group) throws RegistryException { + return getServiceAddress(serviceName, group, true); + } + + public String getServiceAddress(String serviceName, String group, boolean fallbackDefaultGroup) + throws RegistryException { + return getServiceAddress(serviceName, group, fallbackDefaultGroup, true); + } + + @Override + public String getServiceAddress(String remoteAppkey, String serviceName, String group, boolean fallbackDefaultGroup) + throws RegistryException { + // mtthrift service, pigeon zk registry do nothing + return ""; + } + + @Override + public void registerService(String serviceName, String group, String serviceAddress, int weight) + throws RegistryException { + registerPersistentNode(serviceName, group, serviceAddress, weight); + } + + void registerPersistentNode(String serviceName, String group, String serviceAddress, int weight) + throws RegistryException { + String weightPath = Utils.getWeightPath(serviceAddress); + String servicePath = Utils.getServicePath(serviceName, group); + try { + if (client.exists(servicePath, false)) { + Stat stat = new Stat(); + String addressValue = client.getWithNodeExistsEx(servicePath, stat); + String[] addressArray = addressValue.split(","); + List addressList = new ArrayList(); + for (String addr : addressArray) { + addr = addr.trim(); + if (addr.length() > 0 && !addressList.contains(addr)) { + addressList.add(addr.trim()); + } + } + if (!addressList.contains(serviceAddress)) { + addressList.add(serviceAddress); + Collections.sort(addressList); + client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion()); + } + } else { + client.create(servicePath, serviceAddress); + } + if (weight >= 0) { + client.set(weightPath, "" + weight); + } + if (logger.isInfoEnabled()) { + logger.info("registered service to persistent node: " + servicePath); + } + } catch (Throwable e) { + if (e instanceof BadVersionException || e instanceof NodeExistsException) { + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + // ignore + } + registerPersistentNode(serviceName, group, serviceAddress, weight); + } else { + logger.info("failed to register service to " + servicePath, e); + throw new RegistryException(e); + } + + } + } + + @Override + public void unregisterService(String serviceName, String serviceAddress) throws RegistryException { + unregisterService(serviceName, Constants.DEFAULT_GROUP, serviceAddress); + } + + @Override + public void unregisterService(String serviceName, String group, String serviceAddress) throws RegistryException { + unregisterPersistentNode(serviceName, group, serviceAddress); + } + + public void unregisterPersistentNode(String serviceName, String group, String serviceAddress) + throws RegistryException { + String servicePath = Utils.getServicePath(serviceName, group); + try { + if (client.exists(servicePath, false)) { + Stat stat = new Stat(); + String addressValue = client.getWithNodeExistsEx(servicePath, stat); + String[] addressArray = addressValue.split(","); + List addressList = new ArrayList(); + for (String addr : addressArray) { + addr = addr.trim(); + if (addr.length() > 0 && !addressList.contains(addr)) { + addressList.add(addr); + } + } + if (addressList.contains(serviceAddress)) { + addressList.remove(serviceAddress); + if (!addressList.isEmpty()) { + Collections.sort(addressList); + client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion()); + } else { + List children = client.getChildren(servicePath, false); + if (CollectionUtils.isEmpty(children)) { + if (delEmptyNode) { + try { + client.delete(servicePath); + } catch (NoNodeException e) { + logger.warn("Already deleted path:" + servicePath + ":" + e.getMessage()); + } + } else { + client.set(servicePath, "", stat.getVersion()); + } + } else { + logger.warn("Existing children [" + children + "] under path:" + servicePath); + client.set(servicePath, "", stat.getVersion()); + } + } + } + if (logger.isInfoEnabled()) { + logger.info("unregistered service from " + servicePath); + } + } + } catch (Throwable e) { + if (e instanceof BadVersionException) { + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + // ignore + } + unregisterPersistentNode(serviceName, group, serviceAddress); + } else { + logger.info("failed to unregister service from " + servicePath, e); + throw new RegistryException(e); + } + } + } + + @Override + public int getServerWeight(String serverAddress) throws RegistryException { + String path = Utils.getWeightPath(serverAddress); + String strWeight; + try { + strWeight = client.get(path); + int result = Constants.DEFAULT_WEIGHT; + if (strWeight != null) { + try { + result = Integer.parseInt(strWeight); + } catch (NumberFormatException e) { + logger.warn("invalid weight for " + serverAddress + ": " + strWeight); + } + } + return result; + } catch (Throwable e) { + logger.info("failed to get weight for " + serverAddress); + throw new RegistryException(e); + } + } + + @Override + public void setServerWeight(String serverAddress, int weight) throws RegistryException { + String path = Utils.getWeightPath(serverAddress); + try { + client.set(path, weight); + } catch (Throwable e) { + logger.info("failed to set weight of " + serverAddress + " to " + weight); + throw new RegistryException(e); + } + } + + @Override + public List getChildren(String path) throws RegistryException { + try { + List children = client.getChildren(path); + return children; + } catch (Throwable e) { + logger.info("failed to get children of node: " + path, e); + throw new RegistryException(e); + } + } + + public void close() { + client.close(); + } + + public CuratorClient getCuratorClient() { + return client; + } + + @Override + public String getServerApp(String serverAddress) throws RegistryException { + String path = Utils.getAppPath(serverAddress); + try { + return client.get(path); + } catch (Throwable e) { + logger.info("failed to get app for " + serverAddress); + throw new RegistryException(e); + } + } + + @Override + public void setServerApp(String serverAddress, String app) { + String path = Utils.getAppPath(serverAddress); + if (StringUtils.isNotBlank(app)) { + try { + client.set(path, app); + } catch (Throwable e) { + logger.info("failed to set app of " + serverAddress + " to " + app); + } + } + } + + public void unregisterServerApp(String serverAddress) { + String path = Utils.getAppPath(serverAddress); + try { + if (client.exists(path, false)) { + client.delete(path); + } + } catch (Throwable e) { + logger.info("failed to delete app:" + path + ", caused by:" + e.getMessage()); + } + } + + @Override + public void setServerVersion(String serverAddress, String version) { + String path = Utils.getVersionPath(serverAddress); + if (StringUtils.isNotBlank(version)) { + try { + client.set(path, version); + } catch (Throwable e) { + logger.info("failed to set version of " + serverAddress + " to " + version); + } + } + } + + @Override + public String getServerVersion(String serverAddress) throws RegistryException { + String path = Utils.getVersionPath(serverAddress); + try { + return client.get(path); + } catch (Throwable e) { + logger.info("failed to get version for " + serverAddress); + throw new RegistryException(e); + } + } + + public void unregisterServerVersion(String serverAddress) { + String path = Utils.getVersionPath(serverAddress); + try { + if (client.exists(path, false)) { + client.delete(path); + } + } catch (Throwable e) { + logger.info("failed to delete version:" + path + ", caused by:" + e.getMessage()); + } + } + + @Override + public String getStatistics() { + return getName() + ":" + client.getStatistics(); + } + + @Override + public byte getServerHeartBeatSupport(String serviceAddress) throws RegistryException { + if (isSupportNewProtocol(serviceAddress)) { + return HeartBeatSupport.BothSupport.getValue(); + } else { + return HeartBeatSupport.P2POnly.getValue(); + } + } + + @Override + public void setServerService(String serviceName, String group, String hosts) throws RegistryException { + String servicePath = Utils.getServicePath(serviceName, group); + + try { + client.set(servicePath, hosts); + } catch (Throwable e) { + logger.info("failed to set service hosts of " + serviceName + " to " + hosts); + throw new RegistryException(e); + } + } + + @Override + public void delServerService(String serviceName, String group) throws RegistryException { + String servicePath = Utils.getServicePath(serviceName, group); + + try { + List children = client.getChildren(servicePath); + + if (children != null && children.size() > 0) { + client.set(servicePath, ""); + } else { + client.delete(servicePath); + } + } catch (Throwable e) { + logger.info("failed to delete service hosts of " + serviceName); + throw new RegistryException(e); + } + } + + @Override + public void setHostsWeight(String serviceName, String group, String hosts, int weight) throws RegistryException { + + for (String host : hosts.split(",")) { + setServerWeight(host, weight); + } + } + + @Override + public String getServiceAddress(String remoteAppkey, String serviceName, String group, boolean fallbackDefaultGroup, + boolean needListener) throws RegistryException { + // blank + return ""; + } + + @Override + public String getServiceAddress(String serviceName, String group, boolean fallbackDefaultGroup, + boolean needListener) throws RegistryException { + try { + String path = Utils.getServicePath(serviceName, group); + String address = client.get(path, needListener); + if (!StringUtils.isBlank(group)) { + boolean needFallback = false; + if (StringUtils.isBlank(address)) { + needFallback = true; + } else { + String[] addressArray = address.split(","); + int weightCount = 0; + for (String addr : addressArray) { + addr = addr.trim(); + if (addr.length() > 0) { + int weight = RegistryManager.getInstance().getServiceWeight(addr); + if (weight > 0) { + weightCount += weight; + } + } + } + if (weightCount == 0) { + needFallback = true; + logger.info("weight is 0 with address:" + address); + } + } + if (fallbackDefaultGroup && needFallback) { + logger.info("node " + path + " does not exist, fallback to default group"); + path = Utils.getServicePath(serviceName, Constants.DEFAULT_GROUP); + address = client.get(path, needListener); + } + } + return address; + } catch (Exception e) { + logger.info("failed to get service address for " + serviceName + "/" + group, e); + throw new RegistryException(e); + } + } + + @Override + public void updateHeartBeat(String serviceAddress, Long heartBeatTimeMillis) { + try { + String heartBeatPath = Utils.getHeartBeatPath(serviceAddress); + client.set(heartBeatPath, heartBeatTimeMillis); + } catch (Throwable e) { + logger.info("failed to update heartbeat", e); + } + } + + @Override + public void deleteHeartBeat(String serviceAddress) { + try { + String heartBeatPath = Utils.getHeartBeatPath(serviceAddress); + client.delete(heartBeatPath); + } catch (Throwable e) { + logger.info("failed to delete heartbeat", e); + } + } + + @Override + public boolean isSupportNewProtocol(String serviceAddress) throws RegistryException { + String version = getServerVersion(serviceAddress); + + if (StringUtils.isBlank(version)) { + throw new RegistryException("version is blank"); + } + + return VersionUtils.isThriftSupported(version); + } + + @Override + public boolean isSupportNewProtocol(String serviceAddress, String serviceName) throws RegistryException { + try { + String protocolPath = Utils.getProtocolPath(serviceAddress); + String info = client.get(protocolPath); + + if (info != null) { + Map infoMap = Utils.getProtocolInfoMap(info); + Boolean support = infoMap.get(serviceName); + if (support != null) { + return support; + } + } + + return false; + } catch (Throwable e) { + logger.info("failed to get protocol:" + serviceName + "of host:" + serviceAddress + ", caused by:" + + e.getMessage()); + throw new RegistryException(e); + } + } + + @Override + public void setSupportNewProtocol(String serviceAddress, String serviceName, boolean support) + throws RegistryException { + try { + String protocolPath = Utils.getProtocolPath(serviceAddress); + if (client.exists(protocolPath, false)) { + Stat stat = new Stat(); + String info = client.getWithNodeExistsEx(protocolPath, stat); + Map infoMap = Utils.getProtocolInfoMap(info); + infoMap.put(serviceName, support); + client.set(protocolPath, Utils.getProtocolInfo(infoMap), stat.getVersion()); + } else { + Map infoMap = ImmutableMap.of(serviceName, support); + client.create(protocolPath, Utils.getProtocolInfo(infoMap)); + } + + } catch (Throwable e) { + if (e instanceof BadVersionException || e instanceof NodeExistsException) { + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + // ignore + } + setSupportNewProtocol(serviceAddress, serviceName, support); + } else { + logger.info("failed to set protocol:" + serviceName + "of host:" + serviceAddress + " to:" + support + + ", caused by:" + e.getMessage()); + throw new RegistryException(e); + } + + } + } + + @Override + public void unregisterSupportNewProtocol(String serviceAddress, String serviceName, boolean support) + throws RegistryException { + try { + String protocolPath = Utils.getProtocolPath(serviceAddress); + if (client.exists(protocolPath, false)) { + Stat stat = new Stat(); + String info = client.getWithNodeExistsEx(protocolPath, stat); + Map infoMap = Utils.getProtocolInfoMap(info); + infoMap.remove(serviceName); + + if (infoMap.size() == 0 && delEmptyNode) { + client.delete(protocolPath); + } else { + client.set(protocolPath, Utils.getProtocolInfo(infoMap), stat.getVersion()); + } + } + + } catch (Throwable e) { + if (e instanceof BadVersionException || e instanceof NodeExistsException) { + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + // ignore + } + unregisterSupportNewProtocol(serviceAddress, serviceName, support); + } else { + logger.info("failed to del protocol:" + serviceName + "of host:" + serviceAddress + ", caused by:" + + e.getMessage()); + throw new RegistryException(e); + } + + } + } + + @Override + public void setConsoleAddress(String consoleAddress) { + String clientPath = Utils.getConsolePath(consoleAddress); + try { + client.set(clientPath, null); + } catch (Throwable t) { + logger.info("failed to set consolePath " + clientPath, t); + } + } + + @Override + public void unregisterConsoleAddress(String consoleAddress) { + String clientPath = Utils.getConsolePath(consoleAddress); + try { + client.delete(clientPath); + } catch (Throwable t) { + logger.info("failed to delete consolePath " + clientPath, t); + } + + } + + @Override + public List getConsoleAddresses() { + List consoleAddresses = null; + String consoleRootPath = Utils.getConsoleRootPath(); + + try { + consoleAddresses = client.getChildren(consoleRootPath); + } catch (Throwable t) { + logger.info("failed to get consoleRootPath " + consoleRootPath, t); + } + + return consoleAddresses; + } } diff --git a/pigeon-extensions/pigeon-registry-zookeeper/src/test/java/com/dianping/pigeon/registry/zookeeper/CuratorRegistryTest.java b/pigeon-extensions/pigeon-registry-zookeeper/src/test/java/com/dianping/pigeon/registry/zookeeper/CuratorRegistryTest.java index 3601e70c..f088d9da 100644 --- a/pigeon-extensions/pigeon-registry-zookeeper/src/test/java/com/dianping/pigeon/registry/zookeeper/CuratorRegistryTest.java +++ b/pigeon-extensions/pigeon-registry-zookeeper/src/test/java/com/dianping/pigeon/registry/zookeeper/CuratorRegistryTest.java @@ -18,7 +18,6 @@ import com.dianping.pigeon.registry.Registry; import com.dianping.pigeon.registry.exception.RegistryException; import com.dianping.pigeon.registry.listener.RegistryEventListener; -import com.dianping.pigeon.registry.util.Constants; public class CuratorRegistryTest { @@ -27,7 +26,7 @@ public class CuratorRegistryTest { @BeforeClass public static void startTestServer() throws Exception { server = new TestingServer(); - ConfigManagerLoader.getConfigManager().setLocalStringValue(Constants.KEY_REGISTRY_ADDRESS, + ConfigManagerLoader.getConfigManager().setLocalStringValue("pigeon.registry.address", server.getConnectString()); } diff --git a/pigeon-registry/src/main/java/com/dianping/pigeon/registry/util/Constants.java b/pigeon-registry/src/main/java/com/dianping/pigeon/registry/util/Constants.java index ff1406f7..5f0684cf 100755 --- a/pigeon-registry/src/main/java/com/dianping/pigeon/registry/util/Constants.java +++ b/pigeon-registry/src/main/java/com/dianping/pigeon/registry/util/Constants.java @@ -34,11 +34,6 @@ public final class Constants { public static final boolean DEFAULT_AUTO_REGISTER_BOOL = Boolean.parseBoolean(DEFAULT_AUTO_REGISTER); public static final String KEY_LOCAL_IP = "local.ip"; - public static final String KEY_REGISTRY_ADDRESS = "pigeon.registry.address"; - public static final String KEY_REGISTRY_TYPE = "pigeon.registry.type"; - public static final String REGISTRY_TYPE_ZOOKEEPER = "zookeeper"; - public static final String REGISTRY_TYPE_LOCAL = "local"; - public static final String DEFAULT_REGISTRY_TYPE = REGISTRY_TYPE_ZOOKEEPER; public static final String REGISTRY_MNS_NAME = "mns"; public static final String REGISTRY_CURATOR_NAME = "curator"; public static final String REGISTRY_COMPOSITE_NAME = "composite"; From 179711210a3fff8aac279b6f96bdba95a09cc881 Mon Sep 17 00:00:00 2001 From: "qi.yin" Date: Mon, 13 Feb 2017 10:07:45 +0800 Subject: [PATCH 8/9] add requestContextMap --- .../process/threadpool/RequestThreadPoolProcessor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/provider/process/threadpool/RequestThreadPoolProcessor.java b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/provider/process/threadpool/RequestThreadPoolProcessor.java index 2ad4513f..f7bbf134 100755 --- a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/provider/process/threadpool/RequestThreadPoolProcessor.java +++ b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/provider/process/threadpool/RequestThreadPoolProcessor.java @@ -182,7 +182,7 @@ public void doStop() { public Future doProcessRequest(final InvocationRequest request, final ProviderContext providerContext) { -// requestContextMap.put(request, providerContext); + requestContextMap.put(request, providerContext); doMonitorData(request, providerContext); @@ -201,7 +201,7 @@ public InvocationResponse call() throws Exception { } catch (Throwable t) { logger.error("Process request failed with invocation handler, you should never be here.", t); } finally { -// requestContextMap.remove(request); + requestContextMap.remove(request); } return null; } @@ -213,7 +213,7 @@ public InvocationResponse call() throws Exception { providerContext.getTimeline().add(new TimePoint(TimePhase.T)); return pool.submit(requestExecutor); } catch (RejectedExecutionException e) { -// requestContextMap.remove(request); + requestContextMap.remove(request); throw new RejectedException(getProcessorStatistics(pool), e); } From a2f69b9f9f0347354c9de34019ff6b143c6b8146 Mon Sep 17 00:00:00 2001 From: "qi.yin" Date: Mon, 13 Feb 2017 11:18:43 +0800 Subject: [PATCH 9/9] update trace --- .../invoker/process/filter/TraceFilter.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/TraceFilter.java b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/TraceFilter.java index 1b6ff9d0..6e615c7f 100644 --- a/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/TraceFilter.java +++ b/pigeon-remoting/src/main/java/com/dianping/pigeon/remoting/invoker/process/filter/TraceFilter.java @@ -48,18 +48,20 @@ public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContex InvokerConfig config = invocationContext.getInvokerConfig(); - MonitorTransaction transaction = monitor.getCurrentCallTransaction(); - String rootMessage = StringUtils.EMPTY; - - if (transaction != null) { - rootMessage = transaction.getParentRootMessage(); - } - SourceKey srcKey = null; - if (StringUtils.isNotBlank(rootMessage)) { - srcKey = new OtherKey(rootMessage); - } else { - srcKey = new ApplicationKey(appName); - } +// MonitorTransaction transaction = monitor.getCurrentCallTransaction(); +// String rootMessage = StringUtils.EMPTY; +// +// if (transaction != null) { +// rootMessage = transaction.getParentRootMessage(); +// } +// SourceKey srcKey = null; +// if (StringUtils.isNotBlank(rootMessage)) { +// srcKey = new OtherKey(rootMessage); +// } else { +// srcKey = new ApplicationKey(appName); +// } + + SourceKey srcKey = new ApplicationKey(appName); InvokerMonitorData monitorData = MonitorDataFactory.newInvokerMonitorData(srcKey, new MethodKey(config.getUrl(), invocationContext.getMethodName()));