Skip to content

Commit

Permalink
[Feature] register service to starMgr when FE start (StarRocks#6169)
Browse files Browse the repository at this point in the history
  • Loading branch information
abc982627271 authored May 20, 2022
1 parent bea2b3c commit 69780d5
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 2 deletions.
50 changes: 49 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/catalog/StarOSAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.common.Config;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
Expand All @@ -20,11 +23,22 @@
* 2. Maintenance of StarOS worker to StarRocks backend map.
*/
public class StarOSAgent {
private static final Logger LOG = LogManager.getLogger(StarOSAgent.class);

private StarClient client;
// private Map<Long, Long> workerIdToBeId;
private long serviceId;

public StarOSAgent() {
client = new StarClient();
serviceId = -1;
// check if Config.starmanager_address == FE address
if (Config.integrate_staros) {
String[] starMgrAddr = Config.starmgr_address.split(":");
if (!starMgrAddr[0].equals("127.0.0.1")) {
LOG.warn("Config.starmgr_address not equal 127.0.0.1, it is {}", starMgrAddr[0]);
System.exit(-1);
}
}
}

public List<Long> createShards(int numShards) {
Expand All @@ -48,6 +62,22 @@ public Set<Long> getBackendIdsByShard(long shardId) {
return backendIds;
}

public void registerAndBootstrapService(String serviceName) {
if (serviceId == -1) {
client.registerService("starrocks");
serviceId = client.bootstrapService("starrocks", serviceName);
}
LOG.info("serviceId from starClient is {} ", serviceId);
}

public void getServiceId(String serviceName) {
if (serviceId != -1) {
return;
}
serviceId = client.getServiceInfo(serviceName);
}


// Mock StarClient
private class StarClient {
// private Map<Long, List<Replica>> shardIdToWorkerIds;
Expand Down Expand Up @@ -87,6 +117,24 @@ public synchronized List<Long> getWorkerIdsByShard(long shardId) {
public synchronized Worker getWorker(long id) {
return idToWorker.get(id);
}

// register service
public synchronized void registerService(String serviceTemplateName) {
LOG.info("service {} registered.", serviceTemplateName);
}

// bootstrap service
public synchronized long bootstrapService(String serviceTemplateName, String serviceName) {
long serviceId = 1;
LOG.info("service {} bootstrapped.", serviceName);
return serviceId;
}

public long getServiceInfo(String serviceName) {
long serviceId = 1;
// get serviceId by client
return serviceId;
}
}

// Mock StarOS Worker
Expand Down
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,11 @@ public class Config extends ConfigBase {
*/
@ConfField
public static boolean use_staros = false;
@ConfField
public static String starmgr_address = "127.0.0.1:6090";
@ConfField
public static boolean integrate_staros = false;

/**
* default bucket number when create OLAP table without buckets info
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ private TOlapTableLocationParam createLocation(OlapTable table) throws UserExcep
private TNodesInfo createStarrocksNodesInfo() {
TNodesInfo nodesInfo = new TNodesInfo();
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getOrCreateSystemInfo(clusterId);
;
for (Long id : systemInfoService.getBackendIds(false)) {
Backend backend = systemInfoService.getBackend(id);
nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,12 @@ private void startMasterOnlyDaemonThreads() {
statisticsMetaManager.start();
statisticAutoCollector.start();
taskManager.start();

// register service to starMgr
if (Config.integrate_staros) {
int clusterId = getCurrentState().getClusterId();
getStarOSAgent().registerAndBootstrapService(Integer.toString(clusterId));
}
}

// start threads that should running on all FE
Expand Down Expand Up @@ -1043,6 +1049,11 @@ private void transferToNonMaster(FrontendNodeType newType) {
// not set canRead here, leave canRead as what is was.
// if meta out of date, canRead will be set to false in replayer thread.
metaReplayState.setTransferToUnknown();
// get serviceId from starMgr
if (Config.integrate_staros) {
int clusterId = getCurrentState().getClusterId();
getStarOSAgent().getServiceId(Integer.toString(clusterId));
}
return;
}

Expand Down

0 comments on commit 69780d5

Please sign in to comment.