Skip to content

Commit

Permalink
Revert "Rename BackendCoreStat Class (StarRocks#20449)" (StarRocks#20821
Browse files Browse the repository at this point in the history
)

This reverts commit 15cc7fa.
  • Loading branch information
imay authored Mar 31, 2023
1 parent 29171b8 commit 5f2d7d4
Show file tree
Hide file tree
Showing 16 changed files with 63 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
import com.starrocks.common.util.TimeUtils;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.system.DataNode;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.SystemInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -187,7 +187,7 @@ public static List<List<String>> getClusterBackendInfos() {
backendInfo.add(String.format("%.2f", dataUsed) + " %");

// Num CPU cores
backendInfo.add(DataNodeCoreStat.getCoresOfBe(backendId));
backendInfo.add(BackendCoreStat.getCoresOfBe(backendId));

backendInfo.add(backend.getNumRunningQueries());
double memUsedPct = backend.getMemUsedPct();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import com.starrocks.common.util.ListComparator;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.SystemInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -118,7 +118,7 @@ public static List<List<String>> getClusterComputeNodesInfos() {
computeNodeInfo.add(computeNode.getHeartbeatErrMsg());
computeNodeInfo.add(computeNode.getVersion());

computeNodeInfo.add(DataNodeCoreStat.getCoresOfBe(computeNodeId));
computeNodeInfo.add(BackendCoreStat.getCoresOfBe(computeNodeId));

computeNodeInfo.add(computeNode.getNumRunningQueries());
double memUsedPct = computeNode.getMemUsedPct();
Expand Down
10 changes: 5 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.starrocks.common.util.CompressionUtils;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.qe.VariableMgr.VarAttr;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.thrift.TCompressionType;
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TQueryOptions;
Expand Down Expand Up @@ -1219,9 +1219,9 @@ public int getDegreeOfParallelism() {
return pipelineDop;
}
if (maxPipelineDop <= 0) {
return DataNodeCoreStat.getDefaultDOP();
return BackendCoreStat.getDefaultDOP();
}
return Math.min(maxPipelineDop, DataNodeCoreStat.getDefaultDOP());
return Math.min(maxPipelineDop, BackendCoreStat.getDefaultDOP());
} else {
return parallelExecInstanceNum;
}
Expand All @@ -1233,9 +1233,9 @@ public int getSinkDegreeOfParallelism() {
return pipelineDop;
}
if (maxPipelineDop <= 0) {
return DataNodeCoreStat.getSinkDefaultDOP();
return BackendCoreStat.getSinkDefaultDOP();
}
return Math.min(maxPipelineDop, DataNodeCoreStat.getSinkDefaultDOP());
return Math.min(maxPipelineDop, BackendCoreStat.getSinkDefaultDOP());
} else {
return parallelExecInstanceNum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.starrocks.catalog.ResourceGroup;
import com.starrocks.catalog.ResourceGroupClassifier;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.thrift.TWorkGroupType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.net.util.SubnetUtils;
Expand Down Expand Up @@ -140,7 +140,7 @@ public static void analyzeProperties(ResourceGroup resourceGroup, Map<String, St
String value = e.getValue();
if (key.equalsIgnoreCase(ResourceGroup.CPU_CORE_LIMIT)) {
int cpuCoreLimit = Integer.parseInt(value);
int avgCoreNum = DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe();
int avgCoreNum = BackendCoreStat.getAvgNumOfHardwareCoresOfBe();
if (cpuCoreLimit <= 0 || cpuCoreLimit > avgCoreNum) {
throw new SemanticException(String.format("cpu_core_limit should range from 1 to %d", avgCoreNum));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.optimizer.statistics.ColumnStatistic;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.BackendCoreStat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -134,9 +134,9 @@ public JsonElement serialize(QueryDumpInfo dumpInfo, Type type, JsonSerializatio
// backend core stat
JsonObject backendCoreStat = new JsonObject();
backendCoreStat.addProperty("numOfHardwareCoresPerBe",
GsonUtils.GSON.toJson(DataNodeCoreStat.getNumOfHardwareCoresPerBe()));
GsonUtils.GSON.toJson(BackendCoreStat.getNumOfHardwareCoresPerBe()));
backendCoreStat.addProperty("cachedAvgNumOfHardwareCores",
DataNodeCoreStat.getCachedAvgNumOfHardwareCores());
BackendCoreStat.getCachedAvgNumOfHardwareCores());
dumpJson.add("be_core_stat", backendCoreStat);
// exception
JsonArray exceptions = new JsonArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DataNodeCoreStat {
public class BackendCoreStat {
private static int DEFAULT_CORES_OF_BE = 1;

private static ConcurrentHashMap<Long, Integer> numOfHardwareCoresPerBe = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -83,7 +83,7 @@ public static int getAvgNumOfHardwareCoresOfBe() {
}

public static int getDefaultDOP() {
int avgNumOfCores = DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe();
int avgNumOfCores = BackendCoreStat.getAvgNumOfHardwareCoresOfBe();
return Math.max(1, avgNumOfCores / 2);
}

Expand All @@ -92,7 +92,7 @@ public static int getSinkDefaultDOP() {
// so we can't let query engine use up resources
// At the same time, the improvement of performance and concurrency is not linear
// but the memory usage increases linearly, so we control the slope of concurrent growth
int avgCoreNum = DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe();
int avgCoreNum = BackendCoreStat.getAvgNumOfHardwareCoresOfBe();
if (avgCoreNum <= 24) {
return Math.max(1, avgCoreNum / 3);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay)
if (this.cpuCores != hbResponse.getCpuCores()) {
isChanged = true;
this.cpuCores = hbResponse.getCpuCores();
DataNodeCoreStat.setNumOfHardwareCoresOfBe(hbResponse.getBeId(), hbResponse.getCpuCores());
BackendCoreStat.setNumOfHardwareCoresOfBe(hbResponse.getBeId(), hbResponse.getCpuCores());
}

heartbeatErrMsg = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ protected void runAfterCatalogReady() {
// we also add a 'mocked' master Frontend heartbeat response to synchronize master info to other Frontends.
Map<Long, Integer> backendId2cpuCores = Maps.newHashMap();
idToBackendRef.values().forEach(
backend -> backendId2cpuCores.put(backend.getId(), DataNodeCoreStat.getCoresOfBe(backend.getId())));
backend -> backendId2cpuCores.put(backend.getId(), BackendCoreStat.getCoresOfBe(backend.getId())));
hbPackage.addHbResponse(new FrontendHbResponse(masterFeNodeName, Config.query_port, Config.rpc_port,
GlobalStateMgr.getCurrentState().getMaxJournalId(),
System.currentTimeMillis(), GlobalStateMgr.getCurrentState().getFeStartTime(),
Expand All @@ -210,7 +210,7 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
DataNode be = nodeMgr.getBackend(backendId);
if (be != null && be.getCpuCores() != cpuCores) {
be.setCpuCores(cpuCores);
DataNodeCoreStat.setNumOfHardwareCoresOfBe(backendId, cpuCores);
BackendCoreStat.setNumOfHardwareCoresOfBe(backendId, cpuCores);
}
});

Expand Down Expand Up @@ -315,7 +315,7 @@ public HeartbeatResponse call() {
// Update number of hardare of cores of corresponding backend.
int cpuCores = tBackendInfo.isSetNum_hardware_cores() ? tBackendInfo.getNum_hardware_cores() : 0;
if (tBackendInfo.isSetNum_hardware_cores()) {
DataNodeCoreStat.setNumOfHardwareCoresOfBe(computeNodeId, cpuCores);
BackendCoreStat.setNumOfHardwareCoresOfBe(computeNodeId, cpuCores);
}

// backend.updateOnce(bePort, httpPort, beRpcPort, brpcPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import com.starrocks.sql.ast.CreateViewStmt;
import com.starrocks.sql.ast.DescribeStmt;
import com.starrocks.sql.ast.DropTableStmt;
import com.starrocks.sql.ast.ShowResourceGroupStmt;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.AfterClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@
import com.starrocks.sql.ast.ShowUserStmt;
import com.starrocks.sql.ast.ShowVariablesStmt;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.DataNode;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TStorageType;
import mockit.Expectations;
Expand Down Expand Up @@ -828,7 +828,7 @@ ComputeNode getComputeNode(long computeNodeId) {
}
};

new MockUp<DataNodeCoreStat>() {
new MockUp<BackendCoreStat>() {
@Mock
int getCoresOfBe(long beId) {
return 16;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.starrocks.common.Pair;
import com.starrocks.planner.PlanFragment;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.utframe.UtFrameUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -202,7 +202,7 @@ public void testGroupBy2() throws Exception {
int cpuCores = 8;
int expectedTotalDop = cpuCores / 2;
{
DataNodeCoreStat.setDefaultCoresOfBe(cpuCores);
BackendCoreStat.setDefaultCoresOfBe(cpuCores);
Pair<String, ExecPlan> plan = UtFrameUtils.getPlanAndFragment(connectContext, queryStr);
String explainString = plan.second.getExplainString(TExplainLevel.NORMAL);
assertContains(explainString, "2:Project\n" +
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testGroupBy2() throws Exception {
} finally {
connectContext.getSessionVariable().setPipelineDop(originPipelineDop);
connectContext.getSessionVariable().setPipelineDop(originInstanceNum);
DataNodeCoreStat.setDefaultCoresOfBe(1);
BackendCoreStat.setDefaultCoresOfBe(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.starrocks.common.FeConstants;
import com.starrocks.planner.PlanFragment;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.thrift.TExplainLevel;
import mockit.Mock;
import mockit.MockUp;
Expand All @@ -29,7 +29,7 @@
import org.junit.Test;

public class PipelineParallelismTest extends PlanTestBase {
private MockUp<DataNodeCoreStat> mockedBackendCoreStat = null;
private MockUp<BackendCoreStat> mockedBackendCoreStat = null;
private final int parallelExecInstanceNum = 16;
private final int numHardwareCores = 8;
private int prevParallelExecInstanceNum = 0;
Expand All @@ -38,7 +38,7 @@ public class PipelineParallelismTest extends PlanTestBase {

@Before
public void setUp() {
mockedBackendCoreStat = new MockUp<DataNodeCoreStat>() {
mockedBackendCoreStat = new MockUp<BackendCoreStat>() {
@Mock
public int getAvgNumOfHardwareCoresOfBe() {
return numHardwareCores;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.sql.optimizer.rule.RuleSet;
import com.starrocks.sql.optimizer.rule.transformation.JoinAssociativityRule;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
Expand Down Expand Up @@ -71,7 +71,7 @@ public static void beforeClass() throws Exception {

@Before
public void before() {
DataNodeCoreStat.reset();
BackendCoreStat.reset();
connectContext.getSessionVariable().setCboPushDownAggregateMode(-1);
}

Expand Down
48 changes: 24 additions & 24 deletions fe/fe-core/src/test/java/com/starrocks/system/DataNodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,39 @@ public void testSetHeartbeatPort() {

@Test
public void cpuCoreUpdate() {
DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 8);
Assert.assertEquals(8, DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(4, DataNodeCoreStat.getDefaultDOP());
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 8);
Assert.assertEquals(8, BackendCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(4, BackendCoreStat.getDefaultDOP());

DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 16);
Assert.assertEquals(16, DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(8, DataNodeCoreStat.getDefaultDOP());
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 16);
Assert.assertEquals(16, BackendCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(8, BackendCoreStat.getDefaultDOP());
}

@Test
public void defaultSinkDopTest() {
DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 8);
Assert.assertEquals(8, DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(2, DataNodeCoreStat.getSinkDefaultDOP());
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 8);
Assert.assertEquals(8, BackendCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(2, BackendCoreStat.getSinkDefaultDOP());

DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 16);
Assert.assertEquals(16, DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(5, DataNodeCoreStat.getSinkDefaultDOP());
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 16);
Assert.assertEquals(16, BackendCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(5, BackendCoreStat.getSinkDefaultDOP());

DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 24);
Assert.assertEquals(24, DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(8, DataNodeCoreStat.getSinkDefaultDOP());
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 24);
Assert.assertEquals(24, BackendCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(8, BackendCoreStat.getSinkDefaultDOP());

DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 32);
Assert.assertEquals(32, DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(8, DataNodeCoreStat.getSinkDefaultDOP());
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 32);
Assert.assertEquals(32, BackendCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(8, BackendCoreStat.getSinkDefaultDOP());

DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 48);
Assert.assertEquals(48, DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(12, DataNodeCoreStat.getSinkDefaultDOP());
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 48);
Assert.assertEquals(48, BackendCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(12, BackendCoreStat.getSinkDefaultDOP());

DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 64);
Assert.assertEquals(64, DataNodeCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(16, DataNodeCoreStat.getSinkDefaultDOP());
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 64);
Assert.assertEquals(64, BackendCoreStat.getAvgNumOfHardwareCoresOfBe());
Assert.assertEquals(16, BackendCoreStat.getSinkDefaultDOP());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.common.StarRocksPlannerException;
import com.starrocks.sql.parser.NodePosition;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.system.BackendCoreStat;
import org.apache.commons.lang.StringUtils;
import org.junit.Assert;

Expand Down Expand Up @@ -302,7 +302,7 @@ public StarRocksAssert withWarehouse(String sql) throws Exception {

public void executeResourceGroupDdlSql(String sql) throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 32);
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 32);
StatementBase statement = com.starrocks.sql.parser.SqlParser.parse(sql, ctx.getSessionVariable()).get(0);
Analyzer.analyze(statement, ctx);

Expand All @@ -314,7 +314,7 @@ public void executeResourceGroupDdlSql(String sql) throws Exception {

public List<List<String>> executeResourceGroupShowSql(String sql) throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
DataNodeCoreStat.setNumOfHardwareCoresOfBe(1, 32);
BackendCoreStat.setNumOfHardwareCoresOfBe(1, 32);

StatementBase statement = com.starrocks.sql.parser.SqlParser.parse(sql, ctx.getSessionVariable().getSqlMode()).get(0);
Analyzer.analyze(statement, ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanFragmentBuilder;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.system.BackendCoreStat;
import com.starrocks.system.DataNode;
import com.starrocks.system.DataNodeCoreStat;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.thrift.TResultSinkType;
import org.apache.commons.codec.binary.Hex;
Expand Down Expand Up @@ -558,9 +558,9 @@ private static String initMockEnv(ConnectContext connectContext, QueryDumpInfo r
}
// mock be core stat
for (Map.Entry<Long, Integer> entry : replayDumpInfo.getNumOfHardwareCoresPerBe().entrySet()) {
DataNodeCoreStat.setNumOfHardwareCoresOfBe(entry.getKey(), entry.getValue());
BackendCoreStat.setNumOfHardwareCoresOfBe(entry.getKey(), entry.getValue());
}
DataNodeCoreStat.setCachedAvgNumOfHardwareCores(replayDumpInfo.getCachedAvgNumOfHardwareCores());
BackendCoreStat.setCachedAvgNumOfHardwareCores(replayDumpInfo.getCachedAvgNumOfHardwareCores());

// mock table row count
for (Map.Entry<String, Map<String, Long>> entry : replayDumpInfo.getPartitionRowCountMap().entrySet()) {
Expand Down

0 comments on commit 5f2d7d4

Please sign in to comment.