Skip to content

Commit

Permalink
[Enhancement] auto change replication_num of system tables (#51799)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <mofei@starrocks.com>
  • Loading branch information
murphyatwork authored Oct 14, 2024
1 parent 8cc2ae2 commit 0c0ea45
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
package com.starrocks.load.loadv2;

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -68,7 +68,7 @@ public class LoadsHistorySyncer extends FrontendDaemon {
"properties('replication_num' = '%d') ";

private static final String CORRECT_LOADS_HISTORY_REPLICATION_NUM =
"ALTER TABLE %s SET ('default.replication_num'='3')";
"ALTER TABLE %s SET ('default.replication_num'='%d')";

private static final String LOADS_HISTORY_SYNC =
"INSERT INTO %s " +
Expand All @@ -78,10 +78,9 @@ public class LoadsHistorySyncer extends FrontendDaemon {
"AND load_finish_time > ( " +
"SELECT COALESCE(MAX(load_finish_time), '0001-01-01 00:00:00') " +
"FROM %s);";

private boolean databaseExists = false;
private boolean tableExists = false;
private boolean tableCorrected = false;

public LoadsHistorySyncer() {
super("Load history syncer", Config.loads_history_sync_interval_second * 1000L);
Expand All @@ -97,23 +96,7 @@ public static void createTable() throws UserException {
}

public static boolean correctTable() {
int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber();
int replica = GlobalStateMgr.getCurrentState().getLocalMetastore()
.mayGetTable(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME)
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);
if (numBackends < 3) {
LOG.info("not enough backends in the cluster, expected 3 but got {}", numBackends);
return false;
}
if (replica < 3) {
String sql = SQLBuilder.buildAlterTableSql();
RepoExecutor.getInstance().executeDDL(sql);
} else {
LOG.info("table {} already has {} replicas, no need to alter replication_num",
LOADS_HISTORY_TABLE_NAME, replica);
}
return true;
return StatisticUtils.alterSystemTableReplicationNumIfNecessary(LOADS_HISTORY_TABLE_NAME);
}

public void checkMeta() throws UserException {
Expand All @@ -129,10 +112,8 @@ public void checkMeta() throws UserException {
LOG.info("table created: " + LOADS_HISTORY_TABLE_NAME);
tableExists = true;
}
if (!tableCorrected && correctTable()) {
LOG.info("table corrected: " + LOADS_HISTORY_TABLE_NAME);
tableCorrected = true;
}

correctTable();

if (getInterval() != Config.loads_history_sync_interval_second * 1000L) {
setInterval(Config.loads_history_sync_interval_second * 1000L);
Expand Down Expand Up @@ -171,9 +152,9 @@ public static String buildCreateTableSql() throws UserException {
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica);
}

public static String buildAlterTableSql() {
public static String buildAlterTableSql(int replica) {
return String.format(CORRECT_LOADS_HISTORY_REPLICATION_NUM,
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME));
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica);
}

public static String buildSyncSql() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
import com.starrocks.load.pipe.PipeFileRecord;
import com.starrocks.statistic.StatsConstants;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -64,7 +63,7 @@ public class FileListTableRepo extends FileListRepo {
"properties('replication_num' = '%d') ";

protected static final String CORRECT_FILE_LIST_REPLICATION_NUM =
"ALTER TABLE %s SET ('replication_num'='3')";
"ALTER TABLE %s SET ('replication_num'='%d')";

protected static final String ALL_COLUMNS =
"`pipe_id`, `file_name`, `file_version`, `file_size`, `state`, `last_modified`, `staged_time`," +
Expand Down Expand Up @@ -155,15 +154,14 @@ public void destroy() {
*/
static class SQLBuilder {

public static String buildCreateTableSql() throws UserException {
int replica = AutoInferUtil.calDefaultReplicationNum();
public static String buildCreateTableSql(int replicationNum) throws UserException {
return String.format(FILE_LIST_TABLE_CREATE,
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replica);
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum);
}

public static String buildAlterTableSql() {
public static String buildAlterTableSql(int replicationNum) {
return String.format(CORRECT_FILE_LIST_REPLICATION_NUM,
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME));
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package com.starrocks.load.pipe.filelist;

import com.starrocks.catalog.OlapTable;
import com.starrocks.common.UserException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -30,7 +30,6 @@ public class RepoCreator {

private static boolean databaseExists = false;
private static boolean tableExists = false;
private static boolean tableCorrected = false;

public static RepoCreator getInstance() {
return INSTANCE;
Expand All @@ -50,10 +49,7 @@ public void run() {
LOG.info("table created: " + FileListTableRepo.FILE_LIST_TABLE_NAME);
tableExists = true;
}
if (!tableCorrected && correctTable()) {
LOG.info("table corrected: " + FileListTableRepo.FILE_LIST_TABLE_NAME);
tableCorrected = true;
}
correctTable();
} catch (Exception e) {
LOG.error("error happens in RepoCreator: ", e);
}
Expand All @@ -64,28 +60,14 @@ public boolean checkDatabaseExists() {
}

public static void createTable() throws UserException {
String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql();
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql(expectedReplicationNum);
RepoExecutor.getInstance().executeDDL(sql);
}

public static boolean correctTable() {
int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber();
int replica = GlobalStateMgr.getCurrentState()
.getLocalMetastore().mayGetTable(FileListTableRepo.FILE_LIST_DB_NAME, FileListTableRepo.FILE_LIST_TABLE_NAME)
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);
if (numBackends < 3) {
LOG.info("not enough backends in the cluster, expected 3 but got {}", numBackends);
return false;
}
if (replica < 3) {
String sql = FileListTableRepo.SQLBuilder.buildAlterTableSql();
RepoExecutor.getInstance().executeDDL(sql);
} else {
LOG.info("table {} already has {} replicas, no need to alter replication_num",
FileListTableRepo.FILE_LIST_FULL_NAME, replica);
}
return true;
return StatisticUtils.alterSystemTableReplicationNumIfNecessary(FileListTableRepo.FILE_LIST_TABLE_NAME);
}

public boolean isDatabaseExists() {
Expand All @@ -95,8 +77,4 @@ public boolean isDatabaseExists() {
public boolean isTableExists() {
return tableExists;
}

public boolean isTableCorrected() {
return tableCorrected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class TableKeeper {
private final String databaseName;
private final String tableName;
private final String createTableSql;
private final int tableReplicas;

private boolean databaseExisted = false;
private boolean tableExisted = false;
Expand All @@ -52,12 +51,10 @@ public class TableKeeper {
public TableKeeper(String database,
String table,
String createTable,
int expectedReplicas,
Supplier<Integer> ttlSupplier) {
this.databaseName = database;
this.tableName = table;
this.createTableSql = createTable;
this.tableReplicas = expectedReplicas;
this.ttlSupplier = ttlSupplier;
}

Expand Down Expand Up @@ -100,22 +97,20 @@ public void createTable() throws UserException {
}

public void correctTable() {
int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber();
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
int replica = GlobalStateMgr.getCurrentState()
.getLocalMetastore().mayGetTable(databaseName, tableName)
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);
if (numBackends < tableReplicas) {
LOG.info("not enough backends in the cluster, expected {} but got {}",
tableReplicas, numBackends);
return;
}
if (replica < tableReplicas) {
String sql = alterTableReplicas();

if (replica != expectedReplicationNum) {
String sql = alterTableReplicas(expectedReplicationNum);
if (StringUtils.isNotEmpty(sql)) {
RepoExecutor.getInstance().executeDDL(sql);
}
LOG.info("changed replication_number of table {} to {}", tableName, replica);
LOG.info("changed replication_number of table {} from {} to {}",
tableName, replica, expectedReplicationNum);
}
}

Expand Down Expand Up @@ -146,21 +141,21 @@ private Optional<OlapTable> mayGetTable() {
.flatMap(x -> Optional.of((OlapTable) x));
}

private String alterTableReplicas() {
private String alterTableReplicas(int replicationNum) {
Optional<OlapTable> table = mayGetTable();
if (table.isEmpty()) {
return "";
}
PartitionInfo partitionInfo = table.get().getPartitionInfo();
if (partitionInfo.isRangePartition()) {
String sql1 = String.format("ALTER TABLE %s.%s MODIFY PARTITION(*) SET ('replication_num'='%d');",
databaseName, tableName, tableReplicas);
databaseName, tableName, replicationNum);
String sql2 = String.format("ALTER TABLE %s.%s SET ('default.replication_num'='%d');",
databaseName, tableName, tableReplicas);
databaseName, tableName, replicationNum);
return sql1 + sql2;
} else {
return String.format("ALTER TABLE %s.%s SET ('replication_num'='%d')",
databaseName, tableName, tableReplicas);
databaseName, tableName, replicationNum);
}
}

Expand All @@ -180,10 +175,6 @@ public String getCreateTableSql() {
return createTableSql;
}

public int getTableReplicas() {
return tableReplicas;
}

public boolean isDatabaseExisted() {
return databaseExisted;
}
Expand All @@ -200,14 +191,6 @@ public void setDatabaseExisted(boolean databaseExisted) {
this.databaseExisted = databaseExisted;
}

public void setTableExisted(boolean tableExisted) {
this.tableExisted = tableExisted;
}

public void setTableCorrected(boolean tableCorrected) {
this.tableCorrected = tableCorrected;
}

public static TableKeeperDaemon startDaemon() {
TableKeeperDaemon daemon = TableKeeperDaemon.getInstance();
daemon.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class TaskRunHistoryTable {
public static final String DATABASE_NAME = StatsConstants.STATISTICS_DB_NAME;
public static final String TABLE_NAME = "task_run_history";
public static final String TABLE_FULL_NAME = DATABASE_NAME + "." + TABLE_NAME;
public static final int TABLE_REPLICAS = 3;
public static final String CREATE_TABLE =
String.format("CREATE TABLE IF NOT EXISTS %s (" +
// identifiers
Expand Down Expand Up @@ -93,7 +92,7 @@ public class TaskRunHistoryTable {
"SELECT history_content_json " + "FROM " + TABLE_FULL_NAME + " WHERE ";

private static final TableKeeper KEEPER =
new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE, TABLE_REPLICAS,
new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE,
() -> Math.max(1, Config.task_runs_ttl_second / 3600 / 24));

public static TableKeeper createKeeper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.starrocks.connector.ConnectorPartitionTraits;
import com.starrocks.load.EtlStatus;
import com.starrocks.load.loadv2.LoadJobFinalOperation;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.load.streamload.StreamLoadTxnCommitAttachment;
import com.starrocks.privilege.PrivilegeBuiltinConstants;
import com.starrocks.qe.ConnectContext;
Expand All @@ -61,6 +62,7 @@
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TxnCommitAttachment;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -475,6 +477,32 @@ public static void dropStatisticsAfterDropTable(Table table) {
GlobalStateMgr.getCurrentState().getStatisticStorage().expireConnectorTableColumnStatistics(table, columns);
}

/**
* Change the replication_num of system table according to cluster status
* 1. When scale-out to greater than 3 nodes, change the replication_num to 3
* 3. When scale-in to less than 3 node, change it to retainedBackendNum
*/
public static boolean alterSystemTableReplicationNumIfNecessary(String tableName) {
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
int replica = GlobalStateMgr.getCurrentState()
.getLocalMetastore().mayGetTable(StatsConstants.STATISTICS_DB_NAME, tableName)
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);

if (replica != expectedReplicationNum) {
String sql = String.format("ALTER TABLE %s.%s SET ('replication_num'='%d')",
StatsConstants.STATISTICS_DB_NAME, tableName, expectedReplicationNum);
if (StringUtils.isNotEmpty(sql)) {
RepoExecutor.getInstance().executeDDL(sql);
}
LOG.info("changed replication_number of table {} from {} to {}",
tableName, replica, expectedReplicationNum);
return true;
}
return false;
}

// only support collect statistics for slotRef and subfield expr
public static String getColumnName(Table table, Expr column) {
String colName;
Expand Down
Loading

0 comments on commit 0c0ea45

Please sign in to comment.