Skip to content

Commit

Permalink
[Refactor] Refactor materialized view partition synchronize logical (S…
Browse files Browse the repository at this point in the history
…tarRocks#9022)

Separate the logic, first calculate the partitions that need to be added and deleted, and then perform the corresponding actions.
Remove tableMvPartitionNameRefMap & mvTablePartitionNameRefMap use rule-based partition mapping.
Use the new algorithm to calculate the corresponding partition
Consider the issue of flush propagation
  • Loading branch information
Astralidea authored Jul 25, 2022
1 parent 53b3ece commit 6dab73e
Show file tree
Hide file tree
Showing 21 changed files with 1,025 additions and 1,134 deletions.
132 changes: 25 additions & 107 deletions fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
package com.starrocks.catalog;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.DescriptorTable.ReferencedPartitionInfo;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.TableName;
import com.starrocks.analysis.UserIdentity;
import com.starrocks.common.io.DeepCopy;
Expand Down Expand Up @@ -109,18 +106,10 @@ public AsyncRefreshContext() {
this.timeUnit = null;
}

public AsyncRefreshContext(Map<Long, Map<String, BasePartitionInfo>> baseTableVisibleVersionMap) {
this.baseTableVisibleVersionMap = baseTableVisibleVersionMap;
}

public Map<Long, Map<String, BasePartitionInfo>> getBaseTableVisibleVersionMap() {
return baseTableVisibleVersionMap;
}

Map<String, BasePartitionInfo> getPartitionVisibleVersionMapForTable(long tableId) {
return baseTableVisibleVersionMap.get(tableId);
}

public boolean isDefineStartTime() {
return defineStartTime;
}
Expand Down Expand Up @@ -170,12 +159,6 @@ public MvRefreshScheme() {
this.lastRefreshTime = 0;
}

public MvRefreshScheme(RefreshType type, AsyncRefreshContext asyncRefreshContext, long lastRefreshTime) {
this.type = type;
this.asyncRefreshContext = asyncRefreshContext;
this.lastRefreshTime = lastRefreshTime;
}

public RefreshType getType() {
return type;
}
Expand Down Expand Up @@ -217,12 +200,7 @@ public void setLastRefreshTime(long lastRefreshTime) {
// for show create mv, constructing refresh job(insert into select)
@SerializedName(value = "viewDefineSql")
private String viewDefineSql;
// table partition name <-> mv partition names
@SerializedName(value = "tableMvPartitionNameRefMap")
private Map<String, Set<String>> tableMvPartitionNameRefMap = Maps.newHashMap();
// mv partition name <-> table partition names
@SerializedName(value = "mvTablePartitionNameRefMap")
private Map<String, Set<String>> mvTablePartitionNameRefMap = Maps.newHashMap();

// record expression table column
@SerializedName(value = "partitionRefTableExprs")
private List<Expr> partitionRefTableExprs;
Expand Down Expand Up @@ -264,14 +242,6 @@ public void setViewDefineSql(String viewDefineSql) {
this.viewDefineSql = viewDefineSql;
}

public Set<String> getMvPartitionNamesByTable(String tablePartitionName) {
return tableMvPartitionNameRefMap.computeIfAbsent(tablePartitionName, k -> Sets.newHashSet());
}

public Set<String> getTablePartitionNamesByMv(String mvPartitionName) {
return mvTablePartitionNameRefMap.computeIfAbsent(mvPartitionName, k -> Sets.newHashSet());
}

public Set<Long> getBaseTableIds() {
return baseTableIds;
}
Expand All @@ -296,72 +266,37 @@ public void setRefreshScheme(MvRefreshScheme refreshScheme) {
this.refreshScheme = refreshScheme;
}

public void addPartitionNameRef(String basePartitionName, String mvPartitionName) {
addPartitionNameRef(basePartitionName, mvPartitionName, false);
}

public void addPartitionNameRef(String basePartitionName, String mvPartitionName, boolean isReplay) {
tableMvPartitionNameRefMap.computeIfAbsent(basePartitionName, k -> Sets.newHashSet()).add(mvPartitionName);
mvTablePartitionNameRefMap.computeIfAbsent(mvPartitionName, k -> Sets.newHashSet()).add(basePartitionName);
if (!isReplay) {
GlobalStateMgr.getCurrentState().getEditLog().logAddMvPartitionNameRef(
new MaterializedViewPartitionNameRefInfo(this.dbId, this.id, mvPartitionName, basePartitionName));
}
}

public void removePartitionNameRefByMv(String mvPartitionName) {
removePartitionNameRefByMv(mvPartitionName, false);
}

public void removePartitionNameRefByMv(String mvPartitionName, boolean isReplay) {
Set<String> basePartitionNames = mvTablePartitionNameRefMap.get(mvPartitionName);
for (String basePartitionName : basePartitionNames) {
tableMvPartitionNameRefMap.get(basePartitionName).remove(mvPartitionName);
}
mvTablePartitionNameRefMap.remove(mvPartitionName);
if (!isReplay) {
GlobalStateMgr.getCurrentState().getEditLog().logRemoveMvPartitionNameRef(
new MaterializedViewPartitionNameRefInfo(this.dbId, this.id, mvPartitionName, null));
}
}

public void removePartitionNameRefByTable(String basePartitionName) {
removePartitionNameRefByTable(basePartitionName, false);
}

public void removePartitionNameRefByTable(String basePartitionName, boolean isReplay) {
Set<String> mvPartitionNames = tableMvPartitionNameRefMap.get(basePartitionName);
for (String mvPartitionName : mvPartitionNames) {
mvTablePartitionNameRefMap.get(mvPartitionName).remove(basePartitionName);
}
tableMvPartitionNameRefMap.remove(basePartitionName);
if (!isReplay) {
GlobalStateMgr.getCurrentState().getEditLog().logRemoveMvPartitionNameRef(
new MaterializedViewPartitionNameRefInfo(this.dbId, this.id, null, basePartitionName));
}
}

public Set<String> getExistBasePartitionNames(long baseTableId) {
return this.getRefreshScheme().getAsyncRefreshContext().getBaseTableVisibleVersionMap()
.computeIfAbsent(baseTableId, k -> Maps.newHashMap()).keySet();
}

public Set<String> getNoExistBasePartitionNames(long baseTableId, Set<String> partitionNames) {
Map<String, BasePartitionInfo> basePartitionInfoMap = this.getRefreshScheme().getAsyncRefreshContext()
.getBaseTableVisibleVersionMap()
.computeIfAbsent(baseTableId, k -> Maps.newHashMap());
return basePartitionInfoMap.keySet().stream()
.filter(partitionName -> !partitionNames.contains(partitionName))
.collect(Collectors.toSet());
}

public Set<String> getSyncedPartitionNames(long baseTableId) {
return this.getRefreshScheme().getAsyncRefreshContext()
.getBaseTableVisibleVersionMap()
.computeIfAbsent(baseTableId, k -> Maps.newHashMap())
.keySet();
}

public Set<String> getNeedRefreshPartitionNames(OlapTable base) {
Map<String, BasePartitionInfo> baseTableVisibleVersionMap = getRefreshScheme()
.getAsyncRefreshContext()
.getBaseTableVisibleVersionMap()
.computeIfAbsent(base.getId(), k -> Maps.newHashMap());

Set<String> result = Sets.newHashSet();
for (Map.Entry<String, BasePartitionInfo> versionEntry : baseTableVisibleVersionMap.entrySet()) {
String basePartitionName = versionEntry.getKey();
Partition basePartition = base.getPartition(basePartitionName);
if (basePartition == null) {
result.addAll(base.getPartitionNames());
return result;
}
BasePartitionInfo basePartitionInfo = versionEntry.getValue();
if (basePartitionInfo == null
|| basePartitionInfo.getId() != basePartition.getId()
|| basePartition.getVisibleVersion() > basePartitionInfo.getVersion()) {
result.add(basePartitionName);
}
}
return result;
}

public boolean needRefreshPartition(long baseTableId, Partition baseTablePartition) {
BasePartitionInfo basePartitionInfo = this.getRefreshScheme().getAsyncRefreshContext()
.getBaseTableVisibleVersionMap()
Expand Down Expand Up @@ -503,23 +438,6 @@ public void onCreate() {
.collect(Collectors.toList()))), connectContext);
}

public OlapTable getPartitionTable(Database database) {
Preconditions.checkState(this.getPartitionRefTableExprs().size() == 1);
Expr partitionExpr = this.getPartitionRefTableExprs().get(0);
List<SlotRef> slotRefs = Lists.newArrayList();
partitionExpr.collect(SlotRef.class, slotRefs);
// if partitionExpr is FunctionCallExpr, get first SlotRef
Preconditions.checkState(slotRefs.size() == 1);
SlotRef slotRef = slotRefs.get(0);
for (Long baseTableId : baseTableIds) {
OlapTable olapTable = (OlapTable) database.getTable(baseTableId);
if (slotRef.getTblNameWithoutAnalyzed().getTbl().equals(olapTable.getName())) {
return olapTable;
}
}
return null;
}

public static MaterializedView read(DataInput in) throws IOException {
String json = Text.readString(in);
MaterializedView mv = GsonUtils.GSON.fromJson(json, MaterializedView.class);
Expand Down

This file was deleted.

12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,18 @@ public PartitionInfo getPartitionInfo() {
return partitionInfo;
}

// partition Name -> Range
public Map<String, Range<PartitionKey>> getRangePartitionMap() {
Preconditions.checkArgument(partitionInfo.getType() == PartitionType.RANGE);
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
Map<String, Range<PartitionKey>> rangePartitionMap = Maps.newHashMap();
for (Map.Entry<Long, Partition> partitionEntry : idToPartition.entrySet()) {
Long partitionId = partitionEntry.getKey();
rangePartitionMap.put(partitionEntry.getValue().getName(), rangePartitionInfo.getRange(partitionId));
}
return rangePartitionMap;
}

public Set<String> getPartitionColumnNames() {
Set<String> partitionColumnNames = Sets.newHashSet();
if (partitionInfo instanceof SinglePartitionInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ public class DateUtils {
public static final String DATEKEY_FORMAT = "yyyyMMdd";
public static final String DATE_FORMAT = "yyyy-MM-dd";
public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static final String MINUTE_FORMAT = "yyyyMMddHHmm";
public static final String HOUR_FORMAT = "yyyyMMddHH";
public static final String MONTH_FORMAT = "yyyyMM";
public static final String QUARTER_FORMAT = "yyyy'Q'q";
public static final String YEAR_FORMAT = "yyyy";

public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern(DATE_FORMAT);
public static final DateTimeFormatter DATEKEY_FORMATTER = DateTimeFormatter.ofPattern(DATEKEY_FORMAT);
public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern(DATE_TIME_FORMAT);
public static final DateTimeFormatter MINUTE_FORMATTER = DateTimeFormatter.ofPattern(MINUTE_FORMAT);
public static final DateTimeFormatter HOUR_FORMATTER = DateTimeFormatter.ofPattern(HOUR_FORMAT);
public static final DateTimeFormatter YEAR_FORMATTER = DateTimeFormatter.ofPattern(YEAR_FORMAT);
public static final DateTimeFormatter QUARTER_FORMATTER = DateTimeFormatter.ofPattern(QUARTER_FORMAT);
public static final DateTimeFormatter MONTH_FORMATTER = DateTimeFormatter.ofPattern(MONTH_FORMAT);

public static DateTimeFormatter probeFormat(String dateTimeStr) throws AnalysisException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void checkRangeListsMatch(List<Range<PartitionKey>> list1, List<Ra
Range<PartitionKey> range2 = list2.get(idx2);
while (true) {
if (range1.lowerEndpoint().compareTo(range2.lowerEndpoint()) != 0) {
throw new DdlException("2 range lists are not stricly matched. "
throw new DdlException("2 range lists are not strictly matched. "
+ range1.lowerEndpoint() + " vs. " + range2.lowerEndpoint());
}

Expand Down Expand Up @@ -118,7 +118,7 @@ public static void checkRangeListsMatch(List<Range<PartitionKey>> list1, List<Ra
}

if (idx1 < list1.size() || idx2 < list2.size()) {
throw new DdlException("2 range lists are not stricly matched. "
throw new DdlException("2 range lists are not strictly matched. "
+ list1 + " vs. " + list2);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.FunctionSearchDesc;
import com.starrocks.catalog.MaterializedViewPartitionNameRefInfo;
import com.starrocks.catalog.MaterializedViewPartitionVersionInfo;
import com.starrocks.catalog.MetaVersion;
import com.starrocks.catalog.Resource;
Expand Down Expand Up @@ -198,11 +197,6 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_ADD_MATERIALIZED_VIEW_PARTITION_NAME_REF_INFO:
case OperationType.OP_REMOVE_MATERIALIZED_VIEW_PARTITION_NAME_REF_INFO:
data = MaterializedViewPartitionNameRefInfo.read(in);
isRead = true;
break;
case OperationType.OP_ADD_MATERIALIZED_VIEW_PARTITION_VERSION_INFO:
case OperationType.OP_REMOVE_MATERIALIZED_VIEW_PARTITION_VERSION_INFO:
data = MaterializedViewPartitionVersionInfo.read(in);
Expand Down
Loading

0 comments on commit 6dab73e

Please sign in to comment.