Skip to content

Commit

Permalink
[Enhancement] auto change replication_num of system tables (#51799)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <mofei@starrocks.com>
(cherry picked from commit 0c0ea45)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java
#	fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoCreator.java
#	fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java
#	fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java
#	fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java
#	fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsMetaManager.java
#	fe/fe-core/src/test/java/com/starrocks/scheduler/history/TaskRunHistoryTest.java
  • Loading branch information
murphyatwork authored and mergify[bot] committed Oct 14, 2024
1 parent 4099470 commit 26207a5
Show file tree
Hide file tree
Showing 11 changed files with 1,128 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.load.loadv2;

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
*/
public class LoadsHistorySyncer extends FrontendDaemon {
private static final Logger LOG = LogManager.getLogger(LoadsHistorySyncer.class);

public static final String LOADS_HISTORY_DB_NAME = "_statistics_";
public static final String LOADS_HISTORY_TABLE_NAME = "loads_history";

private static final String LOADS_HISTORY_TABLE_CREATE =
"CREATE TABLE IF NOT EXISTS %s (" +
"id bigint, " +
"label varchar(2048), " +
"profile_id varchar(2048), " +
"db_name varchar(2048), " +
"table_name varchar(2048), " +
"user varchar(2048), " +
"warehouse varchar(2048), " +
"state varchar(2048), " +
"progress varchar(2048), " +
"type varchar(2048), " +
"priority varchar(2048), " +
"scan_rows bigint, " +
"scan_bytes bigint, " +
"filtered_rows bigint, " +
"unselected_rows bigint, " +
"sink_rows bigint, " +
"runtime_details json, " +
"create_time datetime, " +
"load_start_time datetime, " +
"load_commit_time datetime, " +
"load_finish_time datetime not null, " +
"properties json, " +
"error_msg varchar(2048), " +
"tracking_sql varchar(2048), " +
"rejected_record_path varchar(2048), " +
"job_id bigint " +
") " +
"PARTITION BY date_trunc('DAY', load_finish_time) " +
"DISTRIBUTED BY HASH(label) BUCKETS 3 " +
"properties('replication_num' = '%d') ";

private static final String CORRECT_LOADS_HISTORY_REPLICATION_NUM =
"ALTER TABLE %s SET ('default.replication_num'='%d')";

private static final String LOADS_HISTORY_SYNC =
"INSERT INTO %s " +
"SELECT * FROM information_schema.loads " +
"WHERE load_finish_time IS NOT NULL " +
"AND load_finish_time < NOW() - INTERVAL 1 MINUTE " +
"AND load_finish_time > ( " +
"SELECT COALESCE(MAX(load_finish_time), '0001-01-01 00:00:00') " +
"FROM %s);";

private boolean databaseExists = false;
private boolean tableExists = false;

public LoadsHistorySyncer() {
super("Load history syncer", Config.loads_history_sync_interval_second * 1000L);
}

public boolean checkDatabaseExists() {
return GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(LOADS_HISTORY_DB_NAME) != null;
}

public static void createTable() throws UserException {
String sql = SQLBuilder.buildCreateTableSql();
RepoExecutor.getInstance().executeDDL(sql);
}

public static boolean correctTable() {
return StatisticUtils.alterSystemTableReplicationNumIfNecessary(LOADS_HISTORY_TABLE_NAME);
}

public void checkMeta() throws UserException {
if (!databaseExists) {
databaseExists = checkDatabaseExists();
if (!databaseExists) {
LOG.warn("database not exists: " + LOADS_HISTORY_DB_NAME);
return;
}
}
if (!tableExists) {
createTable();
LOG.info("table created: " + LOADS_HISTORY_TABLE_NAME);
tableExists = true;
}

correctTable();

if (getInterval() != Config.loads_history_sync_interval_second * 1000L) {
setInterval(Config.loads_history_sync_interval_second * 1000L);
}
}

public void syncData() {
try {
RepoExecutor.getInstance().executeDML(SQLBuilder.buildSyncSql());
} catch (Exception e) {
LOG.error("Failed to sync loads history", e);
}
}

@Override
protected void runAfterCatalogReady() {
if (FeConstants.runningUnitTest) {
return;
}
try {
checkMeta();
syncData();
} catch (Throwable e) {
LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e);
}
}

/**
* Generate SQL for operations
*/
static class SQLBuilder {

public static String buildCreateTableSql() throws UserException {
int replica = AutoInferUtil.calDefaultReplicationNum();
return String.format(LOADS_HISTORY_TABLE_CREATE,
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica);
}

public static String buildAlterTableSql(int replica) {
return String.format(CORRECT_LOADS_HISTORY_REPLICATION_NUM,
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica);
}

public static String buildSyncSql() {
return String.format(LOADS_HISTORY_SYNC,
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME),
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
import com.starrocks.load.pipe.PipeFileRecord;
import com.starrocks.statistic.StatsConstants;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -64,7 +63,7 @@ public class FileListTableRepo extends FileListRepo {
"properties('replication_num' = '%d') ";

protected static final String CORRECT_FILE_LIST_REPLICATION_NUM =
"ALTER TABLE %s SET ('replication_num'='3')";
"ALTER TABLE %s SET ('replication_num'='%d')";

protected static final String ALL_COLUMNS =
"`pipe_id`, `file_name`, `file_version`, `file_size`, `state`, `last_modified`, `staged_time`," +
Expand Down Expand Up @@ -155,15 +154,14 @@ public void destroy() {
*/
static class SQLBuilder {

public static String buildCreateTableSql() throws UserException {
int replica = AutoInferUtil.calDefaultReplicationNum();
public static String buildCreateTableSql(int replicationNum) throws UserException {
return String.format(FILE_LIST_TABLE_CREATE,
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replica);
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum);
}

public static String buildAlterTableSql() {
public static String buildAlterTableSql(int replicationNum) {
return String.format(CORRECT_FILE_LIST_REPLICATION_NUM,
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME));
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package com.starrocks.load.pipe.filelist;

import com.starrocks.catalog.OlapTable;
import com.starrocks.common.UserException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -30,7 +30,6 @@ public class RepoCreator {

private static boolean databaseExists = false;
private static boolean tableExists = false;
private static boolean tableCorrected = false;

public static RepoCreator getInstance() {
return INSTANCE;
Expand All @@ -50,10 +49,7 @@ public void run() {
LOG.info("table created: " + FileListTableRepo.FILE_LIST_TABLE_NAME);
tableExists = true;
}
if (!tableCorrected && correctTable()) {
LOG.info("table corrected: " + FileListTableRepo.FILE_LIST_TABLE_NAME);
tableCorrected = true;
}
correctTable();
} catch (Exception e) {
LOG.error("error happens in RepoCreator: ", e);
}
Expand All @@ -64,11 +60,14 @@ public boolean checkDatabaseExists() {
}

public static void createTable() throws UserException {
String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql();
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql(expectedReplicationNum);
RepoExecutor.getInstance().executeDDL(sql);
}

public static boolean correctTable() {
<<<<<<< HEAD
int numBackends = GlobalStateMgr.getCurrentSystemInfo().getTotalBackendNumber();
int replica = GlobalStateMgr.getCurrentState()
.mayGetDb(FileListTableRepo.FILE_LIST_DB_NAME)
Expand All @@ -87,6 +86,9 @@ public static boolean correctTable() {
FileListTableRepo.FILE_LIST_FULL_NAME, replica);
}
return true;
=======
return StatisticUtils.alterSystemTableReplicationNumIfNecessary(FileListTableRepo.FILE_LIST_TABLE_NAME);
>>>>>>> 0c0ea45ed1 ([Enhancement] auto change replication_num of system tables (#51799))
}

public boolean isDatabaseExists() {
Expand All @@ -96,8 +98,4 @@ public boolean isDatabaseExists() {
public boolean isTableExists() {
return tableExists;
}

public boolean isTableCorrected() {
return tableCorrected;
}
}
Loading

0 comments on commit 26207a5

Please sign in to comment.