Skip to content

Commit a68377e

Browse files
authored
[IOTDB-4140] Mark the datanode as removing status when execute remove-datanode.sh (apache#7008)
1 parent 6c4a87c commit a68377e

File tree

21 files changed

+176
-98
lines changed

21 files changed

+176
-98
lines changed

confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public class ConfigNodeConfig {
127127
private long heartbeatInterval = 1000;
128128

129129
/** The routing policy of read/write requests */
130-
private String routingPolicy = RouteBalancer.leaderPolicy;
130+
private String routingPolicy = RouteBalancer.LEADER_POLICY;
131131

132132
private String readConsistencyLevel = "strong";
133133

confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ public class ConfigNodeConstant {
3838

3939
public static final int MIN_SUPPORTED_JDK_VERSION = 8;
4040

41+
/** These variables are only used for cluster gauge metrics */
42+
public static final String METRIC_TAG_TOTAL = "total";
43+
44+
public static final String METRIC_STATUS_REGISTER = "Registered";
45+
public static final String METRIC_STATUS_ONLINE = "Online";
46+
public static final String METRIC_STATUS_UNKNOWN = "Unknown";
47+
4148
private ConfigNodeConstant() {
4249
// empty constructor
4350
}

confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ private void loadProps() {
223223
"heartbeat_interval", String.valueOf(conf.getHeartbeatInterval()))));
224224

225225
String routingPolicy = properties.getProperty("routing_policy", conf.getRoutingPolicy());
226-
if (routingPolicy.equals(RouteBalancer.greedyPolicy)
227-
|| routingPolicy.equals(RouteBalancer.leaderPolicy)) {
226+
if (routingPolicy.equals(RouteBalancer.GREEDY_POLICY)
227+
|| routingPolicy.equals(RouteBalancer.LEADER_POLICY)) {
228228
conf.setRoutingPolicy(routingPolicy);
229229
} else {
230230
throw new IOException(

confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ private void checkGlobalConfig() throws ConfigurationException {
9292
"%s or %s", ConsensusFactory.StandAloneConsensus, ConsensusFactory.RatisConsensus));
9393
}
9494

95-
if (!conf.getRoutingPolicy().equals(RouteBalancer.leaderPolicy)
96-
&& !conf.getRoutingPolicy().equals(RouteBalancer.greedyPolicy)) {
95+
if (!conf.getRoutingPolicy().equals(RouteBalancer.LEADER_POLICY)
96+
&& !conf.getRoutingPolicy().equals(RouteBalancer.GREEDY_POLICY)) {
9797
throw new ConfigurationException(
9898
"routing_policy", conf.getRoutingPolicy(), "leader or greedy");
9999
}

confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@
4545
import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
4646
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
4747
import org.apache.iotdb.confignode.manager.load.LoadManager;
48+
import org.apache.iotdb.confignode.manager.load.heartbeat.BaseNodeCache;
4849
import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
4950
import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
50-
import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
5151
import org.apache.iotdb.confignode.persistence.NodeInfo;
5252
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
5353
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
@@ -63,6 +63,7 @@
6363
import org.slf4j.LoggerFactory;
6464

6565
import java.util.ArrayList;
66+
import java.util.Arrays;
6667
import java.util.Collections;
6768
import java.util.Comparator;
6869
import java.util.List;
@@ -95,7 +96,7 @@ public class NodeManager {
9596
// Monitor for leadership change
9697
private final Object scheduleMonitor = new Object();
9798
// Map<NodeId, BaseNodeCache>
98-
private final Map<Integer, INodeCache> nodeCacheMap;
99+
private final Map<Integer, BaseNodeCache> nodeCacheMap;
99100
private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
100101
private Future<?> currentHeartbeatFuture;
101102
private final ScheduledExecutorService heartBeatExecutor =
@@ -520,7 +521,7 @@ public void stopHeartbeatService() {
520521
}
521522
}
522523

523-
public Map<Integer, INodeCache> getNodeCacheMap() {
524+
public Map<Integer, BaseNodeCache> getNodeCacheMap() {
524525
return nodeCacheMap;
525526
}
526527

@@ -531,7 +532,7 @@ public Map<Integer, INodeCache> getNodeCacheMap() {
531532
* @return The specific Node's current status if the nodeCache contains it, Unknown otherwise
532533
*/
533534
private String getNodeStatus(int nodeId) {
534-
INodeCache nodeCache = nodeCacheMap.get(nodeId);
535+
BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
535536
return nodeCache == null ? "Unknown" : nodeCache.getNodeStatus().getStatus();
536537
}
537538

@@ -558,13 +559,14 @@ public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus status
558559
* @param status The specific NodeStatus
559560
* @return Filtered DataNodes with the specific NodeStatus
560561
*/
561-
public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus status) {
562+
public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
562563
return getRegisteredDataNodes().stream()
563564
.filter(
564565
registeredDataNode -> {
565566
int id = registeredDataNode.getLocation().getDataNodeId();
566567
return nodeCacheMap.containsKey(id)
567-
&& status.equals(nodeCacheMap.get(id).getNodeStatus());
568+
&& Arrays.stream(status)
569+
.anyMatch(s -> s.equals(nodeCacheMap.get(id).getNodeStatus()));
568570
})
569571
.collect(Collectors.toList());
570572
}
@@ -583,6 +585,23 @@ public Map<Integer, Long> getAllLoadScores() {
583585
return result;
584586
}
585587

588+
public boolean isNodeRemoving(int dataNodeId) {
589+
DataNodeHeartbeatCache cache =
590+
(DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
591+
if (cache != null) {
592+
return cache.isRemoving();
593+
}
594+
return false;
595+
}
596+
597+
public void setNodeRemovingStatus(int dataNodeId, boolean isRemoving) {
598+
DataNodeHeartbeatCache cache =
599+
(DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
600+
if (cache != null) {
601+
cache.setRemoving(isRemoving);
602+
}
603+
}
604+
586605
public List<TConfigNodeLocation> getRegisteredConfigNodes() {
587606
return nodeInfo.getRegisteredConfigNodes();
588607
}

confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,8 @@ public Map<TConsensusGroupId, IRegionGroupCache> getRegionGroupCacheMap() {
535535
}
536536

537537
/**
538-
* Get the leadership of each RegionGroup
538+
* Get the leadership of each RegionGroup. If a node is in unknown or removing status, this node
539+
* can't be the leader.
539540
*
540541
* @return Map<RegionGroupId, leader location>
541542
*/
@@ -548,7 +549,12 @@ public Map<TConsensusGroupId, Integer> getAllLeadership() {
548549
regionGroupCacheMap.forEach(
549550
(consensusGroupId, regionGroupCache) -> {
550551
if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) {
551-
result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId());
552+
int leaderDataNodeId = regionGroupCache.getLeaderDataNodeId();
553+
if (configManager.getNodeManager().isNodeRemoving(leaderDataNodeId)) {
554+
result.put(consensusGroupId, -1);
555+
} else {
556+
result.put(consensusGroupId, leaderDataNodeId);
557+
}
552558
}
553559
});
554560
getLoadManager()
@@ -561,8 +567,14 @@ public Map<TConsensusGroupId, Integer> getAllLeadership() {
561567
regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId()));
562568
} else {
563569
regionGroupCacheMap.forEach(
564-
(consensusGroupId, regionGroupCache) ->
565-
result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId()));
570+
(consensusGroupId, regionGroupCache) -> {
571+
int leaderDataNodeId = regionGroupCache.getLeaderDataNodeId();
572+
if (configManager.getNodeManager().isNodeRemoving(leaderDataNodeId)) {
573+
result.put(consensusGroupId, -1);
574+
} else {
575+
result.put(consensusGroupId, leaderDataNodeId);
576+
}
577+
});
566578
}
567579
return result;
568580
}

confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ private void updateNodeLoadStatistic() {
215215
isNeedBroadcast = true;
216216
}
217217

218-
if (RouteBalancer.leaderPolicy.equals(CONF.getRoutingPolicy())) {
218+
if (RouteBalancer.LEADER_POLICY.equals(CONF.getRoutingPolicy())) {
219219
// Check the condition of leader routing policy
220220
if (existChangeLeaderSchemaRegionGroup.get()) {
221221
// Broadcast the RegionRouteMap if some SchemaRegionGroups change their leader

confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
import java.util.Map;
3737
import java.util.concurrent.ConcurrentHashMap;
3838

39+
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_STATUS_ONLINE;
40+
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_STATUS_UNKNOWN;
41+
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_TAG_TOTAL;
42+
3943
/** This class collates metrics about loadManager */
4044
public class LoadManagerMetrics {
4145

@@ -148,9 +152,9 @@ public void addNodeMetrics() {
148152
this,
149153
o -> getRunningConfigNodesNum(),
150154
Tag.NAME.toString(),
151-
"total",
155+
METRIC_TAG_TOTAL,
152156
Tag.STATUS.toString(),
153-
NodeStatus.Online.toString());
157+
METRIC_STATUS_ONLINE);
154158

155159
MetricService.getInstance()
156160
.getOrCreateAutoGauge(
@@ -159,9 +163,9 @@ public void addNodeMetrics() {
159163
this,
160164
o -> getRunningDataNodesNum(),
161165
Tag.NAME.toString(),
162-
"total",
166+
METRIC_TAG_TOTAL,
163167
Tag.STATUS.toString(),
164-
NodeStatus.Online.toString());
168+
METRIC_STATUS_ONLINE);
165169

166170
MetricService.getInstance()
167171
.getOrCreateAutoGauge(
@@ -170,9 +174,9 @@ public void addNodeMetrics() {
170174
this,
171175
o -> getUnknownConfigNodesNum(),
172176
Tag.NAME.toString(),
173-
"total",
177+
METRIC_TAG_TOTAL,
174178
Tag.STATUS.toString(),
175-
NodeStatus.Unknown.toString());
179+
METRIC_STATUS_UNKNOWN);
176180

177181
MetricService.getInstance()
178182
.getOrCreateAutoGauge(
@@ -181,9 +185,9 @@ public void addNodeMetrics() {
181185
this,
182186
o -> getUnknownDataNodesNum(),
183187
Tag.NAME.toString(),
184-
"total",
188+
METRIC_TAG_TOTAL,
185189
Tag.STATUS.toString(),
186-
NodeStatus.Unknown.toString());
190+
METRIC_STATUS_UNKNOWN);
187191
}
188192

189193
/**
@@ -227,33 +231,33 @@ public void removeMetrics() {
227231
MetricType.GAUGE,
228232
Metric.CONFIG_NODE.toString(),
229233
Tag.NAME.toString(),
230-
"total",
234+
METRIC_TAG_TOTAL,
231235
Tag.STATUS.toString(),
232-
NodeStatus.Online.toString());
236+
METRIC_STATUS_ONLINE);
233237
MetricService.getInstance()
234238
.remove(
235239
MetricType.GAUGE,
236240
Metric.DATA_NODE.toString(),
237241
Tag.NAME.toString(),
238-
"total",
242+
METRIC_TAG_TOTAL,
239243
Tag.STATUS.toString(),
240-
NodeStatus.Online.toString());
244+
METRIC_STATUS_ONLINE);
241245
MetricService.getInstance()
242246
.remove(
243247
MetricType.GAUGE,
244248
Metric.CONFIG_NODE.toString(),
245249
Tag.NAME.toString(),
246-
"total",
250+
METRIC_TAG_TOTAL,
247251
Tag.STATUS.toString(),
248-
NodeStatus.Unknown.toString());
252+
METRIC_STATUS_UNKNOWN);
249253
MetricService.getInstance()
250254
.remove(
251255
MetricType.GAUGE,
252256
Metric.DATA_NODE.toString(),
253257
Tag.NAME.toString(),
254-
"total",
258+
METRIC_TAG_TOTAL,
255259
Tag.STATUS.toString(),
256-
NodeStatus.Unknown.toString());
260+
METRIC_STATUS_UNKNOWN);
257261
}
258262

259263
private NodeManager getNodeManager() {

confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@
4242
*/
4343
public class RouteBalancer {
4444

45-
public static final String leaderPolicy = "leader";
46-
public static final String greedyPolicy = "greedy";
45+
public static final String LEADER_POLICY = "leader";
46+
public static final String GREEDY_POLICY = "greedy";
4747

4848
private final IManager configManager;
4949

@@ -84,7 +84,7 @@ private IRouter genRouter(TConsensusGroupType groupType) {
8484
String policy = ConfigNodeDescriptor.getInstance().getConf().getRoutingPolicy();
8585
switch (groupType) {
8686
case SchemaRegion:
87-
if (policy.equals(leaderPolicy)) {
87+
if (LEADER_POLICY.equals(policy)) {
8888
return new LeaderRouter(
8989
getPartitionManager().getAllLeadership(), getNodeManager().getAllLoadScores());
9090
} else {
@@ -98,9 +98,10 @@ private IRouter genRouter(TConsensusGroupType groupType) {
9898
.equals(ConsensusFactory.MultiLeaderConsensus)) {
9999
// Latent router for MultiLeader consensus protocol
100100
lazyGreedyRouter.updateUnknownDataNodes(
101-
getNodeManager().filterDataNodeThroughStatus(NodeStatus.Unknown));
101+
getNodeManager()
102+
.filterDataNodeThroughStatus(NodeStatus.Unknown, NodeStatus.Removing));
102103
return lazyGreedyRouter;
103-
} else if (policy.equals(leaderPolicy)) {
104+
} else if (LEADER_POLICY.equals(policy)) {
104105
return new LeaderRouter(
105106
getPartitionManager().getAllLeadership(), getNodeManager().getAllLoadScores());
106107
} else {

confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@
4040
*/
4141
public class LazyGreedyRouter implements IRouter {
4242

43-
// Set<DataNodeId>
43+
/** {@code Set<DataNodeId>} which stores the unknown and removing DataNodes */
4444
private final Set<Integer> unknownDataNodes;
45+
4546
private final Map<TConsensusGroupId, TRegionReplicaSet> routeMap;
4647

4748
public LazyGreedyRouter() {

0 commit comments

Comments
 (0)