Skip to content

Commit

Permalink
Polish apache#6346 : [Issue] Merging all subscribied URLs from the mu…
Browse files Browse the repository at this point in the history
…ltiple services
  • Loading branch information
mercyblitz committed Jun 19, 2020
1 parent 7b03172 commit 1b12937
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@PropertySource("classpath:/META-INF/service-introspection/zookeeper-dubbb-consumer.properties")
public class ZookeeperDubboSpringConsumerBootstrap {

@DubboReference(services = "${dubbo.provider.name},${dubbo.provider.name}")
@DubboReference(services = "${dubbo.provider.name},${dubbo.provider.name1},${dubbo.provider.name2}")
private DemoService demoService;

public static void main(String[] args) throws Exception {
Expand All @@ -43,8 +43,11 @@ public static void main(String[] args) throws Exception {

for (int i = 0; i < 100; i++) {
System.out.println(bootstrap.demoService.sayName("Hello"));
Thread.sleep(1000L);
}

System.in.read();

context.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ dubbo.registry.useAsMetadataCenter = true
dubbo.protocol.name = dubbo
dubbo.protocol.port = -1

dubbo.provider.name = zookeeper-dubbo-spring-provider
dubbo.provider.name = zookeeper-dubbo-spring-provider
dubbo.provider.name1 = zookeeper-dubbo-spring-provider-1
dubbo.provider.name2 = zookeeper-dubbo-spring-provider-2
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Dubbo Provider for Zookeeper

dubbo.application.name = zookeeper-dubbo-spring-provider
dubbo.application.name = zookeeper-dubbo-spring-provider-1

dubbo.registry.address = zookeeper://127.0.0.1:2181?registry-type=service
dubbo.registry.useAsConfigCenter = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
* {@link ServiceNameMapping} will help to figure out one or more services that exported correlative Dubbo services. If
* the service names can be found, the exported {@link URL URLs} will be get from the remote {@link MetadataService}
* being deployed on all {@link ServiceInstance instances} of services. The whole process runs under the
* {@link #subscribeURLs(URL, NotifyListener, String, Collection)} method. It's very expensive to invoke
* {@link #subscribeURLs(URL, List, String, Collection)} method. It's very expensive to invoke
* {@link MetadataService} for each {@link ServiceInstance service instance}, thus {@link ServiceDiscoveryRegistry}
* introduces a cache to optimize the calculation with "revisions". If the revisions of N
* {@link ServiceInstance service instances} are same, {@link MetadataService} is invoked just only once, and then it
Expand Down Expand Up @@ -316,24 +316,60 @@ protected void subscribeURLs(URL url, NotifyListener listener) {

Set<String> serviceNames = getServices(url);

serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));
List<URL> subscribedURLs = new LinkedList<>();

}
serviceNames.forEach(serviceName -> {

protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
subscribeURLs(url, subscribedURLs, serviceName);

List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
// register ServiceInstancesChangedListener
registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {

subscribeURLs(url, listener, serviceName, serviceInstances);
@Override
public void onEvent(ServiceInstancesChangedEvent event) {
List<URL> subscribedURLs = new LinkedList<>();
Set<String> others = new HashSet<>(serviceNames);
others.remove(serviceName);

// register ServiceInstancesChangedListener
registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {
// Collect the subscribedURLs
subscribeURLs(url, subscribedURLs, serviceName, () -> event.getServiceInstances());
subscribeURLs(url, subscribedURLs, others.toString(), () -> getServiceInstances(others));

@Override
public void onEvent(ServiceInstancesChangedEvent event) {
subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances()));
}
// Notify all
listener.notify(subscribedURLs);
}
});
});

// Notify all
listener.notify(subscribedURLs);

}

private List<ServiceInstance> getServiceInstances(Set<String> serviceNames) {
if (isEmpty(serviceNames)) {
return emptyList();
}
List<ServiceInstance> allServiceInstances = new LinkedList<>();
for (String serviceName : serviceNames) {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
if (!isEmpty(serviceInstances)) {
allServiceInstances.addAll(serviceInstances);
}
}
return allServiceInstances;
}

protected void subscribeURLs(URL subscribedURL, List<URL> subscribedURLs,
String serviceName, Supplier<Collection<ServiceInstance>> serviceInstancesSupplier) {
Collection<ServiceInstance> serviceInstances = serviceInstancesSupplier.get();
subscribeURLs(subscribedURL, subscribedURLs, serviceName, serviceInstances);
}


protected void subscribeURLs(URL url, List<URL> subscribedURLs, String serviceName) {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
subscribeURLs(url, subscribedURLs, serviceName, serviceInstances);
}

/**
Expand All @@ -360,22 +396,20 @@ private String createListenerId(URL url, ServiceInstancesChangedListener listene
* the instances of {@link SubscribedURLsSynthesizer}
*
* @param subscribedURL the subscribed {@link URL url}
* @param listener {@link NotifyListener}
* @param subscribedURLs {@link NotifyListener}
* @param serviceName
* @param serviceInstances
* @see #getExportedURLs(URL, Collection)
* @see #synthesizeSubscribedURLs(URL, Collection)
*/
protected void subscribeURLs(URL subscribedURL, NotifyListener listener, String serviceName,
protected void subscribeURLs(URL subscribedURL, List<URL> subscribedURLs, String serviceName,
Collection<ServiceInstance> serviceInstances) {

if (isEmpty(serviceInstances)) {
logger.warn(format("There is no instance in service[name : %s]", serviceName));
return;
}

List<URL> subscribedURLs = new LinkedList<>();

/**
* Add the exported URLs from {@link MetadataService}
*/
Expand All @@ -387,8 +421,6 @@ protected void subscribeURLs(URL subscribedURL, NotifyListener listener, String
*/
subscribedURLs.addAll(synthesizeSubscribedURLs(subscribedURL, serviceInstances));
}

listener.notify(subscribedURLs);
}

/**
Expand Down

0 comments on commit 1b12937

Please sign in to comment.