From 0c0ea45ed1a8f43abe7bdbeececeb57f9cb77273 Mon Sep 17 00:00:00 2001 From: Murphy <96611012+murphyatwork@users.noreply.github.com> Date: Mon, 14 Oct 2024 19:17:32 +0800 Subject: [PATCH] [Enhancement] auto change replication_num of system tables (#51799) Signed-off-by: Murphy --- .../load/loadv2/LoadsHistorySyncer.java | 35 ++----- .../load/pipe/filelist/FileListTableRepo.java | 12 +-- .../load/pipe/filelist/RepoCreator.java | 34 ++----- .../scheduler/history/TableKeeper.java | 39 +++----- .../history/TaskRunHistoryTable.java | 3 +- .../starrocks/statistic/StatisticUtils.java | 28 ++++++ .../statistic/StatisticsMetaManager.java | 99 ++++--------------- .../starrocks/system/SystemInfoService.java | 22 +++++ .../load/pipe/filelist/FileListRepoTest.java | 12 ++- .../scheduler/history/TaskRunHistoryTest.java | 1 - .../statistic/StatisticUtilsTest.java | 80 +++++++++++++++ 11 files changed, 185 insertions(+), 180 deletions(-) create mode 100644 fe/fe-core/src/test/java/com/starrocks/statistic/StatisticUtilsTest.java diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java index 3f7cbad5634911..b91325a37a5195 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java @@ -15,7 +15,6 @@ package com.starrocks.load.loadv2; import com.starrocks.catalog.CatalogUtils; -import com.starrocks.catalog.OlapTable; import com.starrocks.common.Config; import com.starrocks.common.FeConstants; import com.starrocks.common.UserException; @@ -23,6 +22,7 @@ import com.starrocks.common.util.FrontendDaemon; import com.starrocks.load.pipe.filelist.RepoExecutor; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.statistic.StatisticUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -68,7 +68,7 @@ public class 
LoadsHistorySyncer extends FrontendDaemon { "properties('replication_num' = '%d') "; private static final String CORRECT_LOADS_HISTORY_REPLICATION_NUM = - "ALTER TABLE %s SET ('default.replication_num'='3')"; + "ALTER TABLE %s SET ('default.replication_num'='%d')"; private static final String LOADS_HISTORY_SYNC = "INSERT INTO %s " + @@ -78,10 +78,9 @@ public class LoadsHistorySyncer extends FrontendDaemon { "AND load_finish_time > ( " + "SELECT COALESCE(MAX(load_finish_time), '0001-01-01 00:00:00') " + "FROM %s);"; - + private boolean databaseExists = false; private boolean tableExists = false; - private boolean tableCorrected = false; public LoadsHistorySyncer() { super("Load history syncer", Config.loads_history_sync_interval_second * 1000L); @@ -97,23 +96,7 @@ public static void createTable() throws UserException { } public static boolean correctTable() { - int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber(); - int replica = GlobalStateMgr.getCurrentState().getLocalMetastore() - .mayGetTable(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME) - .map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum()) - .orElse((short) 1); - if (numBackends < 3) { - LOG.info("not enough backends in the cluster, expected 3 but got {}", numBackends); - return false; - } - if (replica < 3) { - String sql = SQLBuilder.buildAlterTableSql(); - RepoExecutor.getInstance().executeDDL(sql); - } else { - LOG.info("table {} already has {} replicas, no need to alter replication_num", - LOADS_HISTORY_TABLE_NAME, replica); - } - return true; + return StatisticUtils.alterSystemTableReplicationNumIfNecessary(LOADS_HISTORY_TABLE_NAME); } public void checkMeta() throws UserException { @@ -129,10 +112,8 @@ public void checkMeta() throws UserException { LOG.info("table created: " + LOADS_HISTORY_TABLE_NAME); tableExists = true; } - if (!tableCorrected && correctTable()) { - LOG.info("table corrected: " + LOADS_HISTORY_TABLE_NAME); - 
tableCorrected = true; - } + + correctTable(); if (getInterval() != Config.loads_history_sync_interval_second * 1000L) { setInterval(Config.loads_history_sync_interval_second * 1000L); @@ -171,9 +152,9 @@ public static String buildCreateTableSql() throws UserException { CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica); } - public static String buildAlterTableSql() { + public static String buildAlterTableSql(int replica) { return String.format(CORRECT_LOADS_HISTORY_REPLICATION_NUM, - CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME)); + CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica); } public static String buildSyncSql() { diff --git a/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/FileListTableRepo.java b/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/FileListTableRepo.java index 04c7408d5f47ca..8a12ebb55fe2ff 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/FileListTableRepo.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/FileListTableRepo.java @@ -16,7 +16,6 @@ import com.starrocks.catalog.CatalogUtils; import com.starrocks.common.UserException; -import com.starrocks.common.util.AutoInferUtil; import com.starrocks.load.pipe.PipeFileRecord; import com.starrocks.statistic.StatsConstants; import org.apache.commons.collections.CollectionUtils; @@ -64,7 +63,7 @@ public class FileListTableRepo extends FileListRepo { "properties('replication_num' = '%d') "; protected static final String CORRECT_FILE_LIST_REPLICATION_NUM = - "ALTER TABLE %s SET ('replication_num'='3')"; + "ALTER TABLE %s SET ('replication_num'='%d')"; protected static final String ALL_COLUMNS = "`pipe_id`, `file_name`, `file_version`, `file_size`, `state`, `last_modified`, `staged_time`," + @@ -155,15 +154,14 @@ public void destroy() { */ static class SQLBuilder { - public static String buildCreateTableSql() throws 
UserException { - int replica = AutoInferUtil.calDefaultReplicationNum(); + public static String buildCreateTableSql(int replicationNum) throws UserException { return String.format(FILE_LIST_TABLE_CREATE, - CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replica); + CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum); } - public static String buildAlterTableSql() { + public static String buildAlterTableSql(int replicationNum) { return String.format(CORRECT_FILE_LIST_REPLICATION_NUM, - CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME)); + CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoCreator.java b/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoCreator.java index aea6240a646a2d..578d0169d915f3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoCreator.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoCreator.java @@ -14,9 +14,9 @@ package com.starrocks.load.pipe.filelist; -import com.starrocks.catalog.OlapTable; import com.starrocks.common.UserException; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.statistic.StatisticUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -30,7 +30,6 @@ public class RepoCreator { private static boolean databaseExists = false; private static boolean tableExists = false; - private static boolean tableCorrected = false; public static RepoCreator getInstance() { return INSTANCE; @@ -50,10 +49,7 @@ public void run() { LOG.info("table created: " + FileListTableRepo.FILE_LIST_TABLE_NAME); tableExists = true; } - if (!tableCorrected && correctTable()) { - LOG.info("table corrected: " + FileListTableRepo.FILE_LIST_TABLE_NAME); - tableCorrected = true; - } + correctTable(); } catch (Exception e) { LOG.error("error 
happens in RepoCreator: ", e); } @@ -64,28 +60,14 @@ public boolean checkDatabaseExists() { } public static void createTable() throws UserException { - String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql(); + int expectedReplicationNum = + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum(); + String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql(expectedReplicationNum); RepoExecutor.getInstance().executeDDL(sql); } public static boolean correctTable() { - int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber(); - int replica = GlobalStateMgr.getCurrentState() - .getLocalMetastore().mayGetTable(FileListTableRepo.FILE_LIST_DB_NAME, FileListTableRepo.FILE_LIST_TABLE_NAME) - .map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum()) - .orElse((short) 1); - if (numBackends < 3) { - LOG.info("not enough backends in the cluster, expected 3 but got {}", numBackends); - return false; - } - if (replica < 3) { - String sql = FileListTableRepo.SQLBuilder.buildAlterTableSql(); - RepoExecutor.getInstance().executeDDL(sql); - } else { - LOG.info("table {} already has {} replicas, no need to alter replication_num", - FileListTableRepo.FILE_LIST_FULL_NAME, replica); - } - return true; + return StatisticUtils.alterSystemTableReplicationNumIfNecessary(FileListTableRepo.FILE_LIST_TABLE_NAME); } public boolean isDatabaseExists() { @@ -95,8 +77,4 @@ public boolean isDatabaseExists() { public boolean isTableExists() { return tableExists; } - - public boolean isTableCorrected() { - return tableCorrected; - } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java index 9bd41bf2f0ef6d..57b5e71c425443 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java +++ 
b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java @@ -42,7 +42,6 @@ public class TableKeeper { private final String databaseName; private final String tableName; private final String createTableSql; - private final int tableReplicas; private boolean databaseExisted = false; private boolean tableExisted = false; @@ -52,12 +51,10 @@ public class TableKeeper { public TableKeeper(String database, String table, String createTable, - int expectedReplicas, Supplier ttlSupplier) { this.databaseName = database; this.tableName = table; this.createTableSql = createTable; - this.tableReplicas = expectedReplicas; this.ttlSupplier = ttlSupplier; } @@ -100,22 +97,20 @@ public void createTable() throws UserException { } public void correctTable() { - int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber(); + int expectedReplicationNum = + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum(); int replica = GlobalStateMgr.getCurrentState() .getLocalMetastore().mayGetTable(databaseName, tableName) .map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum()) .orElse((short) 1); - if (numBackends < tableReplicas) { - LOG.info("not enough backends in the cluster, expected {} but got {}", - tableReplicas, numBackends); - return; - } - if (replica < tableReplicas) { - String sql = alterTableReplicas(); + + if (replica != expectedReplicationNum) { + String sql = alterTableReplicas(expectedReplicationNum); if (StringUtils.isNotEmpty(sql)) { RepoExecutor.getInstance().executeDDL(sql); } - LOG.info("changed replication_number of table {} to {}", tableName, replica); + LOG.info("changed replication_number of table {} from {} to {}", + tableName, replica, expectedReplicationNum); } } @@ -146,7 +141,7 @@ private Optional mayGetTable() { .flatMap(x -> Optional.of((OlapTable) x)); } - private String alterTableReplicas() { + private String alterTableReplicas(int 
replicationNum) { Optional table = mayGetTable(); if (table.isEmpty()) { return ""; @@ -154,13 +149,13 @@ private String alterTableReplicas() { PartitionInfo partitionInfo = table.get().getPartitionInfo(); if (partitionInfo.isRangePartition()) { String sql1 = String.format("ALTER TABLE %s.%s MODIFY PARTITION(*) SET ('replication_num'='%d');", - databaseName, tableName, tableReplicas); + databaseName, tableName, replicationNum); String sql2 = String.format("ALTER TABLE %s.%s SET ('default.replication_num'='%d');", - databaseName, tableName, tableReplicas); + databaseName, tableName, replicationNum); return sql1 + sql2; } else { return String.format("ALTER TABLE %s.%s SET ('replication_num'='%d')", - databaseName, tableName, tableReplicas); + databaseName, tableName, replicationNum); } } @@ -180,10 +175,6 @@ public String getCreateTableSql() { return createTableSql; } - public int getTableReplicas() { - return tableReplicas; - } - public boolean isDatabaseExisted() { return databaseExisted; } @@ -200,14 +191,6 @@ public void setDatabaseExisted(boolean databaseExisted) { this.databaseExisted = databaseExisted; } - public void setTableExisted(boolean tableExisted) { - this.tableExisted = tableExisted; - } - - public void setTableCorrected(boolean tableCorrected) { - this.tableCorrected = tableCorrected; - } - public static TableKeeperDaemon startDaemon() { TableKeeperDaemon daemon = TableKeeperDaemon.getInstance(); daemon.start(); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java index e79c68406e358b..11f96e1a886bc3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java @@ -52,7 +52,6 @@ public class TaskRunHistoryTable { public static final String DATABASE_NAME = StatsConstants.STATISTICS_DB_NAME; public static 
final String TABLE_NAME = "task_run_history"; public static final String TABLE_FULL_NAME = DATABASE_NAME + "." + TABLE_NAME; - public static final int TABLE_REPLICAS = 3; public static final String CREATE_TABLE = String.format("CREATE TABLE IF NOT EXISTS %s (" + // identifiers @@ -93,7 +92,7 @@ public class TaskRunHistoryTable { "SELECT history_content_json " + "FROM " + TABLE_FULL_NAME + " WHERE "; private static final TableKeeper KEEPER = - new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE, TABLE_REPLICAS, + new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE, () -> Math.max(1, Config.task_runs_ttl_second / 3600 / 24)); public static TableKeeper createKeeper() { diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java index 412397f6feff42..3bc4f13c62b756 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java @@ -46,6 +46,7 @@ import com.starrocks.connector.ConnectorPartitionTraits; import com.starrocks.load.EtlStatus; import com.starrocks.load.loadv2.LoadJobFinalOperation; +import com.starrocks.load.pipe.filelist.RepoExecutor; import com.starrocks.load.streamload.StreamLoadTxnCommitAttachment; import com.starrocks.privilege.PrivilegeBuiltinConstants; import com.starrocks.qe.ConnectContext; @@ -61,6 +62,7 @@ import com.starrocks.transaction.TransactionState; import com.starrocks.transaction.TxnCommitAttachment; import com.starrocks.warehouse.Warehouse; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -475,6 +477,32 @@ public static void dropStatisticsAfterDropTable(Table table) { GlobalStateMgr.getCurrentState().getStatisticStorage().expireConnectorTableColumnStatistics(table, columns); } + /** + * Change the replication_num of system table according to cluster status + * 1. 
When scaled out to 3 or more nodes, change the replication_num to 3 + * 2. When scaled in to fewer than 3 nodes, change it to retainedBackendNum + */ + public static boolean alterSystemTableReplicationNumIfNecessary(String tableName) { + int expectedReplicationNum = + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum(); + int replica = GlobalStateMgr.getCurrentState() + .getLocalMetastore().mayGetTable(StatsConstants.STATISTICS_DB_NAME, tableName) + .map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum()) + .orElse((short) 1); + + if (replica != expectedReplicationNum) { + String sql = String.format("ALTER TABLE %s.%s SET ('replication_num'='%d')", + StatsConstants.STATISTICS_DB_NAME, tableName, expectedReplicationNum); + if (StringUtils.isNotEmpty(sql)) { + RepoExecutor.getInstance().executeDDL(sql); + } + LOG.info("changed replication_number of table {} from {} to {}", + tableName, replica, expectedReplicationNum); + return true; + } + return false; + } + // only support collect statistics for slotRef and subfield expr public static String getColumnName(Table table, Expr column) { String colName; diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsMetaManager.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsMetaManager.java index a2e57330aaf907..6b92da611705a6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsMetaManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsMetaManager.java @@ -20,11 +20,7 @@ import com.starrocks.analysis.TableName; import com.starrocks.catalog.Database; import com.starrocks.catalog.KeysType; -import com.starrocks.catalog.LocalTablet; -import com.starrocks.catalog.OlapTable; -import com.starrocks.catalog.Partition; import com.starrocks.common.Config; -import com.starrocks.common.DdlException; import com.starrocks.common.Pair; import com.starrocks.common.UserException; import
com.starrocks.common.util.AutoInferUtil; @@ -37,7 +33,6 @@ import com.starrocks.sql.analyzer.Analyzer; import com.starrocks.sql.ast.CreateDbStmt; import com.starrocks.sql.ast.CreateTableStmt; -import com.starrocks.sql.ast.DropTableStmt; import com.starrocks.sql.ast.HashDistributionDesc; import com.starrocks.sql.ast.KeysDesc; import com.starrocks.sql.common.EngineType; @@ -53,9 +48,6 @@ public class StatisticsMetaManager extends FrontendDaemon { private static final Logger LOG = LogManager.getLogger(StatisticsMetaManager.class); - // If all replicas are lost more than 3 times in a row, rebuild the statistics table - private int lossTableCount = 0; - public StatisticsMetaManager() { super("statistics meta manager", 60L * 1000L); } @@ -83,42 +75,6 @@ private boolean checkTableExist(String tableName) { return GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName) != null; } - private boolean checkReplicateNormal(String tableName) { - int aliveSize = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getAliveBackendNumber(); - int total = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber(); - // maybe cluster just shutdown, ignore - if (aliveSize <= total / 2) { - lossTableCount = 0; - return true; - } - - Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(StatsConstants.STATISTICS_DB_NAME); - Preconditions.checkState(db != null); - OlapTable table = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName); - Preconditions.checkState(table != null); - if (table.isCloudNativeTableOrMaterializedView()) { - return true; - } - - boolean check = true; - for (Partition partition : table.getPartitions()) { - // check replicate miss - if (partition.getBaseIndex().getTablets().stream() - .anyMatch(t -> ((LocalTablet) t).getNormalReplicaBackendIds().isEmpty())) { - check = false; - break; - } - } - - if (!check) { - 
lossTableCount++; - } else { - lossTableCount = 0; - } - - return lossTableCount < 3; - } - private static final List KEY_COLUMN_NAMES = ImmutableList.of( "table_id", "column_name", "db_id" ); @@ -322,21 +278,6 @@ private void refreshAnalyzeJob() { } } - private boolean dropTable(String tableName) { - LOG.info("drop statistics table start"); - DropTableStmt stmt = new DropTableStmt(true, - new TableName(StatsConstants.STATISTICS_DB_NAME, tableName), true); - - try { - GlobalStateMgr.getCurrentState().getLocalMetastore().dropTable(stmt); - } catch (DdlException e) { - LOG.warn("Failed to drop table", e); - return false; - } - LOG.info("drop statistics table done"); - return !checkTableExist(tableName); - } - private void trySleep(long millis) { try { Thread.sleep(millis); @@ -347,33 +288,24 @@ private void trySleep(long millis) { private boolean createTable(String tableName) { ConnectContext context = StatisticUtils.buildConnectContext(); - context.setThreadLocalInfo(); - - if (tableName.equals(StatsConstants.SAMPLE_STATISTICS_TABLE_NAME)) { - return createSampleStatisticsTable(context); - } else if (tableName.equals(StatsConstants.FULL_STATISTICS_TABLE_NAME)) { - return createFullStatisticsTable(context); - } else if (tableName.equals(StatsConstants.HISTOGRAM_STATISTICS_TABLE_NAME)) { - return createHistogramStatisticsTable(context); - } else if (tableName.equals(StatsConstants.EXTERNAL_FULL_STATISTICS_TABLE_NAME)) { - return createExternalFullStatisticsTable(context); - } else if (tableName.equals(StatsConstants.EXTERNAL_HISTOGRAM_STATISTICS_TABLE_NAME)) { - return createExternalHistogramStatisticsTable(context); - } else { - throw new StarRocksPlannerException("Error table name " + tableName, ErrorType.INTERNAL_ERROR); + try (ConnectContext.ScopeGuard guard = context.bindScope()) { + if (tableName.equals(StatsConstants.SAMPLE_STATISTICS_TABLE_NAME)) { + return createSampleStatisticsTable(context); + } else if 
(tableName.equals(StatsConstants.FULL_STATISTICS_TABLE_NAME)) { + return createFullStatisticsTable(context); + } else if (tableName.equals(StatsConstants.HISTOGRAM_STATISTICS_TABLE_NAME)) { + return createHistogramStatisticsTable(context); + } else if (tableName.equals(StatsConstants.EXTERNAL_FULL_STATISTICS_TABLE_NAME)) { + return createExternalFullStatisticsTable(context); + } else if (tableName.equals(StatsConstants.EXTERNAL_HISTOGRAM_STATISTICS_TABLE_NAME)) { + return createExternalHistogramStatisticsTable(context); + } else { + throw new StarRocksPlannerException("Error table name " + tableName, ErrorType.INTERNAL_ERROR); + } } } private void refreshStatisticsTable(String tableName) { - while (checkTableExist(tableName) && !checkReplicateNormal(tableName)) { - LOG.info("statistics table " + tableName + " replicate is not normal, will drop table and rebuild"); - if (dropTable(tableName)) { - break; - } - LOG.warn("drop statistics table " + tableName + " failed"); - trySleep(10000); - } - while (!checkTableExist(tableName)) { if (createTable(tableName)) { break; @@ -381,6 +313,9 @@ private void refreshStatisticsTable(String tableName) { LOG.warn("create statistics table " + tableName + " failed"); trySleep(10000); } + if (checkTableExist(tableName)) { + StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName); + } } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java index 9d74396d2739dc..6d928d698c5baa 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java @@ -50,6 +50,7 @@ import com.starrocks.catalog.Table; import com.starrocks.catalog.Tablet; import com.starrocks.common.AnalysisException; +import com.starrocks.common.Config; import com.starrocks.common.DdlException; import com.starrocks.common.FeConstants; import 
com.starrocks.common.Pair; @@ -789,6 +790,17 @@ public int getAliveBackendNumber() { return getBackendIds(true).size(); } + public int getRetainedBackendNumber() { + return getRetainedBackends().size(); + } + + public int getSystemTableExpectedReplicationNum() { + if (RunMode.isSharedDataMode()) { + return 1; + } + return Integer.max(1, Integer.min(Config.default_replication_num, getRetainedBackendNumber())); + } + public int getTotalBackendNumber() { return idToBackendRef.size(); } @@ -910,12 +922,22 @@ public List getBackends() { return Lists.newArrayList(idToBackendRef.values()); } + /** + * Available: not decommissioned and alive + */ public List getAvailableBackends() { return getBackends().stream() .filter(ComputeNode::isAvailable) .collect(Collectors.toList()); } + /** + * Retained: not decommissioned, whether alive or not + */ + public List getRetainedBackends() { + return getBackends().stream().filter(x -> !x.isDecommissioned()).collect(Collectors.toList()); + } + public List getComputeNodes() { return Lists.newArrayList(idToComputeNodeRef.values()); } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/pipe/filelist/FileListRepoTest.java b/fe/fe-core/src/test/java/com/starrocks/load/pipe/filelist/FileListRepoTest.java index abb502aae3147e..8e97549880e2fc 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/pipe/filelist/FileListRepoTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/pipe/filelist/FileListRepoTest.java @@ -46,6 +46,7 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; public class FileListRepoTest { @@ -228,28 +229,29 @@ public void executeDDL(String sql) { creator.run(); Assert.assertTrue(creator.isDatabaseExists()); Assert.assertFalse(creator.isTableExists()); - Assert.assertFalse(creator.isTableCorrected()); // create with 1 replica new MockUp() { @Mock - public int getTotalBackendNumber() { + public int getSystemTableExpectedReplicationNum() {
return 1; } }; + AtomicInteger changed = new AtomicInteger(0); new MockUp() { @Mock public void executeDDL(String sql) { + changed.addAndGet(1); } }; creator.run(); Assert.assertTrue(creator.isTableExists()); - Assert.assertFalse(creator.isTableCorrected()); + Assert.assertEquals(1, changed.get()); // be corrected to 3 replicas new MockUp() { @Mock - public int getTotalBackendNumber() { + public int getSystemTableExpectedReplicationNum() { return 3; } }; @@ -257,7 +259,7 @@ public int getTotalBackendNumber() { creator.run(); Assert.assertTrue(creator.isDatabaseExists()); Assert.assertTrue(creator.isTableExists()); - Assert.assertTrue(creator.isTableCorrected()); + Assert.assertEquals(2, changed.get()); } @Test diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/history/TaskRunHistoryTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/history/TaskRunHistoryTest.java index 2d8b8bec2b1daa..c639717e1d8a50 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/history/TaskRunHistoryTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/history/TaskRunHistoryTest.java @@ -154,7 +154,6 @@ public void testKeeper(@Mocked RepoExecutor repo) { assertEquals(StatsConstants.STATISTICS_DB_NAME, keeper.getDatabaseName()); assertEquals(TaskRunHistoryTable.TABLE_NAME, keeper.getTableName()); assertEquals(TaskRunHistoryTable.CREATE_TABLE, keeper.getCreateTableSql()); - assertEquals(TaskRunHistoryTable.TABLE_REPLICAS, keeper.getTableReplicas()); // database not exists new Expectations() { diff --git a/fe/fe-core/src/test/java/com/starrocks/statistic/StatisticUtilsTest.java b/fe/fe-core/src/test/java/com/starrocks/statistic/StatisticUtilsTest.java new file mode 100644 index 00000000000000..64a0aacbb594cd --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/statistic/StatisticUtilsTest.java @@ -0,0 +1,80 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.statistic; + +import com.starrocks.common.Config; +import com.starrocks.sql.plan.PlanTestBase; +import com.starrocks.system.SystemInfoService; +import com.starrocks.utframe.UtFrameUtils; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class StatisticUtilsTest extends PlanTestBase { + + @BeforeAll + public static void beforeClass() throws Exception { + PlanTestBase.beforeClass(); + UtFrameUtils.createMinStarRocksCluster(); + if (!starRocksAssert.databaseExist("_statistics_")) { + StatisticsMetaManager m = new StatisticsMetaManager(); + m.createStatisticsTablesForTest(); + } + UtFrameUtils.addMockBackend(123); + UtFrameUtils.addMockBackend(124); + } + + @Test + void alterSystemTableReplicationNumIfNecessary() { + // 1. Has sufficient backends + new MockUp() { + @Mock + public int getRetainedBackendNumber() { + return 100; + } + }; + final String tableName = "column_statistics"; + Assert.assertTrue(StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName)); + Assert.assertFalse(StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName)); + Assert.assertEquals("3", + starRocksAssert.getTable(StatsConstants.STATISTICS_DB_NAME, tableName).getProperties().get( + "replication_num")); + + // 2. 
Change default_replication_num + Config.default_replication_num = 1; + Assert.assertTrue(StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName)); + Assert.assertFalse(StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName)); + Assert.assertEquals("1", + starRocksAssert.getTable(StatsConstants.STATISTICS_DB_NAME, tableName).getProperties().get( + "replication_num")); + Config.default_replication_num = 3; + Assert.assertTrue(StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName)); + + // 3. Has insufficient backends + new MockUp() { + @Mock + public int getRetainedBackendNumber() { + return 1; + } + }; + Assert.assertTrue(StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName)); + Assert.assertFalse(StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName)); + Assert.assertEquals("1", + starRocksAssert.getTable(StatsConstants.STATISTICS_DB_NAME, tableName).getProperties().get( + "replication_num")); + } +} \ No newline at end of file