Skip to content

Commit

Permalink
[Improvement](statistics)Improve stats sample strategy (apache#26435) (
Browse files Browse the repository at this point in the history
…apache#26890)

backport apache#26435
Improve the accuracy of sample stats collection. For non distribution columns, use
`n*d / (n - f1 + f1*n/N)`

where `f1` is the number of distinct values that occurred exactly once in our sample of n rows (from a total of N),
and `d` is the total number of distinct values in the sample.

For distribution columns, use `ndv(n) * fraction of tablets sampled` for NDV.

For very large tablet to sample, use limit to control the total lines to scan (for non key column only, because key column is sorted and will be inaccurate using limit).
  • Loading branch information
Jibing-Li authored Nov 14, 2023
1 parent b536915 commit 9069a3b
Show file tree
Hide file tree
Showing 18 changed files with 825 additions and 201 deletions.
11 changes: 8 additions & 3 deletions docs/en/docs/query-acceleration/statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ Syntax:
```SQL
ANALYZE < TABLE | DATABASE table_name | db_name >
[ (column_name [, ...]) ]
[ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] [ WITH SQL ] ]
[ PROPERTIES ("key" = "value", ...) ];
[ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] ];
```

Where:
Expand All @@ -64,7 +63,6 @@ Where:
- `column_name`: The specified target column. It must be an existing column in `table_name`. You can specify multiple column names separated by commas.
- `sync`: Collect statistics synchronously. Returns after collection. If not specified, it executes asynchronously and returns a JOB ID.
- `sample percent | rows`: Collect statistics with sampling. You can specify a sampling percentage or a number of sampling rows.
- `sql`: Execute SQL to collect statistics for partitioned columns in external tables. By default, partitioned column information is collected from metadata, which is efficient but may not be accurate in terms of row count and data size. Users can specify using SQL to collect accurate partitioned column information.

Here are some examples:

Expand All @@ -90,6 +88,13 @@ The collection jobs for statistics themselves consume a certain amount of system

If you are concerned about automatic collection jobs interfering with your business, you can specify a time frame for the automatic collection jobs to run during low business loads by setting the `full_auto_analyze_start_time` and `full_auto_analyze_end_time` parameters according to your needs. You can also completely disable this feature by setting the `enable_full_auto_analyze` parameter to `false`.

External catalogs do not participate in automatic collection by default. Because external catalogs often contain massive historical data, if they participate in automatic collection, it may occupy too many resources. You can turn on and off the automatic collection of external catalogs by setting the catalog's properties.

```sql
ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='true'); // Turn on
ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='false'); // Turn off
```

<br />


Expand Down
11 changes: 8 additions & 3 deletions docs/zh-CN/docs/query-acceleration/statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ Doris支持用户通过提交ANALYZE语句来手动触发统计信息的收集
```SQL
ANALYZE < TABLE | DATABASE table_name | db_name >
[ (column_name [, ...]) ]
[ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] [ WITH SQL ] ]
[ PROPERTIES ("key" = "value", ...) ];
[ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] ];
```

其中:
Expand All @@ -66,7 +65,6 @@ ANALYZE < TABLE | DATABASE table_name | db_name >
- column_name: 指定的目标列。必须是  `table_name`  中存在的列,多个列名称用逗号分隔。
- sync:同步收集统计信息。收集完后返回。若不指定则异步执行并返回JOB ID。
- sample percent | rows:抽样收集统计信息。可以指定抽样比例或者抽样行数。
- sql:执行sql来收集外表分区列统计信息。默认从元数据收集分区列信息,这样效率比较高但是行数和数据量大小可能不准。用户可以指定使用sql来收集,这样可以收集到准确的分区列信息。


下面是一些例子
Expand All @@ -93,6 +91,13 @@ ANALYZE TABLE lineitem WITH SAMPLE ROWS 100000;

如果担心自动收集作业对业务造成干扰,可结合自身需求通过设置参数`full_auto_analyze_start_time`和参数`full_auto_analyze_end_time`指定自动收集作业在业务负载较低的时间段执行。也可以通过设置参数`enable_full_auto_analyze``false`来彻底关闭本功能。

External catalog 默认不参与自动收集。因为 external catalog 往往包含海量历史数据,如果参与自动收集,可能占用过多资源。可以通过设置 catalog 的 property 来打开和关闭 external catalog 的自动收集。

```sql
ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='true'); // 打开自动收集
ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='false'); // 关闭自动收集
```

<br />

## 2. 作业管理
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -2303,4 +2304,16 @@ public long getDataSize(boolean singleReplica) {
}
return dataSize;
}

public boolean isDistributionColumn(String columnName) {
Set<String> distributeColumns = getDistributionColumnNames()
.stream().map(String::toLowerCase).collect(Collectors.toSet());
return distributeColumns.contains(columnName.toLowerCase(Locale.ROOT));
}

@Override
public boolean isPartitionColumn(String columnName) {
return getPartitionInfo().getPartitionColumns().stream()
.anyMatch(c -> c.getName().equalsIgnoreCase(columnName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,5 +252,12 @@ default long getDataSize(boolean singleReplica) {
return 0;
}

default boolean isDistributionColumn(String columnName) {
return false;
}

default boolean isPartitionColumn(String columnName) {
return false;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ public long getDataSize(boolean singleReplica) {
}
return total;
}

@Override
public boolean isDistributionColumn(String columnName) {
return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
.collect(Collectors.toSet()).contains(columnName.toLowerCase(Locale.ROOT));
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ public synchronized void appendBuf(BaseAnalysisTask task, List<ColStatsData> sta
queryingTask.remove(task);
buf.addAll(statsData);
queryFinished.add(task);
markOneTaskDone();
}

public synchronized void rowCountDone(BaseAnalysisTask task) {
queryingTask.remove(task);
queryFinished.add(task);
markOneTaskDone();
}

protected void markOneTaskDone() {
queryFinishedTaskCount += 1;
if (queryFinishedTaskCount == totalTaskCount) {
writeBuf();
Expand Down Expand Up @@ -183,6 +193,9 @@ public void deregisterJob() {
protected void syncLoadStats() {
long tblId = jobInfo.tblId;
for (BaseAnalysisTask task : queryFinished) {
if (task.info.externalTableLevelTask) {
continue;
}
String colName = task.col.getName();
if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
analysisManager.removeColStatsStatus(tblId, colName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,11 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlExceptio
boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
constructJob(jobInfo, analysisTaskInfos.values());
if (!jobInfo.partitionOnly && stmt.isAllColumns()
&& StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
}
constructJob(jobInfo, analysisTaskInfos.values());
if (isSync) {
syncExecute(analysisTaskInfos.values());
updateTableStats(jobInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,53 +36,88 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.text.MessageFormat;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public abstract class BaseAnalysisTask {

public static final Logger LOG = LogManager.getLogger(BaseAnalysisTask.class);

protected static final String NDV_MULTIPLY_THRESHOLD = "0.3";

protected static final String NDV_SAMPLE_TEMPLATE = "case when NDV(`${colName}`)/count('${colName}') < "
+ NDV_MULTIPLY_THRESHOLD
+ " then NDV(`${colName}`) "
+ "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, "
;
public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
public static final double LIMIT_FACTOR = 1.2;

protected static final String COLLECT_COL_STATISTICS =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ " ${catalogId} AS catalog_id, "
+ " ${dbId} AS db_id, "
+ " ${tblId} AS tbl_id, "
+ " ${idxId} AS idx_id, "
+ " '${colId}' AS col_id, "
+ " NULL AS part_id, "
+ " COUNT(1) AS row_count, "
+ " NDV(`${colName}`) AS ndv, "
+ " COUNT(1) - COUNT(${colName}) AS null_count, "
+ " CAST(MIN(${colName}) AS STRING) AS min, "
+ " CAST(MAX(${colName}) AS STRING) AS max, "
+ " ${dataSizeFunction} AS data_size, "
+ " NOW() AS update_time "
+ " FROM `${dbName}`.`${tblName}`";

protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE =
" SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "${idxId} AS idx_id, "
+ "'${colId}' AS col_id, "
+ "NULL AS part_id, "
+ "${row_count} AS row_count, "
+ "${ndv} AS ndv, "
+ "${null_count} AS null_count, "
+ "'${min}' AS min, "
+ "'${max}' AS max, "
+ "${data_size} AS data_size, "
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ " ${catalogId} AS `catalog_id`, "
+ " ${dbId} AS `db_id`, "
+ " ${tblId} AS `tbl_id`, "
+ " ${idxId} AS `idx_id`, "
+ " '${colId}' AS `col_id`, "
+ " NULL AS `part_id`, "
+ " COUNT(1) AS `row_count`, "
+ " NDV(`${colName}`) AS `ndv`, "
+ " COUNT(1) - COUNT(${colName}) AS `null_count`, "
+ " CAST(MIN(${colName}) AS STRING) AS `min`, "
+ " CAST(MAX(${colName}) AS STRING) AS `max`, "
+ " ${dataSizeFunction} AS `data_size`, "
+ " NOW() AS `update_time` "
+ " FROM `${catalogName}`.`${dbName}`.`${tblName}`";

protected static final String LINEAR_ANALYZE_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "ROUND(COUNT(1) * ${scaleFactor}) AS `row_count`, "
+ "ROUND(NDV(`${colName}`) * ${scaleFactor}) as `ndv`, "
+ "ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor}) AS `null_count`, "
+ "${min} AS `min`, "
+ "${max} AS `max`, "
+ "${dataSizeFunction} * ${scaleFactor} AS `data_size`, "
+ "NOW() "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints} ${limit}";

protected static final String DUJ1_ANALYZE_TEMPLATE = "SELECT "
+ "CONCAT('${tblId}', '-', '${idxId}', '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "${rowCount} AS `row_count`, "
+ "${ndvFunction} as `ndv`, "
+ "IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.count, 0)), 0) * ${scaleFactor} as `null_count`, "
+ "'${min}' AS `min`, "
+ "'${max}' AS `max`, "
+ "${dataSizeFunction} * ${scaleFactor} AS `data_size`, "
+ "NOW() "
+ "FROM ( "
+ " SELECT t0.`${colName}` as column_key, COUNT(1) as `count` "
+ " FROM "
+ " (SELECT `${colName}` FROM `${catalogName}`.`${dbName}`.`${tblName}` "
+ " ${sampleHints} ${limit}) as `t0` "
+ " GROUP BY `t0`.`${colName}` "
+ ") as `t1` ";

protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "${row_count} AS `row_count`, "
+ "${ndv} AS `ndv`, "
+ "${null_count} AS `null_count`, "
+ "'${min}' AS `min`, "
+ "'${max}' AS `max`, "
+ "${data_size} AS `data_size`, "
+ "NOW() ";

protected AnalysisInfo info;
Expand Down Expand Up @@ -199,29 +234,51 @@ public long getJobId() {
return info.jobId;
}

// TODO : time cost is intolerable when column is string type, return 0 directly for now.
protected String getDataSizeFunction(Column column) {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`${colName}`))";
protected String getDataSizeFunction(Column column, boolean useDuj1) {
if (useDuj1) {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`column_key`) * count)";
} else {
return "SUM(t1.count) * " + column.getType().getSlotSize();
}
} else {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`${colName}`))";
} else {
return "COUNT(1) * " + column.getType().getSlotSize();
}
}
return "COUNT(1) * " + column.getType().getSlotSize();
}

// Min value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan.
protected String getMinFunction() {
if (tableSample == null) {
return "CAST(MIN(`${colName}`) as ${type}) ";
return "to_base64(CAST(MIN(`${colName}`) as ${type})) ";
} else {
return "NULL ";
// Min value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan.
return "NULL";
}
}

protected String getNdvFunction(String totalRows) {
String sampleRows = "SUM(t1.count)";
String onceCount = "SUM(IF(t1.count = 1, 1, 0))";
String countDistinct = "COUNT(1)";
// DUJ1 estimator: n*d / (n - f1 + f1*n/N)
// f1 is the count of element that appears only once in the sample.
// (https://github.com/postgres/postgres/blob/master/src/backend/commands/analyze.c)
// (http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.93.8637&rep=rep1&type=pdf)
// sample_row * count_distinct / ( sample_row - once_count + once_count * sample_row / total_row)
String fn = MessageFormat.format("{0} * {1} / ({0} - {2} + {2} * {0} / {3})", sampleRows,
countDistinct, onceCount, totalRows);
return fn;
}

// Max value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan.
protected String getMaxFunction() {
if (tableSample == null) {
return "CAST(MAX(`${colName}`) as ${type}) ";
return "to_base64(CAST(MAX(`${colName}`) as ${type})) ";
} else {
return "NULL ";
return "NULL";
}
}

Expand Down Expand Up @@ -254,12 +311,11 @@ public void setJob(AnalysisJob job) {
this.job = job;
}

protected void runQuery(String sql) {
protected void runQuery(String sql, boolean needEncode) {
long startTime = System.currentTimeMillis();
try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) {
stmtExecutor = new StmtExecutor(a.connectContext, sql);
stmtExecutor.executeInternalQuery();
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0), needEncode);
job.appendBuf(this, Collections.singletonList(colStatsData));
} finally {
LOG.debug("End cost time in secs: " + (System.currentTimeMillis() - startTime) / 1000);
Expand Down
Loading

0 comments on commit 9069a3b

Please sign in to comment.