Skip to content

Commit

Permalink
[BugFix] fix shared_data mode query backend (StarRocks#33368) (StarRo…
Browse files Browse the repository at this point in the history
…cks#33371)

Signed-off-by: redscarf <hongweijin1993@gmail.com>
(cherry picked from commit ba3575b)
  • Loading branch information
Jin-H authored and kevincai committed Nov 13, 2023
1 parent c71cd1b commit 1cec56d
Show file tree
Hide file tree
Showing 5 changed files with 488 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
import com.starrocks.http.BaseResponse;
import com.starrocks.http.IllegalArgException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
import com.starrocks.server.RunMode;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.transaction.TransactionStatus;
import io.netty.handler.codec.http.HttpMethod;
Expand All @@ -70,19 +71,20 @@ public class TransactionLoadAction extends RestBaseAction {
private static final String CHANNEL_ID_STR = "channel_id";
private static TransactionLoadAction ac;

private Map<String, Long> txnBackendMap = new LinkedHashMap<String, Long>(512, 0.75f, true) {
private Map<String, Long> txnNodeMap = new LinkedHashMap<String, Long>(512, 0.75f, true) {
protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
return size() > GlobalStateMgr.getCurrentSystemInfo().getTotalBackendNumber() * 512;
return size() > (GlobalStateMgr.getCurrentSystemInfo().getTotalBackendNumber() +
GlobalStateMgr.getCurrentSystemInfo().getTotalComputeNodeNumber()) * 512;
}
};

public TransactionLoadAction(ActionController controller) {
super(controller);
}

public int txnBackendMapSize() {
public int txnNodeMapSize() {
synchronized (this) {
return txnBackendMap.size();
return txnNodeMap.size();
}
}

Expand Down Expand Up @@ -141,7 +143,7 @@ public void executeTransaction(BaseRequest request, BaseResponse response) throw
throw new DdlException("Must provide channel_num when stream load begin.");
}

Long backendID = null;
Long nodeID = null;

if (Strings.isNullOrEmpty(dbName)) {
throw new UserException("No database selected.");
Expand Down Expand Up @@ -206,15 +208,11 @@ public void executeTransaction(BaseRequest request, BaseResponse response) throw
synchronized (this) {
// 2.1 save label->be map when begin transaction, so that subsequent operator can send to same BE
if (op.equalsIgnoreCase(TXN_BEGIN)) {
List<Long> backendIds = GlobalStateMgr.getCurrentSystemInfo().seqChooseBackendIds(1, true, false);
if (CollectionUtils.isEmpty(backendIds)) {
throw new UserException("No backend alive.");
}
backendID = backendIds.get(0);
// txnBackendMap is LRU cache, it automic remove unused entry
txnBackendMap.put(label, backendID);
nodeID = getBackendOrComputeId();
// txnNodeMap is LRU cache, it atomic remove unused entry
txnNodeMap.put(label, nodeID);
} else if (channelIdStr == null) {
backendID = txnBackendMap.get(label);
nodeID = txnNodeMap.get(label);
}
}
}
Expand Down Expand Up @@ -268,36 +266,53 @@ public void executeTransaction(BaseRequest request, BaseResponse response) throw
}

if (op.equalsIgnoreCase(TXN_COMMIT) && channelIdStr != null) {
int channelId = Integer.parseInt(channelIdStr);
TransactionResult resp = new TransactionResult();
GlobalStateMgr.getCurrentState().getStreamLoadMgr().commitLoadTask(label, resp);
sendResult(request, response, resp);
return;
}

if (op.equalsIgnoreCase(TXN_ROLLBACK) && channelIdStr != null) {
int channelId = Integer.parseInt(channelIdStr);
TransactionResult resp = new TransactionResult();
GlobalStateMgr.getCurrentState().getStreamLoadMgr().rollbackLoadTask(label, resp);
sendResult(request, response, resp);
return;
}


if (backendID == null) {
throw new UserException("transaction with op " + op + " label " + label + " has no backend");
if (nodeID == null) {
throw new UserException("transaction with op " + op + " label " + label + " has no node");
}

Backend backend = GlobalStateMgr.getCurrentSystemInfo().getBackend(backendID);
if (backend == null) {
throw new UserException("Backend " + backendID + " is not alive");
ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackend(nodeID);
if (node == null) {
node = GlobalStateMgr.getCurrentSystemInfo().getComputeNode(nodeID);
if (node == null) {
throw new UserException("Node " + nodeID + " is not alive");
}
}

TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
TNetworkAddress redirectAddr = new TNetworkAddress(node.getHost(), node.getHttpPort());

LOG.info("redirect transaction action to destination={}, db: {}, table: {}, op: {}, label: {}",
redirectAddr, dbName, tableName, op, label);
redirectTo(request, response, redirectAddr);
}

private static Long getBackendOrComputeId() throws UserException {
List<Long> backendIds = GlobalStateMgr.getCurrentSystemInfo().seqChooseBackendIds(1, true, false);
if (CollectionUtils.isNotEmpty(backendIds)) {
return backendIds.get(0);
}
if (RunMode.getCurrentRunMode() == RunMode.SHARED_NOTHING) {
throw new UserException("No backend alive.");
}
List<Long> computeNodes = GlobalStateMgr.getCurrentSystemInfo().seqChooseComputeNodes(1, true,
false);
if (CollectionUtils.isNotEmpty(computeNodes)) {
return computeNodes.get(0);
}
throw new UserException("No backend or compute node alive.");
}
}

112 changes: 67 additions & 45 deletions fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public class SystemInfoService implements GsonPostProcessable {
@SerializedName(value = "ce")
private volatile ConcurrentHashMap<Long, ComputeNode> idToComputeNodeRef;

private long lastBackendIdForCreation = -1;
private long lastBackendIdForOther = -1;
private long lastNodeIdForCreation = -1;
private long lastNodeIdForOther = -1;

private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef;
private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef;
Expand Down Expand Up @@ -271,7 +271,7 @@ public ShowResultSet modifyBackendHost(ModifyBackendAddressClause modifyBackendA
String opMessage;
formatSb.append("%s:%d's host has been modified to %s");
if (candidateBackends.size() >= 2) {
formatSb.append("\nplease exectue %d times, to modify the remaining backends\n");
formatSb.append("\nplease execute %d times, to modify the remaining backends\n");
for (int i = 1; i < candidateBackends.size(); i++) {
Backend be = candidateBackends.get(i);
formatSb.append(be.getHost() + ":" + be.getHeartbeatPort() + "\n");
Expand Down Expand Up @@ -655,6 +655,12 @@ public int getTotalBackendNumber() {
return idToBackendRef.size();
}

public int getTotalComputeNodeNumber() {
return idToComputeNodeRef.size();
}



public int getAliveComputeNodeNumber() {
return getComputeNodeIds(true).size();
}
Expand Down Expand Up @@ -745,14 +751,20 @@ public List<Backend> getBackends() {

public List<Backend> getAvailableBackends() {
return getBackends().stream()
.filter(v -> v.isAvailable())
.filter(ComputeNode::isAvailable)
.collect(Collectors.toList());
}

public List<ComputeNode> getComputeNodes() {
return Lists.newArrayList(idToComputeNodeRef.values());
}

public List<ComputeNode> getAvailableComputeNodes() {
return getComputeNodes().stream()
.filter(ComputeNode::isAvailable)
.collect(Collectors.toList());
}

public Stream<ComputeNode> backendAndComputeNodeStream() {
return Stream.concat(idToBackendRef.values().stream(), idToComputeNodeRef.values().stream());
}
Expand All @@ -769,16 +781,26 @@ public List<Long> seqChooseBackendIds(int backendNum, boolean needAvailable, boo
return seqChooseBackendIds(backendNum, needAvailable, isCreate, v -> !v.diskExceedLimit());
}

public List<Long> seqChooseComputeNodes(int computeNodeNum, boolean needAvailable, boolean isCreate) {

final List<ComputeNode> candidateComputeNodes = needAvailable ? getAvailableComputeNodes() : getComputeNodes();
if (CollectionUtils.isEmpty(candidateComputeNodes)) {
LOG.warn("failed to find any compute nodes, needAvailable={}", needAvailable);
return Collections.emptyList();
}

return seqChooseNodeIds(computeNodeNum, isCreate, candidateComputeNodes);
}

private List<Long> seqChooseBackendIds(int backendNum, boolean needAvailable, boolean isCreate,
Predicate<? super Backend> predicate) {

final List<Backend> candidateBackends = needAvailable ? getAvailableBackends() : getBackends();
if (CollectionUtils.isEmpty(candidateBackends)) {
LOG.warn("failed to find any backend, needAvailable={}", needAvailable);
return Collections.emptyList();
}

final List<Backend> filteredBackends = candidateBackends.stream()
final List<ComputeNode> filteredBackends = candidateBackends.stream()
.filter(predicate)
.collect(Collectors.toList());

Expand All @@ -792,66 +814,66 @@ private List<Long> seqChooseBackendIds(int backendNum, boolean needAvailable, bo
candidateBackends.size(), needAvailable, backendInfo);
return Collections.emptyList();
}
return seqChooseBackendIds(backendNum, isCreate, filteredBackends);
return seqChooseNodeIds(backendNum, isCreate, filteredBackends);
}

/**
* choose backends by round-robin
* choose nodes by round-robin
*
* @param backendNum number of backend wanted
* @param isCreate last backend id for creation
* @param srcBackends list of the candidate backends
* @return empty list if not enough backend, otherwise return a list of backend's id
* @param nodeNum number of node wanted
* @param isCreate last node id for creation
* @param srcNodes list of the candidate nodes
* @return empty list if not enough node, otherwise return a list of node's id
*/
private synchronized List<Long> seqChooseBackendIds(int backendNum, boolean isCreate, final List<Backend> srcBackends) {

long lastBackendId;
long lastNodeId;

if (isCreate) {
lastBackendId = lastBackendIdForCreation;
lastNodeId = lastNodeIdForCreation;
} else {
lastBackendId = lastBackendIdForOther;
lastNodeId = lastNodeIdForOther;
}

// host -> BE list
Map<String, List<Backend>> backendMaps = Maps.newHashMap();
for (Backend backend : srcBackends) {
String host = backend.getHost();
Map<String, List<ComputeNode>> nodeMaps = Maps.newHashMap();
for (ComputeNode node : srcNodes) {
String host = node.getHost();

if (!backendMaps.containsKey(host)) {
backendMaps.put(host, Lists.newArrayList());
if (!nodeMaps.containsKey(host)) {
nodeMaps.put(host, Lists.newArrayList());
}

backendMaps.get(host).add(backend);
nodeMaps.get(host).add(node);
}

// if more than one backend exists in same host, select a backend at random
List<Backend> backends = Lists.newArrayList();
for (List<Backend> list : backendMaps.values()) {
List<ComputeNode> nodes = Lists.newArrayList();
for (List<ComputeNode> list : nodeMaps.values()) {
Collections.shuffle(list);
backends.add(list.get(0));
nodes.add(list.get(0));
}

List<Long> backendIds = Lists.newArrayList();
// get last backend index
int lastBackendIndex = -1;
List<Long> nodeIds = Lists.newArrayList();
// get last node index
int lastNodeIndex = -1;
int index = -1;
for (Backend backend : backends) {
for (ComputeNode node : nodes) {
index++;
if (backend.getId() == lastBackendId) {
lastBackendIndex = index;
if (node.getId() == lastNodeId) {
lastNodeIndex = index;
break;
}
}
Iterator<Backend> iterator = Iterators.cycle(backends);
Iterator<ComputeNode> iterator = Iterators.cycle(nodes);
index = -1;
boolean failed = false;
// 2 cycle at most
int maxIndex = 2 * backends.size();
while (iterator.hasNext() && backendIds.size() < backendNum) {
Backend backend = iterator.next();
int maxIndex = 2 * nodes.size();
while (iterator.hasNext() && nodeIds.size() < nodeNum) {
ComputeNode node = iterator.next();
index++;
if (index <= lastBackendIndex) {
if (index <= lastNodeIndex) {
continue;
}

Expand All @@ -860,34 +882,34 @@ private synchronized List<Long> seqChooseBackendIds(int backendNum, boolean isCr
break;
}

long backendId = backend.getId();
if (!backendIds.contains(backendId)) {
backendIds.add(backendId);
lastBackendId = backendId;
long nodeId = node.getId();
if (!nodeIds.contains(nodeId)) {
nodeIds.add(nodeId);
lastNodeId = nodeId;
} else {
failed = true;
break;
}
}

if (backendIds.size() != backendNum) {
if (nodeIds.size() != nodeNum) {
failed = true;
}

if (failed) {
// debug: print backend info when the selection failed
for (Backend backend : backends) {
LOG.debug("random select: {}", backend);
for (ComputeNode node : nodes) {
LOG.debug("random select: {}", node);
}
return Collections.emptyList();
}

if (isCreate) {
lastBackendIdForCreation = lastBackendId;
lastNodeIdForCreation = lastNodeId;
} else {
lastBackendIdForOther = lastBackendId;
lastNodeIdForOther = lastNodeId;
}
return backendIds;
return nodeIds;
}

public ImmutableMap<Long, Backend> getIdToBackend() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.starrocks.sql.ast.AlterSystemStmt;
import com.starrocks.sql.ast.DropBackendClause;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import mockit.Expectations;
import mockit.Mock;
Expand All @@ -65,6 +66,7 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

public class SystemInfoServiceTest {

Expand Down Expand Up @@ -350,4 +352,30 @@ public void testSaveLoadBackend() throws Exception {
deleteDir(dir);
}

@Test
public void testSeqChooseComputeNodes() {
clearAllBackend();
AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234"));
com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(stmt), new ConnectContext(null));

try {
GlobalStateMgr.getCurrentSystemInfo().addComputeNodes(stmt.getHostPortPairs());
} catch (DdlException e) {
Assert.fail();
}

Assert.assertNotNull(GlobalStateMgr.getCurrentSystemInfo().
getComputeNodeWithHeartbeatPort("192.168.0.1", 1234));

List<Long> longList = GlobalStateMgr.getCurrentSystemInfo().seqChooseComputeNodes(1, false, false);
Assert.assertEquals(1, longList.size());
ComputeNode computeNode = new ComputeNode();
computeNode.setHost("192.168.0.1");
computeNode.setHttpPort(9030);
computeNode.setAlive(true);
GlobalStateMgr.getCurrentSystemInfo().addComputeNode(computeNode);
List<Long> computeNods = GlobalStateMgr.getCurrentSystemInfo().seqChooseComputeNodes(1, true, false);
Assert.assertEquals(1, computeNods.size());
}

}
Loading

0 comments on commit 1cec56d

Please sign in to comment.