Skip to content

Commit

Permalink
[BugFix] fix and refactor logical of inference bucket number (#26726)
Browse files Browse the repository at this point in the history
1. Unify the bucket number inference code to `Table::inferDistribution`
2. Not change default bucket number, so `show create table` would not
display the bucket number if not specified manually
  • Loading branch information
murphyatwork authored Jul 12, 2023
1 parent e8c7dfd commit c9e9dad
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,29 @@ public void setPlanContext(MVRewriteContextCache mvRewriteContextCache) {
this.mvRewriteContextCache = mvRewriteContextCache;
}

/**
* Infer the distribution info based on tables and MV query.
* Currently is max{bucket_num of base_table}
* TODO: infer the bucket number according to MV pattern and cardinality
*/
@Override
public void inferDistribution(DistributionInfo info) {
if (info.getBucketNum() == 0) {
int inferredBucketNum = 0;
for (BaseTableInfo base : getBaseTableInfos()) {
if (base.getTable().isNativeTableOrMaterializedView()) {
OlapTable olapTable = (OlapTable) base.getTable();
DistributionInfo dist = olapTable.getDefaultDistributionInfo();
inferredBucketNum = Math.max(inferredBucketNum, dist.getBucketNum());
}
}
if (inferredBucketNum == 0) {
inferredBucketNum = CatalogUtils.calBucketNumAccordingToBackends();
}
info.setBucketNum(inferredBucketNum);
}
}

@Override
public Map<String, String> getProperties() {
Map<String, String> properties = super.getProperties();
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,17 @@ public DistributionInfo getDefaultDistributionInfo() {
return defaultDistributionInfo;
}

/*
* Infer the distribution info based on partitions and cluster status
*/
public void inferDistribution(DistributionInfo info) {
if (info.getBucketNum() == 0) {
int numBucket = CatalogUtils.calAvgBucketNumOfRecentPartitions(this,
5, Config.enable_auto_tablet_distribution);
info.setBucketNum(numBucket);
}
}

@Override
public Set<String> getDistributionColumnNames() {
Set<String> distributionColumnNames = Sets.newHashSet();
Expand Down
48 changes: 7 additions & 41 deletions fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1288,11 +1288,7 @@ private void addPartitions(Database db, String tableName, List<PartitionDesc> pa

// get distributionInfo
distributionInfo = getDistributionInfo(olapTable, addPartitionClause).copy();
if (distributionInfo.getBucketNum() == 0) {
int numBucket = CatalogUtils.calAvgBucketNumOfRecentPartitions(olapTable, 5,
Config.enable_auto_tablet_distribution);
distributionInfo.setBucketNum(numBucket);
}
olapTable.inferDistribution(distributionInfo);

// check colocation
checkColocation(db, olapTable, distributionInfo, partitionDescs);
Expand Down Expand Up @@ -1578,25 +1574,15 @@ public void replayRecoverPartition(RecoverInfo info) {

Partition createPartition(Database db, OlapTable table, long partitionId, String partitionName,
Long version, Set<Long> tabletIdSet) throws DdlException {
return createPartitionCommon(db, table, partitionId, partitionName, table.getPartitionInfo(), version,
tabletIdSet);
}

private Partition createPartitionCommon(Database db, OlapTable table, long partitionId, String partitionName,
PartitionInfo partitionInfo, Long version, Set<Long> tabletIdSet)
throws DdlException {
PartitionInfo partitionInfo = table.getPartitionInfo();
Map<Long, MaterializedIndex> indexMap = new HashMap<>();
for (long indexId : table.getIndexIdToMeta().keySet()) {
MaterializedIndex rollup = new MaterializedIndex(indexId, MaterializedIndex.IndexState.NORMAL);
indexMap.put(indexId, rollup);
}
DistributionInfo distributionInfo = table.getDefaultDistributionInfo().copy();

if (distributionInfo.getBucketNum() == 0) {
int numBucket =
CatalogUtils.calAvgBucketNumOfRecentPartitions(table, 5, Config.enable_auto_tablet_distribution);
distributionInfo.setBucketNum(numBucket);
}
DistributionInfo distributionInfo = table.getDefaultDistributionInfo().copy();
table.inferDistribution(distributionInfo);

// create shard group
long shardGroupId = 0;
Expand Down Expand Up @@ -2689,7 +2675,7 @@ public void createMaterializedView(CreateMaterializedViewStatement stmt)
// create distribution info
DistributionDesc distributionDesc = stmt.getDistributionDesc();
Preconditions.checkNotNull(distributionDesc);
DistributionInfo distributionInfo = distributionDesc.toDistributionInfo(baseSchema);
DistributionInfo baseDistribution = distributionDesc.toDistributionInfo(baseSchema);
// create refresh scheme
MaterializedView.MvRefreshScheme mvRefreshScheme;
RefreshSchemeDesc refreshSchemeDesc = stmt.getRefreshSchemeDesc();
Expand Down Expand Up @@ -2765,7 +2751,7 @@ public void createMaterializedView(CreateMaterializedViewStatement stmt)
} else {
materializedView =
new MaterializedView(mvId, db.getId(), mvName, baseSchema, stmt.getKeysType(), partitionInfo,
distributionInfo, mvRefreshScheme);
baseDistribution, mvRefreshScheme);
}
} else {
Preconditions.checkState(RunMode.getCurrentRunMode().isAllowCreateLakeTable());
Expand All @@ -2775,7 +2761,7 @@ public void createMaterializedView(CreateMaterializedViewStatement stmt)

materializedView =
new LakeMaterializedView(mvId, db.getId(), mvName, baseSchema, stmt.getKeysType(), partitionInfo,
distributionInfo, mvRefreshScheme);
baseDistribution, mvRefreshScheme);
}

// set comment
Expand Down Expand Up @@ -2888,26 +2874,6 @@ private DataProperty analyzeMVDataProperties(Database db,
materializedView.setMaxMVRewriteStaleness(maxMVRewriteStaleness);
}

// bucket number
// infer bucket number of materialized view based on base-table,
// currently is max{bucket_num of base_table}
// TODO: infer the bucket number according to MV pattern and cardinality
DistributionInfo distributionInfo = materializedView.getDefaultDistributionInfo();
if (distributionInfo.getBucketNum() == 0) {
int inferredBucketNum = 0;
for (BaseTableInfo base : materializedView.getBaseTableInfos()) {
if (base.getTable().isNativeTableOrMaterializedView()) {
OlapTable olapTable = (OlapTable) base.getTable();
DistributionInfo dist = olapTable.getDefaultDistributionInfo();
inferredBucketNum = Math.max(inferredBucketNum, dist.getBucketNum());
}
}
if (inferredBucketNum == 0) {
inferredBucketNum = CatalogUtils.calBucketNumAccordingToBackends();
}
distributionInfo.setBucketNum(inferredBucketNum);
}

// set storage medium
boolean hasMedium = properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM);
dataProperty = PropertyAnalyzer.analyzeDataProperty(properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.Maps;
import com.starrocks.analysis.KeysDesc;
import com.starrocks.binlog.BinlogConfig;
import com.starrocks.catalog.CatalogUtils;
import com.starrocks.catalog.ColocateTableIndex;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.DataProperty;
Expand Down Expand Up @@ -372,10 +371,7 @@ public Table createTable(LocalMetastore metastore, Database db, CreateTableStmt
if (!(table instanceof ExternalOlapTable) && addedToColocateGroup) {
// Colocate table should keep the same bucket number across the partitions
DistributionInfo defaultDistributionInfo = table.getDefaultDistributionInfo();
if (defaultDistributionInfo.getBucketNum() == 0) {
int bucketNum = CatalogUtils.calBucketNumAccordingToBackends();
defaultDistributionInfo.setBucketNum(bucketNum);
}
table.inferDistribution(defaultDistributionInfo);
}

// get base index storage type. default is COLUMN
Expand Down
12 changes: 6 additions & 6 deletions test/sql/test_materialized_view/R/test_create
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ from t_hash t0 join t_random t1 on t0.k1 = t1.k1;
SHOW CREATE MATERIALIZED VIEW mv1;
-- result:
mv1 CREATE MATERIALIZED VIEW `mv1` (`k1`, `v1`)
DISTRIBUTED BY RANDOM BUCKETS 64
DISTRIBUTED BY RANDOM
REFRESH ASYNC
PROPERTIES (
"replication_num" = "1",
Expand All @@ -90,7 +90,7 @@ select k1, count(v1) from t_random group by k1;
SHOW CREATE MATERIALIZED VIEW mv2;
-- result:
mv2 CREATE MATERIALIZED VIEW `mv2` (`k1`, `count(v1)`)
DISTRIBUTED BY RANDOM BUCKETS 64
DISTRIBUTED BY RANDOM
REFRESH ASYNC
PROPERTIES (
"replication_num" = "1",
Expand All @@ -111,7 +111,7 @@ from t_hash t0 join t1 on t0.k1 = t1.k1;
SHOW CREATE MATERIALIZED VIEW mv3;
-- result:
mv3 CREATE MATERIALIZED VIEW `mv3` (`k1`, `v1`)
DISTRIBUTED BY RANDOM BUCKETS 64
DISTRIBUTED BY RANDOM
REFRESH ASYNC
PROPERTIES (
"replication_num" = "1",
Expand All @@ -131,7 +131,7 @@ from t_random t0 join t1 on t0.k1 = t1.k1;
SHOW CREATE MATERIALIZED VIEW mv4;
-- result:
mv4 CREATE MATERIALIZED VIEW `mv4` (`k1`, `v1`)
DISTRIBUTED BY RANDOM BUCKETS 64
DISTRIBUTED BY RANDOM
REFRESH ASYNC
PROPERTIES (
"replication_num" = "1",
Expand All @@ -151,7 +151,7 @@ from t1 join t2 on t1.k1 = t2.k1;
SHOW CREATE MATERIALIZED VIEW mv_part1;
-- result:
mv_part1 CREATE MATERIALIZED VIEW `mv_part1` (`k1`, `v1`)
DISTRIBUTED BY RANDOM BUCKETS 2
DISTRIBUTED BY RANDOM
REFRESH ASYNC
PROPERTIES (
"replication_num" = "1",
Expand Down Expand Up @@ -206,7 +206,7 @@ SHOW CREATE MATERIALIZED VIEW mv6;
-- result:
mv6 CREATE MATERIALIZED VIEW `mv6` (`k1`, `v1`, `v2`)
PARTITION BY (`k1`)
DISTRIBUTED BY RANDOM BUCKETS 2
DISTRIBUTED BY RANDOM
REFRESH ASYNC
PROPERTIES (
"replication_num" = "1",
Expand Down
10 changes: 5 additions & 5 deletions test/sql/test_materialized_view/R/test_show_materialized_view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ create materialized view user_tags_mv1 distributed by hash(user_id) as select u
show create materialized view user_tags_mv1;
-- result:
user_tags_mv1 CREATE MATERIALIZED VIEW `user_tags_mv1` (`user_id`, `bitmap_union(to_bitmap(tag_id))`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 3
DISTRIBUTED BY HASH(`user_id`)
REFRESH MANUAL
PROPERTIES (
"replication_num" = "1",
Expand All @@ -28,7 +28,7 @@ GROUP BY `user_tags`.`user_id`;
show create table user_tags_mv1;
-- result:
user_tags_mv1 CREATE MATERIALIZED VIEW `user_tags_mv1` (`user_id`, `bitmap_union(to_bitmap(tag_id))`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 3
DISTRIBUTED BY HASH(`user_id`)
REFRESH MANUAL
PROPERTIES (
"replication_num" = "1",
Expand All @@ -48,7 +48,7 @@ alter materialized view user_tags_mv1 set ("mv_rewrite_staleness_second" = "3600
show create materialized view user_tags_mv1;
-- result:
user_tags_mv1 CREATE MATERIALIZED VIEW `user_tags_mv1` (`user_id`, `bitmap_union(to_bitmap(tag_id))`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 3
DISTRIBUTED BY HASH(`user_id`)
REFRESH MANUAL
PROPERTIES (
"replication_num" = "1",
Expand All @@ -64,7 +64,7 @@ GROUP BY `user_tags`.`user_id`;
show create table user_tags_mv1;
-- result:
user_tags_mv1 CREATE MATERIALIZED VIEW `user_tags_mv1` (`user_id`, `bitmap_union(to_bitmap(tag_id))`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 3
DISTRIBUTED BY HASH(`user_id`)
REFRESH MANUAL
PROPERTIES (
"replication_num" = "1",
Expand All @@ -79,4 +79,4 @@ GROUP BY `user_tags`.`user_id`;
-- !result
drop database test_show_materialized_view;
-- result:
-- !result
-- !result

0 comments on commit c9e9dad

Please sign in to comment.