Skip to content

Commit

Permalink
[feat](nereids) support partition level column stats (apache#35875)
Browse files Browse the repository at this point in the history
## Proposed changes
Partition-level column stats are now collected.
The stats derive job merges the partition-level column stats into a single
column statistic and then uses the merged stats for stats derivation, instead
of using the table-level column stats.

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
englefly authored Jun 24, 2024
1 parent 502f431 commit e7ea82b
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.statistics.PartitionColumnStatistic;
import org.apache.doris.statistics.PartitionColumnStatisticBuilder;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.StatisticRange;
import org.apache.doris.statistics.Statistics;
Expand Down Expand Up @@ -724,7 +726,7 @@ false, getTotalColumnStatisticMap(), false,
}
}

private ColumnStatistic getColumnStatistic(TableIf table, String colName, long idxId) {
private ColumnStatistic getColumnStatistic(TableIf table, String colName, long idxId, List<String> partitionNames) {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getSessionVariable().internalSession) {
return ColumnStatistic.UNKNOWN;
Expand All @@ -751,8 +753,45 @@ private ColumnStatistic getColumnStatistic(TableIf table, String colName, long i
return ColumnStatistic.UNKNOWN;
}
} else {
if (!partitionNames.isEmpty()) {
PartitionColumnStatisticBuilder builder = new PartitionColumnStatisticBuilder();
boolean hasUnknown = false;
// check if there is any unknown stats to avoid unnecessary partition column stats merge.
for (String partitionName : partitionNames) {
PartitionColumnStatistic pcolStats = Env.getCurrentEnv().getStatisticsCache()
.getPartitionColumnStatistics(
catalogId, dbId, table.getId(), idxId, partitionName, colName);
if (pcolStats.isUnKnown) {
hasUnknown = true;
break;
}
}
if (!hasUnknown) {
boolean isFirst = true;
// try to merge partition column stats
for (String partitionName : partitionNames) {
PartitionColumnStatistic pcolStats = Env.getCurrentEnv().getStatisticsCache()
.getPartitionColumnStatistics(
catalogId, dbId, table.getId(), idxId, partitionName, colName);
if (pcolStats.isUnKnown) {
hasUnknown = true;
break;
}
if (isFirst) {
builder = new PartitionColumnStatisticBuilder(pcolStats);
isFirst = false;
} else {
builder.merge(pcolStats);
}
}
if (!hasUnknown) {
return builder.toColumnStatistics();
}
}
}
// if any partition-col-stats is unknown, fall back to table level col stats
return Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(
catalogId, dbId, table.getId(), idxId, colName);
catalogId, dbId, table.getId(), idxId, colName);
}
}

Expand Down Expand Up @@ -785,19 +824,25 @@ private Statistics computeCatalogRelation(CatalogRelation catalogRelation) {
Set<SlotReference> slotSet = slotSetBuilder.build();
Map<Expression, ColumnStatisticBuilder> columnStatisticBuilderMap = new HashMap<>();
TableIf table = catalogRelation.getTable();
boolean isOlapTable = table instanceof OlapTable;
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
TableStatsMeta tableMeta = analysisManager.findTableStatsStatus(table.getId());
long tableUpdatedRows = tableMeta == null ? 0 : tableMeta.updatedRows.get();
double rowCount = catalogRelation.getTable().getRowCountForNereids();
boolean hasUnknownKeyCol = false;
long idxId = -1;
List<String> selectedPartitionNames;
if (catalogRelation instanceof OlapScan) {
OlapScan olapScan = (OlapScan) catalogRelation;
if (olapScan.getTable().getBaseIndexId() != olapScan.getSelectedIndexId()) {
idxId = olapScan.getSelectedIndexId();
}
selectedPartitionNames = new ArrayList<>(olapScan.getSelectedPartitionIds().size());
olapScan.getSelectedPartitionIds().forEach(id -> {
selectedPartitionNames.add(olapScan.getTable().getPartition(id).getName());
});
} else {
selectedPartitionNames = new ArrayList<>();
}
double rowCount = 0.0;
for (SlotReference slotReference : slotSet) {
boolean usedAsKey = false;
if (ConnectContext.get() != null && slotReference.getColumn().isPresent()
Expand All @@ -812,19 +857,33 @@ private Statistics computeCatalogRelation(CatalogRelation catalogRelation) {
if (colName == null) {
throw new RuntimeException(String.format("Invalid slot: %s", slotReference.getExprId()));
}
// compute delta row
long deltaRowCount = 0;
if (isOlapTable) {
if (catalogRelation instanceof OlapScan) {
OlapTable olapTable = (OlapTable) table;
ColStatsMeta colMeta = tableMeta == null ? null : tableMeta.findColumnStatsMeta(
olapTable.getIndexNameById(idxId == -1 ? olapTable.getBaseIndexId() : idxId), colName);
deltaRowCount = colMeta == null ? 0 : tableUpdatedRows - colMeta.updatedRows;
if (tableMeta != null) {
ColStatsMeta colMeta = tableMeta.findColumnStatsMeta(
olapTable.getIndexNameById(idxId == -1 ? olapTable.getBaseIndexId() : idxId), colName);
if (colMeta != null) {
if (((OlapScan) catalogRelation).getSelectedPartitionIds().isEmpty()) {
deltaRowCount = tableUpdatedRows - colMeta.updatedRows;
} else {
// sum partition delta row
for (long partitionId : ((OlapScan) catalogRelation).getSelectedPartitionIds()) {
deltaRowCount += tableMeta.partitionUpdateRows.getOrDefault(partitionId, 0L)
- colMeta.partitionUpdateRows.getOrDefault(partitionId, 0L);
}
}
}
}

}
ColumnStatistic cache;
if (!FeConstants.enableInternalSchemaDb
|| shouldIgnoreThisCol) {
cache = ColumnStatistic.UNKNOWN;
} else {
cache = getColumnStatistic(table, colName, idxId);
cache = getColumnStatistic(table, colName, idxId, selectedPartitionNames);
}
ColumnStatisticBuilder colStatsBuilder = new ColumnStatisticBuilder(cache);
if (cache.avgSizeByte <= 0) {
Expand Down Expand Up @@ -859,6 +918,11 @@ private Statistics computeCatalogRelation(CatalogRelation catalogRelation) {
hasUnknownKeyCol = true;
}
}
if (rowCount <= 0.0) {
// if we failed to get rowCount from column stats, then try to get it from TableIf
rowCount = catalogRelation.getTable().getRowCountForNereids();
}

if (hasUnknownKeyCol && ConnectContext.get() != null && ConnectContext.get().getStatementContext() != null) {
ConnectContext.get().getStatementContext().setHasUnknownColStats(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,25 @@ public PartitionColumnStatistic build() {
dataSize, minValue, maxValue, minExpr, maxExpr,
isUnknown, updatedTime);
}

/**
 * Folds the statistics of another partition of the same column into this builder.
 *
 * <p>Row count, null count and data size are summed, the NDV sketches are merged, and the
 * min/max bounds (together with their literal expressions) are widened to cover both inputs.
 * The average value size is recomputed from the merged totals so that it stays consistent
 * with the summed row count and data size.
 *
 * @param other statistics of one partition to merge into this builder
 * @return this builder, so calls can be chained
 */
public PartitionColumnStatisticBuilder merge(PartitionColumnStatistic other) {
    count += other.count;
    ndv.merge(other.ndv);
    numNulls += other.numNulls;

    // dataSize used to keep the value of whichever partition seeded the builder, so the
    // merged statistic reported a multi-partition row count next to a single-partition size.
    dataSize += other.dataSize;
    // NOTE(review): assumes avgSizeByte is defined as dataSize / count, as for table-level
    // column stats — confirm against how the per-partition value is produced.
    if (count > 0) {
        avgSizeByte = dataSize / count;
    }

    // Widen the lower bound; the expression must always travel with its numeric value.
    if (other.minValue < minValue) {
        minValue = other.minValue;
        minExpr = other.minExpr;
    }
    // Widen the upper bound in the same way.
    if (other.maxValue > maxValue) {
        maxValue = other.maxValue;
        maxExpr = other.maxExpr;
    }

    // NOTE(review): the result is flagged unknown only when BOTH sides are unknown, so merging
    // an unknown partition into a known builder is not flagged. Kept as-is to preserve behavior.
    isUnknown = isUnknown && other.isUnKnown;
    return this;
}

/**
 * Converts the state accumulated in this builder into a {@link ColumnStatistic}.
 *
 * <p>The distinct-value count is taken from the NDV sketch via {@code estimateCardinality()};
 * every other field is passed through unchanged. The third constructor argument is
 * always {@code null}.
 *
 * @return a column statistic built from this builder's current values
 */
public ColumnStatistic toColumnStatistics() {
    ColumnStatistic converted = new ColumnStatistic(
            count,
            ndv.estimateCardinality(),
            null,
            avgSizeByte,
            numNulls,
            dataSize,
            minValue,
            maxValue,
            minExpr,
            maxExpr,
            isUnknown,
            updatedTime);
    return converted;
}
}
45 changes: 45 additions & 0 deletions regression-test/suites/nereids_p0/stats/partition_col_stats.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: checks that the row estimate shown in the physical plan for a scan
// restricted to one partition matches the number of rows in that partition.
suite("partition_col_stats") {
// Setup: create a three-partition range table on k1 (p1: k1 < 3, p2: [3, 7), p3: [7, 10)),
// insert 7 rows (4 land in p1, 3 in p2, none in p3) and analyze it synchronously.
// NOTE(review): enable_partition_analyze is set with `set global` and is not reset anywhere
// in this suite, so the setting outlives the test — confirm that is intended.
multi_sql """
set global enable_partition_analyze=true;
drop table if exists pt;
CREATE TABLE `pt` (
`k1` int(11) NULL COMMENT "",
`k2` int(11) NULL COMMENT "",
`k3` int(11) NULL COMMENT ""
)
PARTITION BY RANGE(`k1`)
(PARTITION p1 VALUES LESS THAN ("3"),
PARTITION p2 VALUES [("3"), ("7")),
PARTITION p3 VALUES [("7"), ("10")))
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
PROPERTIES ('replication_num' = '1');
insert into pt values (1, 2, 2), (1, 3, 3), (1, 4, 1), (1, 4, 1), (4, 4, 4), (5,5,5),(6,6,6);
analyze table pt with sync;
"""
// Run the query once before the plan is inspected so that the statistics it needs
// are loaded into the cache.
sql "select * from pt where k1<3;"
// Short pause between the warm-up query and the plan check.
// NOTE(review): if this resolves to Groovy's built-in sleep, the argument is in
// milliseconds, i.e. only 10 ms — confirm this is long enough for the cache to fill.
sleep(10)
// k1 < 3 selects only partition p1, which holds the four rows with k1 = 1,
// so the plan is expected to contain "stats=4".
explain{
sql "physical plan select * from pt where k1<3;"
contains("stats=4")
}

}

0 comments on commit e7ea82b

Please sign in to comment.