Skip to content

Commit

Permalink
bugfix: change group and node offline status are not pushed in real t…
Browse files Browse the repository at this point in the history
…ime (#6812)
  • Loading branch information
ggbocoder authored Sep 4, 2024
1 parent 4b419a4 commit 242d92c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 13 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty
- [[#6800](https://github.com/apache/incubator-seata/pull/6800)] make exception message generic for all database drivers
- [[#6759](https://github.com/apache/incubator-seata/pull/6759)] fix the error of active refresh failure of cross-database table metadata
- [[#6812](https://github.com/apache/incubator-seata/pull/6812)] bugfix: change group and node offline status are not pushed in real time


### optimize:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址
- [[#6800](https://github.com/apache/incubator-seata/pull/6800)] 使异常消息对所有数据库驱动程序通用
- - [[#6812](https://github.com/apache/incubator-seata/pull/6812)] 修复切换事务分组和节点下线时namingserver没有实时感知和推送的bug

- [[#6759](https://github.com/apache/incubator-seata/pull/6759)] 修复跨库表主动刷新`tableMeta`的异常问题

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ public List<InetSocketAddress> refreshGroup(String vGroup) throws IOException {
throw new NamingRegistryException("cannot lookup server list in vgroup: " + vGroup);
}
String jsonResponse = EntityUtils.toString(response.getEntity(), "UTF-8");
response.close();
// jsonResponse -> MetaResponse
MetaResponse metaResponse = OBJECT_MAPPER.readValue(jsonResponse, new TypeReference<MetaResponse>() {
});
Expand All @@ -375,6 +374,7 @@ public List<InetSocketAddress> refreshGroup(String vGroup) throws IOException {
term = metaResponse.getTerm();
}
VGROUP_ADDRESS_MAP.put(vGroup, newAddressList);
removeOfflineAddressesIfNecessary(vGroup,vGroup,newAddressList);
} catch (IOException e) {
LOGGER.error(e.getMessage());
throw new RemoteException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,10 @@ public void addGroup(String namespace, String clusterName, String unitName, Stri
public void notifyClusterChange(String vGroup, String namespace, String clusterName, String unitName, long term) {

Optional.ofNullable(vGroupMap.asMap().get(vGroup)).flatMap(map -> Optional.ofNullable(map.get(namespace)).flatMap(namespaceBO -> Optional.ofNullable(namespaceBO.getCluster(clusterName)))).ifPresent(clusterBO -> {
Set<String> units = clusterBO.getUnitNames();
if (StringUtils.isBlank(unitName) || units.contains(unitName)) {
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term));
}
// Set<String> units = clusterBO.getUnitNames();
// if (!CollectionUtils.isEmpty(units)) {
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term));
// }
});
}

Expand Down Expand Up @@ -276,7 +276,7 @@ public boolean unregisterInstance(String namespace, String clusterName, String u
if (vgroupMap instanceof Map) {
((Map<String, Object>) vgroupMap).forEach((group, realUnitName) -> {
vGroupMap.get(group, k -> new ConcurrentHashMap<>())
.get(namespace).getCluster(clusterName).remove(realUnitName == null ? unitName : (String) realUnitName);
.get(namespace).getCluster(clusterName).remove(realUnitName == null ? unitName : (String) realUnitName);
notifyClusterChange(group, namespace, clusterName, unitName, node.getTerm());
});
}
Expand Down Expand Up @@ -367,17 +367,27 @@ public Result<String> changeGroup(String namespace, String vGroup, String cluste
namespaceClusters.put(currentNamespace,
new HashSet<>(namespaceMap.get(currentNamespace).getClusterMap().keySet()));
}
createGroup(namespace, vGroup, clusterName, unitName);
Result<String> res = createGroup(namespace, vGroup, clusterName, unitName);
if (!res.isSuccess()) {
LOGGER.error("add vgroup failed!" + res.getMessage());
return res;
}
AtomicReference<Result<String>> result = new AtomicReference<>();
namespaceClusters.forEach((oldNamespace, clusters) -> {
for (String cluster : clusters) {
Optional.ofNullable(namespaceClusterDataMap.get(oldNamespace))
.flatMap(map -> Optional.ofNullable(map.get(cluster))).ifPresent(clusterData -> {
if (!CollectionUtils.isEmpty(clusterData.getUnitData())) {
clusterData.getUnitData().forEach((unit, unitData) -> result
.set(removeGroup(unitData, vGroup, cluster, oldNamespace, unitName)));
}
});
.flatMap(map -> Optional.ofNullable(map.get(cluster))).ifPresent(clusterData -> {
if (!CollectionUtils.isEmpty(clusterData.getUnitData())) {
Optional<Map.Entry<String, Unit>> optionalEntry =
clusterData.getUnitData().entrySet().stream().findFirst();
if (optionalEntry.isPresent()) {
String unit = optionalEntry.get().getKey();
Unit unitData = optionalEntry.get().getValue();
result.set(removeGroup(unitData, vGroup, cluster, oldNamespace, unitName));
notifyClusterChange(vGroup, namespace, cluster, unit, -1);
}
}
});
}
});
return Optional.ofNullable(result.get()).orElseGet(() -> new Result<>("200", "change vGroup successfully!"));
Expand Down

0 comments on commit 242d92c

Please sign in to comment.