Skip to content

Commit

Permalink
[Enhancement] skip optimizer lock for materialized view (#34569)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <mofei@starrocks.com>
  • Loading branch information
murphyatwork authored Nov 13, 2023
1 parent 46883b7 commit 9774541
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,6 @@ public enum RefreshMoment {
DEFERRED
}

public enum PlanMode {
VALID,
INVALID,
UNKNOWN
}

@Override
public Boolean getUseLightSchemaChange() {
return false;
Expand Down Expand Up @@ -358,6 +352,16 @@ public long getLastRefreshTime() {
public void setLastRefreshTime(long lastRefreshTime) {
this.lastRefreshTime = lastRefreshTime;
}

public MvRefreshScheme copy() {
MvRefreshScheme res = new MvRefreshScheme();
res.type = this.type;
res.lastRefreshTime = this.lastRefreshTime;
if (this.asyncRefreshContext != null) {
res.asyncRefreshContext = this.asyncRefreshContext.copy();
}
return res;
}
}

@SerializedName(value = "dbId")
Expand Down Expand Up @@ -756,6 +760,25 @@ public TTableDescriptor toThrift(List<ReferencedPartitionInfo> partitions) {
fullSchema.size(), 0, getName(), "");
}

@Override
public void copyOnlyForQuery(OlapTable olapTable) {
super.copyOnlyForQuery(olapTable);
MaterializedView mv = (MaterializedView) olapTable;
mv.dbId = this.dbId;
mv.active = this.active;
mv.refreshScheme = this.refreshScheme.copy();
mv.maxMVRewriteStaleness = this.maxMVRewriteStaleness;
if (this.baseTableIds != null) {
mv.baseTableIds = Sets.newHashSet(this.baseTableIds);
}
if (this.baseTableInfos != null) {
mv.baseTableInfos = Lists.newArrayList(this.baseTableInfos);
}
if (this.partitionRefTableExprs != null) {
mv.partitionRefTableExprs = Lists.newArrayList(this.partitionRefTableExprs);
}
}

@Override
public MaterializedView selectiveCopy(Collection<String> reservedPartitions, boolean resetState,
MaterializedIndex.IndexExtState extState) {
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,15 @@ public void copyOnlyForQuery(OlapTable olapTable) {
olapTable.indexNameToId = Maps.newHashMap(this.indexNameToId);
olapTable.indexIdToMeta = Maps.newHashMap(this.indexIdToMeta);
olapTable.keysType = this.keysType;
if (this.relatedMaterializedViews != null) {
olapTable.relatedMaterializedViews = Sets.newHashSet(this.relatedMaterializedViews);
}
if (this.uniqueConstraints != null) {
olapTable.uniqueConstraints = Lists.newArrayList(this.uniqueConstraints);
}
if (this.foreignKeyConstraints != null) {
olapTable.foreignKeyConstraints = Lists.newArrayList(this.foreignKeyConstraints);
}
if (this.partitionInfo != null) {
olapTable.partitionInfo = (PartitionInfo) this.partitionInfo.clone();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,7 @@ private void updateMetaForOlapTable(MaterializedView.AsyncRefreshContext refresh
if (partitionTable != null && tableId != partitionTable.getId()) {
continue;
}
if (!currentVersionMap.containsKey(tableId)) {
currentVersionMap.put(tableId, Maps.newHashMap());
}
currentVersionMap.computeIfAbsent(tableId, (v) -> Maps.newConcurrentMap());
Map<String, MaterializedView.BasePartitionInfo> currentTablePartitionInfo =
currentVersionMap.get(tableId);
Map<String, MaterializedView.BasePartitionInfo> partitionInfoMap = tableEntry.getValue();
Expand Down Expand Up @@ -599,9 +597,7 @@ private void updateMetaForExternalTable(
if (partitionTableInfo != null && !partitionTableInfo.equals(baseTableInfo)) {
continue;
}
if (!currentVersionMap.containsKey(baseTableInfo)) {
currentVersionMap.put(baseTableInfo, Maps.newHashMap());
}
currentVersionMap.computeIfAbsent(baseTableInfo, (v) -> Maps.newConcurrentMap());
Map<String, MaterializedView.BasePartitionInfo> currentTablePartitionInfo =
currentVersionMap.get(baseTableInfo);
Map<String, MaterializedView.BasePartitionInfo> partitionInfoMap = tableEntry.getValue();
Expand Down
25 changes: 20 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.http.HttpConnectContext;
import com.starrocks.planner.OlapScanNode;
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.ResultSink;
import com.starrocks.qe.ConnectContext;
Expand Down Expand Up @@ -180,6 +181,7 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
for (int i = 0; i < Config.max_query_retry_time; ++i) {
long planStartTime = System.currentTimeMillis();

// TODO: double check relatedMvs for OlapTable
Set<OlapTable> olapTables = Sets.newHashSet();
Map<String, Database> dbs = AnalyzerUtils.collectAllDatabase(session, queryStmt);
session.setCurrentSqlDbIds(dbs.values().stream().map(Database::getId).collect(Collectors.toSet()));
Expand Down Expand Up @@ -227,11 +229,15 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory, colNames,
resultSinkType,
!session.getSessionVariable().isSingleNodeExecPlan());
isSchemaValid = olapTables.stream().noneMatch(t ->
t.lastSchemaUpdateTime.get() > planStartTime);
isSchemaValid = isSchemaValid && olapTables.stream().allMatch(t ->
t.lastVersionUpdateEndTime.get() < buildFragmentStartTime &&
t.lastVersionUpdateEndTime.get() >= t.lastVersionUpdateStartTime.get());

// Check rewritten tables in case of there are some materialized views
List<OlapTable> hitTables = plan.getScanNodes().stream()
.filter(scan -> scan instanceof OlapScanNode)
.map(scan -> ((OlapScanNode) scan).getOlapTable())
.collect(Collectors.toList());
isSchemaValid = hitTables.stream().noneMatch(t -> hasSchemaChange(t, planStartTime));
isSchemaValid = isSchemaValid && hitTables.stream().allMatch(
t -> noVersionChange(t, buildFragmentStartTime));
if (isSchemaValid) {
return plan;
}
Expand All @@ -242,6 +248,15 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
return null;
}

private static boolean hasSchemaChange(OlapTable table, long since) {
return table.lastSchemaUpdateTime.get() > since;
}

private static boolean noVersionChange(OlapTable table, long since) {
return (table.lastVersionUpdateEndTime.get() < since &&
table.lastVersionUpdateEndTime.get() >= table.lastVersionUpdateStartTime.get());
}

// Lock all database before analyze
private static void lock(Map<String, Database> dbs) {
if (dbs == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.starrocks.catalog.InternalCatalog;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.MapType;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PaimonTable;
import com.starrocks.catalog.PartitionInfo;
Expand Down Expand Up @@ -117,6 +118,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -570,31 +572,50 @@ public Void visitTable(TableRelation node, Void context) {
if (!tables.isEmpty()) {
return null;
}
// if olap table has MV, we remove it
if (!node.getTable().isOlapTable() ||
!node.getTable().getRelatedMaterializedViews().isEmpty()) {
if (!node.getTable().isNativeTableOrMaterializedView()) {
tables.put(node.getName(), node.getTable());
}
return null;
}
}

private static class OlapTableCollector extends TableCollector {
Set<OlapTable> olapTables;
private Set<OlapTable> olapTables;
private Map<Long, OlapTable> idMap;

public OlapTableCollector(Set<OlapTable> tables) {
this.olapTables = tables;
this.idMap = new HashMap<>();
}

@Override
public Void visitTable(TableRelation node, Void context) {
if (node.getTable().isOlapTable()) {
OlapTable table = (OlapTable) node.getTable();
olapTables.add(table);
// Only copy the necessary olap table meta to avoid the lock when plan query
OlapTable copied = new OlapTable();
table.copyOnlyForQuery(copied);
node.setTable(copied);
if (!idMap.containsKey(table.getId())) {
olapTables.add(table);
idMap.put(table.getId(), table);
// Only copy the necessary olap table meta to avoid the lock when plan query
OlapTable copied = new OlapTable();
table.copyOnlyForQuery(copied);
node.setTable(copied);
} else {
node.setTable(idMap.get(table.getId()));
}

} else if (node.getTable().isMaterializedView()) {
MaterializedView table = (MaterializedView) node.getTable();
if (!idMap.containsKey(table.getId())) {
olapTables.add(table);
idMap.put(table.getId(), table);
// Only copy the necessary olap table meta to avoid the lock when plan query
MaterializedView copied = new MaterializedView();
table.copyOnlyForQuery(copied);
node.setTable(copied);
} else {
node.setTable(idMap.get(table.getId()));
}

}
return null;
}
Expand Down

0 comments on commit 9774541

Please sign in to comment.