[SR-4322] Auto statistics don't work if the table has not been updated #94

Merged 1 commit on Sep 8, 2021
[SR-4322] Auto statistics don't work if the table has not been updated
Seaven committed Sep 8, 2021
commit 4e12717cd1bf5cae2af3d48784c456f884abddaf
2 changes: 1 addition & 1 deletion bin/start_fe.sh
@@ -35,7 +35,7 @@ while true; do
--daemon) RUN_DAEMON=1 ; shift ;;
--helper) HELPER=$2 ; shift 2 ;;
--) shift ; break ;;
*) ehco "Internal error" ; exit 1 ;;
*) echo "Internal error" ; exit 1 ;;
esac
done

6 changes: 0 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -1235,12 +1235,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long statistic_update_interval_sec = 24 * 60 * 60;

/**
* The column statistic expire interval, default 5 day
*/
@ConfField(mutable = true, masterOnly = true)
public static long statistic_expire_sec = 5 * 24 * 60 * 60;

/**
* The row number of simple collect, default 20w rows
*/
@@ -65,7 +65,7 @@ private Relation analyzeAnalyzeStmt(AnalyzeStmt node) {
if (StatisticUtils.statisticDatabaseBlackListCheck(node.getTableName().getDb())) {
throw new SemanticException("Forbidden collect database: %s", node.getTableName().getDb());
}
if (!(analyzeTable instanceof OlapTable)) {
if (analyzeTable.getType() != Table.TableType.OLAP) {
throw new SemanticException("Table '%s' is not a OLAP table", analyzeTable.getName());
}

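Note on the analyzer change above: replacing the instanceof check with a type-enum comparison is stricter whenever OlapTable has subclasses, because instanceof accepts any subclass while getType() compares the declared type tag directly. A minimal sketch of the difference; the tiny Table/OlapTable hierarchy below is an illustrative stand-in, not the StarRocks classes:

```java
public class TypeCheckDemo {
    enum TableType { OLAP, MATERIALIZED_VIEW }

    static class Table {
        private final TableType type;
        Table(TableType type) { this.type = type; }
        TableType getType() { return type; }
    }

    // Hypothetical subclass of the OLAP table class carrying a different type tag.
    static class OlapTable extends Table {
        OlapTable(TableType type) { super(type); }
    }

    public static void main(String[] args) {
        Table mv = new OlapTable(TableType.MATERIALIZED_VIEW);

        System.out.println(mv instanceof OlapTable);         // true  -> old check would accept it
        System.out.println(mv.getType() == TableType.OLAP);  // false -> new check rejects it
    }
}
```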
@@ -28,12 +28,10 @@

public class AnalyzeJob implements Writable {
public static final String PROP_UPDATE_INTERVAL_SEC_KEY = "update_interval_sec";
public static final String PROP_EXPIRE_SEC_KEY = "expire_sec";
public static final String PROP_SAMPLE_COLLECT_ROWS_KEY = "sample_collect_rows";

public static final List<String> NUMBER_PROP_KEY_LIST = ImmutableList.<String>builder()
.add(PROP_UPDATE_INTERVAL_SEC_KEY)
.add(PROP_EXPIRE_SEC_KEY)
.add(PROP_SAMPLE_COLLECT_ROWS_KEY).build();

public static final long DEFAULT_ALL_ID = -1;
@@ -141,11 +139,6 @@ public long getUpdateIntervalSec() {
.getOrDefault(PROP_UPDATE_INTERVAL_SEC_KEY, String.valueOf(Config.statistic_update_interval_sec)));
}

public long getExpireSec() {
return Long
.parseLong(properties.getOrDefault(PROP_EXPIRE_SEC_KEY, String.valueOf(Config.statistic_expire_sec)));
}

public long getSampleCollectRows() {
return Long.parseLong(properties
.getOrDefault(PROP_SAMPLE_COLLECT_ROWS_KEY, String.valueOf(Config.statistic_sample_collect_rows)));
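With the per-job expire_sec property and getExpireSec() gone, the remaining numeric job properties still resolve through the same getOrDefault fallback to a global Config default. A minimal sketch of that lookup pattern, assuming the update_interval_sec key and 24-hour default shown in the diff (the standalone class is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyFallbackDemo {
    // Mirrors Config.statistic_update_interval_sec: 24 hours in seconds.
    static final long DEFAULT_UPDATE_INTERVAL_SEC = 24 * 60 * 60;

    static long getUpdateIntervalSec(Map<String, String> properties) {
        return Long.parseLong(properties.getOrDefault(
                "update_interval_sec", String.valueOf(DEFAULT_UPDATE_INTERVAL_SEC)));
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        System.out.println(getUpdateIntervalSec(properties)); // 86400: global default wins

        properties.put("update_interval_sec", "3600");
        System.out.println(getUpdateIntervalSec(properties)); // 3600: per-job override wins
    }
}
```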
@@ -7,6 +7,9 @@
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Table;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.gson.GsonUtils;
@@ -16,6 +19,8 @@
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@@ -59,30 +64,44 @@ public List<AnalyzeJob> getAllAnalyzeJobList() {
public void expireAnalyzeJob() {
List<AnalyzeJob> expireList = Lists.newArrayList();

LocalDateTime now = LocalDateTime.now();
for (AnalyzeJob job : analyzeJobMap.values()) {
if (AnalyzeJob.DEFAULT_ALL_ID != job.getDbId()) {
// check db/table
Database db = Catalog.getCurrentCatalog().getDb(job.getDbId());
if (null == db) {
expireList.add(job);
continue;
}

if (AnalyzeJob.DEFAULT_ALL_ID != job.getTableId()) {
if (null == db.getTable(job.getTableId())) {
expireList.add(job);
continue;
}
}
if (Constants.ScheduleStatus.FINISH != job.getStatus()) {
continue;
}

if (Constants.ScheduleStatus.FINISH != job.getStatus()) {
if (AnalyzeJob.DEFAULT_ALL_ID == job.getDbId() || AnalyzeJob.DEFAULT_ALL_ID == job.getTableId()) {
// finish job must be schedule once job, must contains db and table
LOG.warn("expire analyze job check failed, contain default id job: " + job.getId());
continue;
}

// Job will keep two collection cycle
LocalDateTime expireTimePoint = LocalDateTime.now().minusSeconds(job.getExpireSec());
if (job.getWorkTime().isBefore(expireTimePoint)) {
// check db/table
Database db = Catalog.getCurrentCatalog().getDb(job.getDbId());
if (null == db) {
expireList.add(job);
continue;
}

Table table = db.getTable(job.getTableId());
if (null == table) {
expireList.add(job);
continue;
}

if (table.getType() != Table.TableType.OLAP) {
expireList.add(job);
continue;
}

long maxTime = ((OlapTable) table).getPartitions().stream().map(Partition::getVisibleVersionTime)
.max(Long::compareTo).orElse(0L);

LocalDateTime updateTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(maxTime), Clock.systemDefaultZone().getZone());

// keep show 1 day
if (updateTime.plusDays(1).isBefore(now)) {
expireList.add(job);
}
}
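The rewritten expireAnalyzeJob above drops the per-job expire window and instead retires a finished once-job one day after the table's newest partition publish. A hedged sketch of just that rule, with Partition.getVisibleVersionTime() stubbed out as plain epoch-millisecond values:

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;

public class ExpireRuleDemo {
    public static void main(String[] args) {
        // Stand-ins for Partition.getVisibleVersionTime() across a table's partitions.
        List<Long> visibleVersionTimes = List.of(
                Instant.now().minusSeconds(3 * 24 * 3600).toEpochMilli(),
                Instant.now().minusSeconds(2 * 24 * 3600).toEpochMilli());

        // Same reduction as the diff: newest publish time wins, 0 if no partitions.
        long maxTime = visibleVersionTimes.stream().max(Long::compareTo).orElse(0L);
        LocalDateTime updateTime = LocalDateTime.ofInstant(
                Instant.ofEpochMilli(maxTime), Clock.systemDefaultZone().getZone());

        // "keep show 1 day": the job expires one day after the last table update.
        boolean expired = updateTime.plusDays(1).isBefore(LocalDateTime.now());
        System.out.println(expired); // true here: the newest partition is ~2 days old
    }
}
```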
@@ -7,6 +7,8 @@
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.util.MasterDaemon;
@@ -16,6 +18,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
@@ -30,7 +34,7 @@ public class StatisticAutoCollector extends MasterDaemon {
private static final StatisticExecutor statisticExecutor = new StatisticExecutor();

public StatisticAutoCollector() {
super("statistic auto collector", Config.statistic_collect_interval_sec * 1000);
super("AutoStatistic", Config.statistic_collect_interval_sec * 1000);
}

private static class TableCollectJob {
@@ -73,7 +77,7 @@ protected void runAfterCatalogReady() {

collectStatistics(allJobs);

expireStatistic(allJobs);
expireStatistic();
}

private void initDefaultJob() {
@@ -91,6 +95,7 @@ private void initDefaultJob() {
analyzeJob.setScheduleType(ScheduleType.SCHEDULE);
analyzeJob.setType(AnalyzeType.SAMPLE);
analyzeJob.setStatus(ScheduleStatus.PENDING);
analyzeJob.setWorkTime(LocalDateTime.MIN);

Catalog.getCurrentAnalyzeMgr().addAnalyzeJob(analyzeJob);
}
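Note on setWorkTime(LocalDateTime.MIN) above: the sentinel makes the first scheduling pass treat the new default job as older than any real table update time, so it always collects once before the update-time comparison starts filtering. A quick check of that invariant:

```java
import java.time.LocalDateTime;

public class FirstRunDemo {
    public static void main(String[] args) {
        LocalDateTime workTime = LocalDateTime.MIN; // as set in initDefaultJob
        LocalDateTime earliestPlausibleUpdate = LocalDateTime.of(1970, 1, 1, 0, 0);
        System.out.println(workTime.isBefore(earliestPlausibleUpdate)); // true -> collection triggers
    }
}
```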
@@ -125,20 +130,21 @@ private void collectStatistics(List<TableCollectJob> allJobs) {
Catalog.getCurrentAnalyzeMgr().updateAnalyzeJobWithoutLog(analyzeJob);
for (TableCollectJob tcj : entry.getValue()) {
try {
LOG.info("Statistic collect work once on job: {}, type: {}, db: {}, table: {}",
LOG.info("Statistic collect work job: {}, type: {}, db: {}, table: {}",
analyzeJob.getId(), analyzeJob.getType(), tcj.db.getFullName(), tcj.table.getName());
tcj.tryCollect();

// sleep 1s per column
// @TODO: It's necessary?
// trySleep(tcj.columns.size() * 1000);
Catalog.getCurrentStatisticStorage().expireColumnStatistics(tcj.table, tcj.columns);
} catch (Exception e) {
LOG.warn("Statistic collect work once on job: {}, type: {}, db: {}, table: {}. throw exception.",
LOG.warn("Statistic collect work job: {}, type: {}, db: {}, table: {}. throw exception.",
analyzeJob.getId(), analyzeJob.getType(), tcj.db.getFullName(), tcj.table.getName(), e);
String error = analyzeJob.getReason() + "\n" + tcj.db.getFullName() + "." + tcj.table.getName() +
": " + e.getMessage();
analyzeJob.setReason(error);

if (analyzeJob.getReason().length() < 40) {
String error =
analyzeJob.getReason() + "\n" + tcj.db.getFullName() + "." + tcj.table.getName() +
": " + e.getMessage();
analyzeJob.setReason(error);
}
}
}

@@ -155,6 +161,7 @@ private void collectStatistics(List<TableCollectJob> allJobs) {

private List<TableCollectJob> generateAllJobs() {
List<AnalyzeJob> allAnalyzeJobs = Catalog.getCurrentAnalyzeMgr().getAllAnalyzeJobList();
// The jobs need to be sorted in order of execution to avoid duplicate collections
allAnalyzeJobs.sort(Comparator.comparing(AnalyzeJob::getId));

Map<Long, List<TableCollectJob>> allTableJobMap = Maps.newHashMap();
@@ -203,12 +210,8 @@ private List<TableCollectJob> generateAllJobs() {
continue;
}

Table table = db.getTable(analyzeJob.getTableId());
if (table == null || !Table.TableType.OLAP.equals(table.getType())) {
continue;
}

createJobs(allTableJobMap, analyzeJob, db, table, analyzeJob.getColumns());
createTableJobs(allTableJobMap, analyzeJob, db, db.getTable(analyzeJob.getTableId()),
analyzeJob.getColumns());
}
}

@@ -223,10 +226,25 @@ private void createTableJobs(Map<Long, List<TableCollectJob>> tableJobs, Analyze
return;
}

List<String> columns =
table.getFullSchema().stream().filter(d -> !d.isAggregated()).map(Column::getName)
.collect(Collectors.toList());
createJobs(tableJobs, job, db, table, columns);
List<String> columns = table.getFullSchema().stream().filter(d -> !d.isAggregated()).map(Column::getName)
.collect(Collectors.toList());
createTableJobs(tableJobs, job, db, table, columns);
}

private void createTableJobs(Map<Long, List<TableCollectJob>> tableJobs, AnalyzeJob job,
Database db, Table table, List<String> columns) {
// check table has update
LocalDateTime updateTime = getTableLastUpdateTime(table);

// 1. If job is schedule and the table has update, we need re-collect data
// 2. If job is once and is happened after the table update, we need add it to avoid schedule-job cover data
if ((ScheduleType.SCHEDULE.equals(job.getScheduleType()) && job.getWorkTime().isBefore(updateTime))
|| (ScheduleType.ONCE.equals(job.getScheduleType()) && job.getWorkTime().isAfter(updateTime))) {
createJobs(tableJobs, job, db, table, columns);
} else {
LOG.debug("Skip collect on table: " + table.getName() + ", updateTime: " + updateTime +
", JobId: " + job.getId() + ", lastCollectTime: " + job.getWorkTime());
}
}
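The predicate in createTableJobs above is the heart of this fix: a SCHEDULE job re-collects only when the table changed after the job's last run, while a ONCE job stays registered when it already ran after the last change, so a later scheduled pass does not overwrite its statistics. A self-contained sketch of the same condition (the enum and helper are illustrative stand-ins):

```java
import java.time.LocalDateTime;

public class CollectDecisionDemo {
    enum ScheduleType { ONCE, SCHEDULE }

    // Mirrors the two branches in createTableJobs.
    static boolean shouldCreateJob(ScheduleType type, LocalDateTime workTime, LocalDateTime updateTime) {
        return (type == ScheduleType.SCHEDULE && workTime.isBefore(updateTime))
                || (type == ScheduleType.ONCE && workTime.isAfter(updateTime));
    }

    public static void main(String[] args) {
        LocalDateTime tableUpdated = LocalDateTime.now().minusHours(2);

        // Scheduled job last ran before the update: collect again.
        System.out.println(shouldCreateJob(ScheduleType.SCHEDULE,
                LocalDateTime.now().minusDays(1), tableUpdated));   // true

        // Scheduled job already ran after the update: skip, nothing changed since.
        System.out.println(shouldCreateJob(ScheduleType.SCHEDULE,
                LocalDateTime.now().minusHours(1), tableUpdated));  // false
    }
}
```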

private void createJobs(Map<Long, List<TableCollectJob>> result,
@@ -254,28 +272,33 @@ private void createJobs(Map<Long, List<TableCollectJob>> result,
result.get(table.getId()).add(tableJob);
}

private void expireStatistic(List<TableCollectJob> allJobs) {
try {
if (allJobs.isEmpty()) {
return;
private void expireStatistic() {
List<Long> dbIds = Catalog.getCurrentCatalog().getDbIds();
List<Long> tables = Lists.newArrayList();
for (Long dbId : dbIds) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (null == db || StatisticUtils.statisticDatabaseBlackListCheck(db.getFullName())) {
continue;
}

LocalDateTime expireTime =
allJobs.stream().map(tcj -> LocalDateTime.now().minusSeconds(tcj.job.getExpireSec()))
.min(LocalDateTime::compareTo).orElse(LocalDateTime.MIN);

List<String> tableIds = statisticExecutor.queryExpireTableSync(expireTime);

if (!tableIds.isEmpty()) {
LOG.info("Statistic expire tableIds: {}, expireTime: {}", tableIds, expireTime.toString());
db.getTables().stream().map(Table::getId).forEach(tables::add);
}
try {
List<String> expireTables = statisticExecutor.queryExpireTableSync(tables);

for (int i = 0; i < tableIds.size() && i < 10; i++) {
statisticExecutor.expireStatisticSync(tableIds.get(i));
}
if (expireTables.isEmpty()) {
return;
}
LOG.info("Statistic expire tableIds: {}", expireTables);
statisticExecutor.expireStatisticSync(expireTables);
} catch (Exception e) {
LOG.warn("expire statistic failed.", e);
}
}

private LocalDateTime getTableLastUpdateTime(Table table) {
long maxTime = ((OlapTable) table).getPartitions().stream().map(Partition::getVisibleVersionTime)
.max(Long::compareTo).orElse(0L);
return LocalDateTime.ofInstant(Instant.ofEpochMilli(maxTime), Clock.systemDefaultZone().getZone());
}
}
@@ -39,6 +39,7 @@
import com.starrocks.sql.plan.PlanFragmentBuilder;
import com.starrocks.thrift.TResultBatch;
import com.starrocks.thrift.TStatisticData;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TDeserializer;
@@ -53,7 +54,6 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
@@ -187,7 +187,7 @@ public void collectStatisticSync(Long dbId, Long tableId, List<String> columnNam
sql = buildFullInsertSQL(dbId, tableId, list);
}

LOG.info("Collect statistic SQL: {}", sql);
LOG.debug("Collect statistic SQL: {}", sql);

ConnectContext context = StatisticUtils.buildConnectContext();
StatementBase parsedStmt = parseSQL(sql, context);
@@ -200,9 +200,9 @@
}
}

public void expireStatisticSync(String tableId) {
public void expireStatisticSync(List<String> tableIds) {
StringBuilder sql = new StringBuilder(DELETE_TEMPLATE);
sql.append(" table_id = ").append(tableId);
sql.append(" table_id IN (").append(StringUtils.join(tableIds, ",")).append(")");
LOG.debug("Expire statistic SQL: {}", sql);

ConnectContext context = StatisticUtils.buildConnectContext();
@@ -216,9 +216,9 @@ public void expireStatisticSync(String tableId) {
}
}

public List<String> queryExpireTableSync(LocalDateTime expireTime) throws Exception {
public List<String> queryExpireTableSync(List<Long> tableIds) throws Exception {
StringBuilder sql = new StringBuilder(SELECT_EXPIRE_TABLE_TEMPLATE);
sql.append(" AND update_time < '").append(expireTime.format(DEFAULT_UPDATE_TIME_FORMATTER)).append("'");
sql.append(" AND table_id NOT IN (").append(StringUtils.join(tableIds, ",")).append(")");
LOG.debug("Query expire statistic SQL: {}", sql);

Map<String, Database> dbs = Maps.newHashMap();
@@ -238,14 +238,14 @@

CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();

List<String> tableIds = Lists.newArrayList();
List<String> result = Lists.newArrayList();
for (TResultBatch batch : sqlResult) {
for (ByteBuffer byteBuffer : batch.getRows()) {
tableIds.add(decoder.decode(byteBuffer).toString().substring(1));
result.add(decoder.decode(byteBuffer).toString().substring(1));
}
}

return tableIds;
return result;
} catch (Exception e) {
LOG.warn("Execute statistic table query fail.", e);
throw e;
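For reference, the reworked expiry path above issues one DELETE with an IN list over all known table ids rather than one statement per table, and queryExpireTableSync now selects statistics whose table_id is absent from the live set instead of filtering by update_time. A sketch of the string assembly, assuming a hypothetical DELETE_TEMPLATE prefix (the real template and statistics table name live elsewhere in StatisticExecutor):

```java
import java.util.List;

public class ExpireSqlDemo {
    // Hypothetical stand-in for StatisticExecutor's DELETE_TEMPLATE constant;
    // the real prefix and statistics table name are defined elsewhere.
    static final String DELETE_TEMPLATE = "DELETE FROM table_statistic_v1 WHERE";

    public static void main(String[] args) {
        List<String> tableIds = List.of("10001", "10002", "10003");

        // Equivalent to StringUtils.join(tableIds, ",") from commons-lang.
        String sql = DELETE_TEMPLATE + " table_id IN (" + String.join(",", tableIds) + ")";
        System.out.println(sql);
        // DELETE FROM table_statistic_v1 WHERE table_id IN (10001,10002,10003)
    }
}
```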