Skip to content

Commit

Permalink
bugfix: fix the naming server node having a term of 0 (#6778)
Browse files Browse the repository at this point in the history
  • Loading branch information
ggbocoder authored Aug 25, 2024
1 parent 64035c5 commit eba3e34
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 23 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6701](https://github.com/apache/incubator-seata/pull/6728)] fix support serialization for dm.jdbc.driver.DmdbTimestamp
- [[#6757](https://github.com/apache/incubator-seata/pull/6757)] the bug where multiple nodes cannot be retrieved from the naming server
- [[#6769](https://github.com/apache/incubator-seata/pull/6769)] fix tcc fence deadLock
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term


### optimize:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- [[#6701](https://github.com/apache/incubator-seata/pull/6728)] 修复达梦数据库的对dm.jdbc.driver.DmdbTimestamp的支持
- [[#6757](https://github.com/apache/incubator-seata/pull/6757)] 修复client通过namingserver只能获取到一个tc节点的bug
- [[#6769](https://github.com/apache/incubator-seata/pull/6769)] 修复tcc fence死锁
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题

### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void doRegister(Instance instance, List<String> urlList) {
}

public boolean doHealthCheck(String url) {
url = HTTP_PREFIX + url + "/health";
url = HTTP_PREFIX + url + "/naming/v1/health";
Map<String, String> header = new HashMap<>();
header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
try (CloseableHttpResponse response = HttpClientUtil.doGet(url, null, header, 3000)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import org.apache.seata.common.result.Result;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/naming/v1")
public class HealthController {

@GetMapping("/health")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,10 @@ public Result<String> changeGroup(@RequestParam String namespace,
public void watch(@RequestParam String clientTerm,
@RequestParam String vGroup,
@RequestParam String timeout,
@RequestParam String clientAddr,
HttpServletRequest request) {
AsyncContext context = request.startAsync();
context.setTimeout(0L);
Watcher<AsyncContext> watcher = new Watcher<>(vGroup, context, Integer.parseInt(timeout), Long.parseLong(clientTerm), clientAddr);
Watcher<AsyncContext> watcher = new Watcher<>(vGroup, context, Integer.parseInt(timeout), Long.parseLong(clientTerm), request.getRemoteAddr());
clusterWatcherManager.registryWatcher(watcher);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,13 @@ public void addGroup(String namespace, String clusterName, String unitName, Stri
}
}

public void notifyClusterChange(String namespace, String clusterName, String unitName, long term) {
vGroupMap.asMap().forEach((vGroup, namespaceMap) -> {
Optional.ofNullable(namespaceMap.get(namespace))
.flatMap(namespaceBO -> Optional.ofNullable(namespaceBO.getCluster(clusterName)))
.ifPresent(clusterBO -> {
Set<String> units = clusterBO.getUnitNames();
if (StringUtils.isBlank(unitName) || units.contains(unitName)) {
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term));
}
});
public void notifyClusterChange(String vGroup, String namespace, String clusterName, String unitName, long term) {

Optional.ofNullable(vGroupMap.asMap().get(vGroup)).flatMap(map -> Optional.ofNullable(map.get(namespace)).flatMap(namespaceBO -> Optional.ofNullable(namespaceBO.getCluster(clusterName)))).ifPresent(clusterBO -> {
Set<String> units = clusterBO.getUnitNames();
if (StringUtils.isBlank(unitName) || units.contains(unitName)) {
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term));
}
});
}

Expand All @@ -250,7 +247,7 @@ public boolean registerInstance(NamingServerNode node, String namespace, String
// create cluster when there is no cluster in clusterDataHashMap
ClusterData clusterData = clusterDataHashMap.computeIfAbsent(clusterName,
key -> new ClusterData(clusterName, (String)node.getMetadata().get("cluster-type")));

boolean hasChanged = clusterData.registerInstance(node, unitName);
// if extended metadata includes vgroup mapping relationship, add it in clusterData
Optional.ofNullable(node.getMetadata().get(CONSTANT_GROUP)).ifPresent(mappingObj -> {
if (mappingObj instanceof Map) {
Expand All @@ -259,13 +256,12 @@ public boolean registerInstance(NamingServerNode node, String namespace, String
// In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node.
// In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used.
addGroup(namespace, clusterName, StringUtils.isBlank(v) ? unitName : v, k);
if (hasChanged) {
notifyClusterChange(k,namespace, clusterName, unitName,node.getTerm());
}
});
}
});
boolean hasChanged = clusterData.registerInstance(node, unitName);
if (hasChanged) {
notifyClusterChange(namespace, clusterName, unitName,node.getTerm());
}
instanceLiveTable.put(
new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()),
System.currentTimeMillis());
Expand All @@ -285,10 +281,13 @@ public boolean unregisterInstance(String namespace, String clusterName, String u
clusterData.removeInstance(node, unitName);
Object vgroupMap = node.getMetadata().get(CONSTANT_GROUP);
if (vgroupMap instanceof Map) {
((Map<String, Object>) vgroupMap).forEach((group, realUnitName) -> vGroupMap.get(group, k -> new ConcurrentHashMap<>())
.get(namespace).getCluster(clusterName).remove(realUnitName == null ? unitName : (String) realUnitName));
((Map<String, Object>) vgroupMap).forEach((group, realUnitName) -> {
vGroupMap.get(group, k -> new ConcurrentHashMap<>())
.get(namespace).getCluster(clusterName).remove(realUnitName == null ? unitName : (String) realUnitName);
notifyClusterChange(group, namespace, clusterName, unitName, node.getTerm());
});
}
notifyClusterChange(namespace, clusterName, unitName, node.getTerm());

instanceLiveTable.remove(
new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()));
}
Expand Down Expand Up @@ -353,14 +352,14 @@ public void instanceHeartBeatCheck() {
Set<String> units = clusterBO.getUnitNames();
if (units != null) {
units.remove(unitName == null ? instance.getUnit() : unitName);
notifyClusterChange(group,namespace, clusterData.getClusterName(), unit.getUnitName(),-1);
}
});
}

LOGGER.warn("{} instance has gone offline",
instance.getTransaction().getHost() + ":" + instance.getTransaction().getPort());
}
notifyClusterChange(namespace, clusterData.getClusterName(), unit.getUnitName(),-1);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion script/server/db/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ CREATE TABLE IF NOT EXISTS `vgroup_table`
`vGroup` VARCHAR(255),
`namespace` VARCHAR(255),
`cluster` VARCHAR(255),
primary key (`vGroup`)
UNIQUE KEY `idx_vgroup_namespace_cluster` (`vGroup`,`namespace`,`cluster`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
2 changes: 2 additions & 0 deletions server/src/main/java/org/apache/seata/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public static void metadataInit() {
// load unit name
instance.setUnit(String.valueOf(UUID.randomUUID()));

instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));

Expand Down

0 comments on commit eba3e34

Please sign in to comment.