Open
Description
nacos-sync默认是Service级别的服务同步,需要在控制台配置ServiceName和GroupName才可以进行服务同步。
自pr304之后也支持基于Ns-Group级别的服务同步。
通过将ServiceName配置为”ALL“以实现Group级别的同步。
这个pr给了我很大的帮助,我在这个基础上做了一些拓展,让nacos-sync支持Cluster级别的服务同步,只需要在新建同步任务的时候将ServiceName和GroupName均配置为”ALL“即可。
主要改动点在com.alibaba.nacossync.timer.CheckRunningStatusAllThread#run
改动前逻辑:根据GroupName查询服务列表,然后进行同步
改动后逻辑:判断GroupName是否为ALL,如果为false,逻辑和之前一致,根据GroupName查询服务列表,然后进行同步;
如果为true,通过增强后的NacosEnhanceNamingService查询集群下所有服务列表,然后进行同步。
对原有功能无影响。
public void run() {
Long startTime = System.currentTimeMillis();
try {
// 查找所有ServiceName为”ALL“的任务
List<TaskDO> taskDOS = taskAccessService.findServiceNameIsNull()
.stream().filter(t -> t.getStatus() == null || t.getStatus() == 0)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(taskDOS)) {
return;
}
for (TaskDO taskDO : taskDOS) {
// 改动点:根据GroupName查询Group下所有服务 =》 如果GroupName为”ALL“,查询集群下所有服务,根据GroupName进行分组;如果不为”ALL“,则逻辑不变。
Map<String, List<String>> serviceNameListGroupByGroupName = getServiceNameListGroupByGroupName(taskDO);
if (serviceNameListGroupByGroupName.isEmpty()) {
continue;
}
for (Map.Entry<String, List<String>> entry : serviceNameListGroupByGroupName.entrySet()) {
taskDO.setGroupName(entry.getKey());
List<String> serviceNameList = entry.getValue();
//如果是null,证明此时没有处理完成
List<String> filterService = serviceNameList.stream()
.filter(serviceName -> skyWalkerCacheServices.getFinishedTask(taskDO.getTaskId() + serviceName ) == null)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(filterService)) {
continue;
}
// 当删除任务后,此时任务的状态为DELETE,不会执行数据同步
if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) {
fastSyncHelper.syncWithThread(taskDO, filterService);
}
}
}
}catch (Exception e) {
log.warn("CheckRunningStatusThread Exception ", e);
}
metricsManager.record(MetricsStatisticsType.DISPATCHER_TASK, System.currentTimeMillis() - startTime);
}
private Map<String, List<String>> getServiceNameListGroupByGroupName(TaskDO taskDO) throws NacosException {
Map<String, List<String>> map = new HashMap<>();
NamingService namingService = nacosServerHolder.get(taskDO.getSourceClusterId());
if (SkyWalkerConstants.ALL.equals(taskDO.getGroupName())) {
// 如果serviceName和GroupName都是ALL,查询集群下所有服务
NacosEnhanceNamingService enhanceNamingService = new NacosEnhanceNamingService(namingService);
CatalogServiceResult catalogServiceResult = enhanceNamingService.catalogServices(null, null);
if (catalogServiceResult == null || catalogServiceResult.getCount() <= 0) {
return map;
}
List<ServiceView> serviceList = catalogServiceResult.getServiceList();
return serviceList.stream()
.collect(Collectors.groupingBy(ServiceView::getGroupName, Collectors.mapping(ServiceView::getName, Collectors.toList())));
}
// GroupName不为ALL,查询该Group下的所有服务
ListView<String> servicesOfServer = namingService.getServicesOfServer(0, Integer.MAX_VALUE,
taskDO.getGroupName());
map.put(taskDO.getGroupName(), servicesOfServer.getData());
return map;
}
Metadata
Metadata
Assignees
Labels
No labels