Skip to content

Commit

Permalink
refactor Traits.getUpdatedPartitionNames
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <mofei@starrocks.com>
  • Loading branch information
murphyatwork committed Nov 9, 2023
1 parent d703cb5 commit ce3bf53
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 132 deletions.
134 changes: 2 additions & 132 deletions fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import com.starrocks.connector.ConnectorPartitionTraits;
import com.starrocks.connector.ConnectorTableInfo;
import com.starrocks.connector.PartitionUtil;
import com.starrocks.connector.iceberg.IcebergPartitionUtils;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonPreProcessable;
import com.starrocks.persist.gson.GsonUtils;
Expand Down Expand Up @@ -76,7 +75,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.Snapshot;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -88,15 +86,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.scheduler.PartitionBasedMvRefreshProcessor.ICEBERG_ALL_PARTITION;

/**
* Metadata structure for a materialized view.
*/
Expand Down Expand Up @@ -602,58 +597,12 @@ public static Expr getPartitionExpr(MaterializedView materializedView) {
return materializedView.getFirstPartitionRefTableExpr();
}

/**
* Computes which partitions of an OLAP base table have changed relative to the partition
* versions this materialized view recorded for that table.
*
* A partition is reported when it is visible in the base table but not recorded (unless its
* visible version is 1), when its recorded info is null, or when its recorded id or version
* differs from the base table's current partition. If a recorded partition no longer exists
* in the base table, all visible partition names of the base table are returned.
*
* NOTE(review): this text comes from a rendered diff whose +/- markers were stripped; the body
* below ends with two consecutive return statements, which looks like the removed
* implementation followed by the delegating line that replaces it -- confirm against the
* repository before relying on either form.
*
* @param baseTable : The materialized view's olap base table.
* @param isQueryRewrite : Whether the caller is query rewrite; when true and the staleness check
* passes, an empty set is returned without inspecting any partition.
* @return : The updated partition names of the materialized view's olap base table.
*/
public Set<String> getUpdatedPartitionNamesOfOlapTable(OlapTable baseTable, boolean isQueryRewrite) {
// Ignore partitions when the mv's last refreshed time period is less than `maxMVRewriteStaleness`
if (isQueryRewrite && isStalenessSatisfied()) {
return Sets.newHashSet();
}

// Partition name -> info recorded by the mv for this base table; computeIfAbsent registers an
// empty map under the table id when nothing has been recorded yet.
Map<String, BasePartitionInfo> mvBaseTableVisibleVersionMap = getRefreshScheme()
.getAsyncRefreshContext()
.getBaseTableVisibleVersionMap()
.computeIfAbsent(baseTable.getId(), k -> Maps.newHashMap());
Set<String> result = Sets.newHashSet();

// If there are newly added partitions, add them into the refresh result.
for (String partitionName : baseTable.getVisiblePartitionNames()) {
if (!mvBaseTableVisibleVersionMap.containsKey(partitionName)) {
Partition partition = baseTable.getPartition(partitionName);
// TODO: use `mvBaseTableVisibleVersionMap` to check whether base table has been refreshed or not instead of
// checking its version, remove this later.
// Unrecorded partitions whose visible version is exactly 1 are skipped.
if (partition.getVisibleVersion() != 1) {
result.add(partitionName);
}
}
}

// Compare every recorded partition against the base table's current state.
for (Map.Entry<String, BasePartitionInfo> versionEntry : mvBaseTableVisibleVersionMap.entrySet()) {
String basePartitionName = versionEntry.getKey();
Partition basePartition = baseTable.getPartition(basePartitionName);
if (basePartition == null) {
// Once there is a partition deleted, refresh all partitions.
return baseTable.getVisiblePartitionNames();
}
BasePartitionInfo mvRefreshedPartitionInfo = versionEntry.getValue();
if (mvRefreshedPartitionInfo == null) {
// No recorded info for this partition name: treat it as updated.
result.add(basePartitionName);
} else {
// Ignore partitions whose recorded id and version both match the base table's partition.
if (mvRefreshedPartitionInfo.getId() == basePartition.getId()
&& basePartition.getVisibleVersion() == mvRefreshedPartitionInfo.getVersion()) {
continue;
}

// Any other partition (id or version mismatch) is added into the result.
result.add(basePartitionName);
}
}
return result;
// NOTE(review): presumably the replacement line added by the commit (see the note in the Javadoc).
return ConnectorPartitionTraits.build(baseTable).getUpdatedPartitionNames(this);
}

/**
Expand Down Expand Up @@ -739,90 +688,15 @@ public List<BasePartitionInfo> getBaseTableLatestPartitionInfo(Table baseTable)
.stream().map(BasePartitionInfo::fromExternalTable).collect(Collectors.toList());
}

/**
 * Collects the partition names of an Iceberg base table that changed since this materialized
 * view last recorded them.
 *
 * For every entry of {@code baseTableInfos} whose table identifier equals the base table's
 * (ignoring case), the current snapshot's {@code timestampMillis()} is used as the current
 * version ({@code -1} when the table has no current snapshot):
 * - when nothing is recorded under {@code ICEBERG_ALL_PARTITION}, the snapshot time is stored
 *   on the table and all current partition names are returned immediately;
 * - otherwise, partitions missing from the recorded map are added, and when the recorded
 *   version is older than the current one, the names returned by
 *   {@code IcebergPartitionUtils.getChangedPartitionNames} are added as well.
 * The table's refresh snapshot time is set whenever any of these cases reports a change.
 *
 * @param baseTable the Iceberg base table to inspect
 * @return the partition names considered updated; empty when no matching entry reports a change
 */
private Set<String> getUpdatedPartitionNameOfIcebergTable(IcebergTable baseTable) {
    Set<String> changedPartitions = Sets.newHashSet();
    for (BaseTableInfo candidate : baseTableInfos) {
        boolean sameTable = candidate.getTableIdentifier().equalsIgnoreCase(baseTable.getTableIdentifier());
        if (sameTable) {
            List<String> currentPartitions = PartitionUtil.getPartitionNames(baseTable);

            // The snapshot timestamp acts as the table version; -1 stands for "no snapshot".
            Snapshot latestSnapshot = baseTable.getNativeTable().currentSnapshot();
            long latestVersion = -1;
            if (latestSnapshot != null) {
                latestVersion = latestSnapshot.timestampMillis();
            }

            Map<String, BasePartitionInfo> recordedPartitions = getBaseTableRefreshInfo(candidate);
            BasePartitionInfo recordedTableInfo = recordedPartitions.get(ICEBERG_ALL_PARTITION);
            if (recordedTableInfo == null) {
                // Nothing recorded for the table as a whole: every partition counts as updated.
                baseTable.setRefreshSnapshotTime(latestVersion);
                return new HashSet<>(currentPartitions);
            }

            // Pick up partitions that exist now but are absent from the recorded map.
            for (String name : currentPartitions) {
                if (recordedPartitions.containsKey(name)) {
                    continue;
                }
                changedPartitions.add(name);
            }

            if (!changedPartitions.isEmpty()) {
                baseTable.setRefreshSnapshotTime(latestVersion);
            }

            // A newer snapshot than the recorded one: add the partitions changed since then.
            long recordedVersion = recordedTableInfo.version;
            if (latestVersion > recordedVersion) {
                baseTable.setRefreshSnapshotTime(latestVersion);
                Set<String> changedSinceRecorded =
                        IcebergPartitionUtils.getChangedPartitionNames(baseTable.getNativeTable(), recordedVersion);
                changedPartitions.addAll(changedSinceRecorded);
            }
        }
    }
    return changedPartitions;
}

/**
* Computes which partitions of an external base table have changed relative to what this
* materialized view recorded for it.
*
* Returns null for tables that are not Hive, JDBC or Iceberg tables. Iceberg tables are handed to
* getUpdatedPartitionNameOfIcebergTable before the query-dump / staleness short-circuit is
* evaluated. For the remaining tables, each partition's latest modified time is compared with the
* recorded version; if a recorded partition is no longer present, all latest partition names are
* returned.
*
* NOTE(review): this text comes from a rendered diff whose +/- markers were stripped; the body
* below ends with two consecutive return statements, which looks like the removed
* implementation followed by the delegating line that replaces it -- confirm against the
* repository before relying on either form.
*
* @param baseTable : The materialized view's external base table.
* @param isQueryRewrite : Whether the caller is query rewrite; when true and the staleness check
* passes, an empty set is returned.
* @return : The updated partition names, or null when the table type is not supported.
*/
private Set<String> getUpdatedPartitionNamesOfExternalTable(Table baseTable, boolean isQueryRewrite) {
if (!baseTable.isHiveTable() && !baseTable.isJDBCTable() && !baseTable.isIcebergTable()) {
// Only Hive, JDBC and Iceberg tables are supported now
return null;
}
if (baseTable.isIcebergTable()) {
return getUpdatedPartitionNameOfIcebergTable((IcebergTable) baseTable);
}

Set<String> result = Sets.newHashSet();
// NOTE: For query dump replay, ignore updated partition infos only to check mv can rewrite query or not.
// Ignore partitions when the mv's last refreshed time period is less than `maxMVRewriteStaleness`
if (FeConstants.isReplayFromQueryDump || (isQueryRewrite && isStalenessSatisfied())) {
return result;
}

// Partition name -> latest partition info as currently reported for the base table.
Map<String, com.starrocks.connector.PartitionInfo> latestPartitionInfo =
PartitionUtil.getPartitionNameWithPartitionInfo(baseTable);

for (BaseTableInfo baseTableInfo : baseTableInfos) {
// Only consider the entry whose identifier matches this base table (case-insensitive).
if (!baseTableInfo.getTableIdentifier().equalsIgnoreCase(baseTable.getTableIdentifier())) {
continue;
}
Map<String, BasePartitionInfo> baseTableInfoVisibleVersionMap = getBaseTableRefreshInfo(baseTableInfo);

// check whether there are partitions added
for (Map.Entry<String, com.starrocks.connector.PartitionInfo> entry : latestPartitionInfo.entrySet()) {
if (!baseTableInfoVisibleVersionMap.containsKey(entry.getKey())) {
result.add(entry.getKey());
}
}

for (Map.Entry<String, BasePartitionInfo> versionEntry : baseTableInfoVisibleVersionMap.entrySet()) {
String basePartitionName = versionEntry.getKey();
if (!latestPartitionInfo.containsKey(basePartitionName)) {
// partitions deleted: report every latest partition name
return latestPartitionInfo.keySet();
}
// The partition's modified time serves as its version for the comparison below.
long basePartitionVersion = latestPartitionInfo.get(basePartitionName).getModifiedTime();

BasePartitionInfo basePartitionInfo = versionEntry.getValue();
// No recorded info, or a recorded version different from the modified time: updated.
if (basePartitionInfo == null || basePartitionVersion != basePartitionInfo.getVersion()) {
result.add(basePartitionName);
}
}
}
return result;
// NOTE(review): presumably the replacement line added by the commit (see the note in the Javadoc).
return ConnectorPartitionTraits.build(baseTable).getUpdatedPartitionNames(this);
}

/**
Expand Down Expand Up @@ -1273,10 +1147,6 @@ public boolean containsBaseTable(TableName tableName) {
return false;
}

/**
 * Tells whether the given external table is a Hive table, as reported by
 * {@code table.isHiveTable()}.
 *
 * @param table the external table to check
 * @return the result of {@code table.isHiveTable()}
 */
private boolean supportPartialPartitionQueryRewriteForExternalTable(Table table) {
    final boolean hiveTable = table.isHiveTable();
    return hiveTable;
}

/**
* Once the materialized view's base tables have been updated, we need to check which partitions of the
* corresponding materialized views have to be refreshed.
Expand Down
Loading

0 comments on commit ce3bf53

Please sign in to comment.