Skip to content

Commit

Permalink
2.7.8 Dev (#6386)
Browse files Browse the repository at this point in the history
* Polish #6296 : Adding the new methods into MetadataReport to manipulate the exported URLs for service introspection

* Polish #6296 : Adding the new methods into MetadataReport to manipulate the exported URLs for service introspection

* Polish #6171 : [Feature] Introducing the composite implementation of MetadataService

* Revert "fix wrong check of InvokerListener when export a service (fix issue_6269) (#6271)"

This reverts commit 91989ca.

* Revert "fix wrong check of InvokerListener when export a service (fix issue_6269) (#6271)"

This reverts commit 91989ca.

* Revert the MetadataReport

* Polish #6305 : [Refactor] ServiceConfig and ReferenceConfig publish the ServiceDefinition based on the Dubbo Event

* Polish #6198 : [Issue] Fixing NacosDynamicConfiguration#publishConfig bug

* Polish #6310 : Refactoring MetadataReport's methods

* Polish #6198 : [Issue] Fixing NacosDynamicConfiguration#publishConfig bug

* Polish #6198 : [Issue] Fixing NacosDynamicConfiguration#publishConfig bug

* Polish #6315 : [Refactor] Refactoring the implementation of MetadataReport based on The Config-Center infrastructure

Deprecated List :

- NacosMetadataReport
- ZookeeperMetadataReport

* Polish #6315 : Refactoring by TreePathDynamicConfiguration

* Polish #6315 : Refactoring ConsulDynamicConfiguration by TreePathDynamicConfiguration

* Polish #6315 : Reset the config base path to be "metadata" for ConfigCenterBasedMetadataReportFactory

* Polish #6315 : Bugfix

* Polish #6315 : Bugfix

* Polish #6315 : Correct words

* sync wait netty server to finish shutdown (#6281)

* Polish #6333 : [Refactor] Using mandatory implementation of Service Instance registration instead of the event

* maybe we can remove null judge in this case (#6321)

* update

* update

* Polish #6336 : [Refactor] org.apache.dubbo.metadata.ServiceNameMapping

* Polish #6170 : [Feature] Introducing the externalized configuration for ServiceNameMapping

* Polish #6342 : [Enhancement] Introducing the composite ServiceNameMapping

* Refactor

* fix method name typo in JValidator.java (#6344)

* [Dubbo-6340]fix application cannot exit when use consul registry (#6341)

* fix application cannot exit when use consul registry

* make consul registry suppor ACL (#6313)

* make consul registry suppor ACL

* Polish #6172 : [Feature] Adding the "services" attribute methods into @DubboReference

* Polish #6173 : [Feature] Adding the "services" attribute into <dubbo:reference> element

* Polish #6346 : [Issue] Merging all subscribied URLs from the multiple services

* Polish #6346 : [Issue] Merging all subscribied URLs from the multiple services

* fix publish null value when use consul config center (#6351)

* fix publish null value when use consul config center

* Polish #6252

* Polish #6356 & #6171

* Polish #6356 & #6171

* Polish #6224 : Filter chain was not invoked with local calls since v2.7.6

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : Adding META-INF/dubbo/internal/org.apache.dubbo.metadata.MetadataServiceExporter

* fix the priority of ListenableRouter were not effective (#6148)

fixes #4822

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* when the url is generic, the log level should be info (#6363)

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* fix NPE when check=false is set and provider is empty. (#6376)

fixes #6228

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* fix #6306.  support TypeBuilder sort (#6365)

* fix #6306. support TypeBuilder sort

* fix #6306. support TypeBuilder sort

* fix #6306. support TypeBuilder sort

* remove unused import

* add license for test file

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* enhance ClusterInvoker & ExtensionLoader (#6343)

- Introduce ClusterInvoker to better support multiple registries subscription
- Wrapper sort and enable/disable
- some small fixes

* Polish #6322 : [Enhancement] Fix the issues of test-cases after refactoring

* Fixed the test-cases

Co-authored-by: tswstarplanet <tswstarplanet@apache.org>
Co-authored-by: Nine <nine.yang.coding@gmail.com>
Co-authored-by: 陈哈哈 <chenyongjia365@outlook.com>
Co-authored-by: luoning810 <18311333766@163.com>
Co-authored-by: cvictory <shenglicao2@gmail.com>
Co-authored-by: ken.lj <ken.lj.hz@gmail.com>
  • Loading branch information
7 people authored Jun 30, 2020
1 parent 11b2f35 commit 88bd09f
Show file tree
Hide file tree
Showing 79 changed files with 2,970 additions and 1,568 deletions.
8 changes: 8 additions & 0 deletions dubbo-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,14 @@
</resource>
</transformer>

<!-- @since 2.7.8 -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>
META-INF/dubbo/internal/org.apache.dubbo.metadata.MetadataServiceExporter
</resource>
</transformer>

</transformers>
<filters>
<filter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.extension.Adaptive;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.support.FailoverCluster;
Expand All @@ -29,8 +31,9 @@
* <a href="http://en.wikipedia.org/wiki/Fault-tolerant_system">Fault-Tolerant</a>
*
*/
@SPI(FailoverCluster.NAME)
@SPI(Cluster.DEFAULT)
public interface Cluster {
String DEFAULT = FailoverCluster.NAME;

/**
* Merge the directory invokers to a virtual invoker.
Expand All @@ -43,4 +46,14 @@ public interface Cluster {
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;

static Cluster getCluster(String name) {
return getCluster(name, true);
}

static Cluster getCluster(String name, boolean wrap) {
if (StringUtils.isEmpty(name)) {
name = Cluster.DEFAULT;
}
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invoker;

/**
* This is the final Invoker type referenced by the RPC proxy on Consumer side.
* <p>
* A ClusterInvoker holds a group of normal invokers, stored in a Directory, mapping to one Registry.
* The ClusterInvoker implementation usually provides LB or HA policies, like FailoverClusterInvoker.
* <p>
* In multi-registry subscription scenario, the final ClusterInvoker will referr to several sub ClusterInvokers, with each
* sub ClusterInvoker representing one Registry. Take ZoneAwareClusterInvoker as an example, it is specially customized for
* multi-registry use cases: first, pick up one ClusterInvoker, then do LB inside the chose ClusterInvoker.
*
* @param <T>
*/
public interface ClusterInvoker<T> extends Invoker<T> {
URL getRegistryUrl();

Directory<T> getDirectory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ public AbstractDirectory(URL url, RouterChain<T> routerChain) {
}

this.url = url.removeParameter(REFER_KEY).removeParameter(MONITOR_KEY);
this.consumerUrl = url.addParameters(StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)))
.removeParameter(MONITOR_KEY);
this.consumerUrl = this.url.addParameters(StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)));

setRouterChain(routerChain);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation

@Override
public int getPriority() {
return DEFAULT_PRIORITY;
return priority;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public class MockInvokersSelector extends AbstractRouter {

public static final String NAME = "MOCK_ROUTER";
private static final int MOCK_INVOKERS_DEFAULT_PRIORITY = Integer.MIN_VALUE;
private static final int MOCK_INVOKERS_DEFAULT_PRIORITY = -100;

public MockInvokersSelector() {
this.priority = MOCK_INVOKERS_DEFAULT_PRIORITY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.support.RpcUtils;
Expand All @@ -49,7 +50,7 @@
/**
* AbstractClusterInvoker
*/
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

private static final Logger logger = LoggerFactory.getLogger(AbstractClusterInvoker.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
Expand Down Expand Up @@ -59,24 +60,23 @@ public ZoneAwareClusterInvoker(Directory<T> directory) {
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'preferred' key.
for (Invoker<T> invoker : invokers) {
// FIXME, the invoker is a cluster invoker representing one Registry, so it will automatically wrapped by MockClusterInvoker.
MockClusterInvoker<T> mockClusterInvoker = (MockClusterInvoker<T>) invoker;
if (mockClusterInvoker.isAvailable() && mockClusterInvoker.getRegistryUrl()
ClusterInvoker<T> clusterInvoker = (ClusterInvoker<T>) invoker;
if (clusterInvoker.isAvailable() && clusterInvoker.getRegistryUrl()
.getParameter(REGISTRY_KEY + "." + PREFERRED_KEY, false)) {
return mockClusterInvoker.invoke(invocation);
return clusterInvoker.invoke(invocation);
}
}

// providers in the registry with the same zone
String zone = (String) invocation.getAttachment(REGISTRY_ZONE);
String zone = invocation.getAttachment(REGISTRY_ZONE);
if (StringUtils.isNotEmpty(zone)) {
for (Invoker<T> invoker : invokers) {
MockClusterInvoker<T> mockClusterInvoker = (MockClusterInvoker<T>) invoker;
if (mockClusterInvoker.isAvailable() && zone.equals(mockClusterInvoker.getRegistryUrl().getParameter(REGISTRY_KEY + "." + ZONE_KEY))) {
return mockClusterInvoker.invoke(invocation);
ClusterInvoker<T> clusterInvoker = (ClusterInvoker<T>) invoker;
if (clusterInvoker.isAvailable() && zone.equals(clusterInvoker.getRegistryUrl().getParameter(REGISTRY_KEY + "." + ZONE_KEY))) {
return clusterInvoker.invoke(invocation);
}
}
String force = (String) invocation.getAttachment(REGISTRY_ZONE_FORCE);
String force = invocation.getAttachment(REGISTRY_ZONE_FORCE);
if (StringUtils.isNotEmpty(force) && "true".equalsIgnoreCase(force)) {
throw new IllegalStateException("No registry instance in zone or no available providers in the registry, zone: "
+ zone
Expand All @@ -93,12 +93,14 @@ public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, L

// If none of the invokers has a preferred signal or is picked by the loadbalancer, pick the first one available.
for (Invoker<T> invoker : invokers) {
MockClusterInvoker<T> mockClusterInvoker = (MockClusterInvoker<T>) invoker;
if (mockClusterInvoker.isAvailable()) {
return mockClusterInvoker.invoke(invocation);
ClusterInvoker<T> clusterInvoker = (ClusterInvoker<T>) invoker;
if (clusterInvoker.isAvailable()) {
return clusterInvoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);

//if none available,just pick one
return invokers.get(0).invoke(invocation);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.support.MockInvoker;

Expand All @@ -35,7 +36,7 @@
import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.INVOCATION_NEED_MOCK;

public class MockClusterInvoker<T> implements Invoker<T> {
public class MockClusterInvoker<T> implements ClusterInvoker<T> {

private static final Logger logger = LoggerFactory.getLogger(MockClusterInvoker.class);

Expand All @@ -57,6 +58,11 @@ public URL getRegistryUrl() {
return directory.getUrl();
}

@Override
public Directory<T> getDirectory() {
return directory;
}

@Override
public boolean isAvailable() {
return directory.isAvailable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void testSelect_multiInvokers() throws Exception {
public void testCloseAvailablecheck() {
LoadBalance lb = mock(LoadBalance.class);
Map<String, String> queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
URL tmpUrl = url.addParameters(queryMap).removeParameter(MONITOR_KEY);
URL tmpUrl = url.addParameters(queryMap).removeParameter(REFER_KEY).removeParameter(MONITOR_KEY);
given(lb.select(invokers, tmpUrl, invocation)).willReturn(invoker1);
initlistsize5();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ protected ThreadPoolExecutor initWorkersThreadPool(String threadPoolPrefixName,
int threadPoolSize,
long keepAliveTime) {
return new ThreadPoolExecutor(threadPoolSize, threadPoolSize, keepAliveTime,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(threadPoolPrefixName));
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(threadPoolPrefixName, true));
}

protected static String getThreadPoolPrefixName(URL url) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.dubbo.common.config.configcenter.TreePathDynamicConfiguration;
import org.apache.dubbo.common.function.ThrowableConsumer;
import org.apache.dubbo.common.function.ThrowableFunction;
import org.apache.dubbo.common.lang.ShutdownHookCallbacks;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;

Expand Down Expand Up @@ -146,6 +147,7 @@ public class FileSystemDynamicConfiguration extends TreePathDynamicConfiguration
MODIFIERS = initWatchEventModifiers();
DELAY = initDelay(MODIFIERS);
WATCH_EVENTS_LOOP_THREAD_POOL = newWatchEventsLoopThreadPool();
registerDubboShutdownHook();
}

/**
Expand Down Expand Up @@ -230,6 +232,24 @@ private void doInListener(String configFilePath, BiConsumer<File, List<Configura
});
}

/**
* Register the Dubbo ShutdownHook
*
* @since 2.7.8
*/
private static void registerDubboShutdownHook() {
ShutdownHookCallbacks.INSTANCE.addCallback(() -> {
watchService.ifPresent(w -> {
try {
w.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
getWatchEventsLoopThreadPool().shutdown();
});
}

private static boolean isProcessingWatchEvents() {
return getWatchEventsLoopThreadPool().getActiveCount() > 0;
}
Expand Down
Loading

0 comments on commit 88bd09f

Please sign in to comment.