Skip to content

Commit

Permalink
[BugFix] forget remove compute node from blacklist (StarRocks#38746)
Browse files Browse the repository at this point in the history
Signed-off-by: abc982627271 <liuxuefen@starrocks.com>
  • Loading branch information
abc982627271 authored Jan 17, 2024
1 parent 1152fa6 commit c5270ae
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 51 deletions.
79 changes: 39 additions & 40 deletions fe/fe-core/src/main/java/com/starrocks/qe/SimpleScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import com.starrocks.common.util.NetUtils;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TNetworkAddress;
Expand All @@ -70,7 +69,7 @@ public class SimpleScheduler {
private static final AtomicLong NEXT_BACKEND_HOST_ID = new AtomicLong(0);

//count id for get ComputeNode
private static final ConcurrentMap<Long, Integer> BLOCKLIST_BACKENDS = Maps.newConcurrentMap();
private static final ConcurrentMap<Long, Integer> BLOCKLIST_NODES = Maps.newConcurrentMap();

private static final AtomicBoolean ENABLE_UPDATE_BLOCKLIST_THREAD;
private static final UpdateBlocklistThread UPDATE_BLOCKLIST_THREAD;
Expand All @@ -85,7 +84,7 @@ public class SimpleScheduler {
public static TNetworkAddress getHost(long nodeId,
List<TScanRangeLocation> locations,
ImmutableMap<Long, ComputeNode> computeNodes,
Reference<Long> backendIdRef) {
Reference<Long> nodeIdRef) {

if (locations == null || computeNodes == null) {
return null;
Expand All @@ -95,18 +94,18 @@ public static TNetworkAddress getHost(long nodeId,
ComputeNode node = computeNodes.get(nodeId);

if (node != null && node.isAlive() && !isInBlocklist(nodeId)) {
backendIdRef.setRef(nodeId);
nodeIdRef.setRef(nodeId);
return new TNetworkAddress(node.getHost(), node.getBePort());
} else {
for (TScanRangeLocation location : locations) {
if (location.backend_id == nodeId) {
continue;
}
// choose the first alive backend (in the analysis stage, the locations are random)
ComputeNode candidateBackend = computeNodes.get(location.backend_id);
if (candidateBackend != null && candidateBackend.isAlive() && !isInBlocklist(location.backend_id)) {
backendIdRef.setRef(location.backend_id);
return new TNetworkAddress(candidateBackend.getHost(), candidateBackend.getBePort());
ComputeNode candidateNode = computeNodes.get(location.backend_id);
if (candidateNode != null && candidateNode.isAlive() && !isInBlocklist(location.backend_id)) {
nodeIdRef.setRef(location.backend_id);
return new TNetworkAddress(candidateNode.getHost(), candidateNode.getBePort());
}
}

Expand All @@ -120,13 +119,13 @@ public static TNetworkAddress getHost(long nodeId,
if (!candidateNodes.isEmpty()) {
// use modulo operation to ensure that the same node is selected for the dead node
ComputeNode candidateNode = candidateNodes.get((int) (nodeId % candidateNodes.size()));
backendIdRef.setRef(candidateNode.getId());
nodeIdRef.setRef(candidateNode.getId());
return new TNetworkAddress(candidateNode.getHost(), candidateNode.getBePort());
}
}
}

// no backend returned
// no backend or compute node returned
return null;
}

Expand Down Expand Up @@ -182,75 +181,75 @@ private static <T extends ComputeNode> T chooseNode(ImmutableList<T> nodes, Atom
return null;
}

public static void addToBlocklist(Long backendID) {
if (backendID == null) {
public static void addToBlocklist(Long nodeID) {
if (nodeID == null) {
return;
}

int tryTime = Config.heartbeat_timeout_second + 1;
BLOCKLIST_BACKENDS.put(backendID, tryTime);
LOG.warn("add black list " + backendID);
BLOCKLIST_NODES.put(nodeID, tryTime);
LOG.warn("add black list " + nodeID);
}

public static boolean isInBlocklist(long backendId) {
return BLOCKLIST_BACKENDS.containsKey(backendId);
return BLOCKLIST_NODES.containsKey(backendId);
}

// The function is used for unit test
@VisibleForTesting
public static boolean removeFromBlocklist(Long backendID) {
if (backendID == null) {
public static boolean removeFromBlocklist(Long nodeID) {
if (nodeID == null) {
return true;
}

return BLOCKLIST_BACKENDS.remove(backendID) != null;
return BLOCKLIST_NODES.remove(nodeID) != null;
}

public static void updateBlocklist() {
SystemInfoService clusterInfoService = GlobalStateMgr.getCurrentSystemInfo();

List<Long> removedBackends = new ArrayList<>();
Map<Long, Integer> retryingBackends = new HashMap<>();
List<Long> removedNodes = new ArrayList<>();
Map<Long, Integer> retryingNodes = new HashMap<>();

for (Map.Entry<Long, Integer> entry : BLOCKLIST_BACKENDS.entrySet()) {
Long backendId = entry.getKey();
for (Map.Entry<Long, Integer> entry : BLOCKLIST_NODES.entrySet()) {
Long nodeId = entry.getKey();

// 1. If the backend is null, means that the backend has been removed.
// 1. If the node is null, means that the node has been removed.
// 2. check all the ports of the backend
// 3. retry Config.heartbeat_timeout_second + 1 times
// If both of the above conditions are met, the backend is removed from the blocklist
Backend backend = clusterInfoService.getBackend(backendId);
if (backend == null) {
removedBackends.add(backendId);
LOG.warn("remove backendID {} from blacklist", backendId);
} else if (clusterInfoService.checkBackendAvailable(backendId)) {
String host = backend.getHost();
// If both of the above conditions are met, the node is removed from the blocklist
ComputeNode node = clusterInfoService.getBackendOrComputeNode(nodeId);
if (node == null) {
removedNodes.add(nodeId);
LOG.warn("remove nodeID {} from blacklist", nodeId);
} else if (clusterInfoService.checkNodeAvailable(node)) {
String host = node.getHost();
List<Integer> ports = new ArrayList<Integer>();
Collections.addAll(ports, backend.getBePort(), backend.getBrpcPort(), backend.getHttpPort());
Collections.addAll(ports, node.getBePort(), node.getBrpcPort(), node.getHttpPort());
if (NetUtils.checkAccessibleForAllPorts(host, ports)) {
removedBackends.add(backendId);
LOG.warn("remove backendID {} from blacklist", backendId);
removedNodes.add(nodeId);
LOG.warn("remove nodeID {} from blacklist", nodeId);
}
} else {
Integer retryTimes = entry.getValue();
retryTimes = retryTimes - 1;
if (retryTimes <= 0) {
removedBackends.add(backendId);
LOG.warn("remove backendID {} from blacklist", backendId);
removedNodes.add(nodeId);
LOG.warn("remove nodeID {} from blacklist", nodeId);
} else {
retryingBackends.put(backendId, retryTimes);
retryingNodes.put(nodeId, retryTimes);
}
}
}

// remove backends.
for (Long backendId : removedBackends) {
BLOCKLIST_BACKENDS.remove(backendId);
for (Long backendId : removedNodes) {
BLOCKLIST_NODES.remove(backendId);
}

// update the retry times.
for (Map.Entry<Long, Integer> entry : retryingBackends.entrySet()) {
BLOCKLIST_BACKENDS.computeIfPresent(entry.getKey(), (k, v) -> entry.getValue());
for (Map.Entry<Long, Integer> entry : retryingNodes.entrySet()) {
BLOCKLIST_NODES.computeIfPresent(entry.getKey(), (k, v) -> entry.getValue());
}
}

Expand Down
22 changes: 11 additions & 11 deletions fe/fe-core/src/test/java/com/starrocks/qe/SimpleSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public void testUpdateBlacklist(@Mocked GlobalStateMgr globalStateMgr,
times = 2;

// backend 10001 will be removed
systemInfoService.getBackend(10001L);
systemInfoService.getBackendOrComputeNode(10001L);
result = null;
times = 1;

Expand All @@ -330,11 +330,11 @@ public void testUpdateBlacklist(@Mocked GlobalStateMgr globalStateMgr,
backend1.setHost("host10002");
backend1.setBrpcPort(10002);
backend1.setHttpPort(10012);
systemInfoService.getBackend(10002L);
systemInfoService.getBackendOrComputeNode(10002L);
result = backend1;
times = 1;

systemInfoService.checkBackendAvailable(10002L);
systemInfoService.checkNodeAvailable(backend1);
result = true;
times = 1;

Expand All @@ -343,16 +343,16 @@ public void testUpdateBlacklist(@Mocked GlobalStateMgr globalStateMgr,
times = 1;

// backend 10003, which is not available, will not be removed
Backend backend2 = new Backend();
backend2.setAlive(false);
backend2.setHost("host10003");
backend2.setBrpcPort(10003);
backend2.setHttpPort(10013);
systemInfoService.getBackend(10003L);
result = backend2;
ComputeNode computeNode1 = new ComputeNode();
computeNode1.setAlive(false);
computeNode1.setHost("host10003");
computeNode1.setBrpcPort(10003);
computeNode1.setHttpPort(10013);
systemInfoService.getBackendOrComputeNode(10003L);
result = computeNode1;
times = 2;

systemInfoService.checkBackendAvailable(10003L);
systemInfoService.checkNodeAvailable(computeNode1);
result = false;
times = 2;
}
Expand Down

0 comments on commit c5270ae

Please sign in to comment.