Skip to content

Commit

Permalink
[Enhancement] skip optimizer lock for materialized view (#34569)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <mofei@starrocks.com>
(cherry picked from commit 9774541)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java
#	fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java
#	fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AnalyzerUtils.java
  • Loading branch information
murphyatwork authored and wanpengfei-git committed Nov 16, 2023
1 parent 7685cf7 commit a79268b
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,20 @@ public enum RefreshMoment {
DEFERRED
}

<<<<<<< HEAD
public enum PlanMode {
VALID,
INVALID,
UNKNOWN
=======
@Override
public Boolean getUseLightSchemaChange() {
return false;
}

@Override
public void setUseLightSchemaChange(boolean useLightSchemaChange) {
>>>>>>> 97745418cd ([Enhancement] skip optimizer lock for materialized view (#34569))
}

public static class BasePartitionInfo {
Expand Down Expand Up @@ -317,6 +327,16 @@ public long getLastRefreshTime() {
public void setLastRefreshTime(long lastRefreshTime) {
this.lastRefreshTime = lastRefreshTime;
}

public MvRefreshScheme copy() {
MvRefreshScheme res = new MvRefreshScheme();
res.type = this.type;
res.lastRefreshTime = this.lastRefreshTime;
if (this.asyncRefreshContext != null) {
res.asyncRefreshContext = this.asyncRefreshContext.copy();
}
return res;
}
}

@SerializedName(value = "dbId")
Expand Down Expand Up @@ -650,6 +670,25 @@ public TTableDescriptor toThrift(List<ReferencedPartitionInfo> partitions) {
fullSchema.size(), 0, getName(), "");
}

@Override
public void copyOnlyForQuery(OlapTable olapTable) {
super.copyOnlyForQuery(olapTable);
MaterializedView mv = (MaterializedView) olapTable;
mv.dbId = this.dbId;
mv.active = this.active;
mv.refreshScheme = this.refreshScheme.copy();
mv.maxMVRewriteStaleness = this.maxMVRewriteStaleness;
if (this.baseTableIds != null) {
mv.baseTableIds = Sets.newHashSet(this.baseTableIds);
}
if (this.baseTableInfos != null) {
mv.baseTableInfos = Lists.newArrayList(this.baseTableInfos);
}
if (this.partitionRefTableExprs != null) {
mv.partitionRefTableExprs = Lists.newArrayList(this.partitionRefTableExprs);
}
}

@Override
public MaterializedView selectiveCopy(Collection<String> reservedPartitions, boolean resetState,
MaterializedIndex.IndexExtState extState) {
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,15 @@ public void copyOnlyForQuery(OlapTable olapTable) {
olapTable.indexNameToId = Maps.newHashMap(this.indexNameToId);
olapTable.indexIdToMeta = Maps.newHashMap(this.indexIdToMeta);
olapTable.keysType = this.keysType;
if (this.relatedMaterializedViews != null) {
olapTable.relatedMaterializedViews = Sets.newHashSet(this.relatedMaterializedViews);
}
if (this.uniqueConstraints != null) {
olapTable.uniqueConstraints = Lists.newArrayList(this.uniqueConstraints);
}
if (this.foreignKeyConstraints != null) {
olapTable.foreignKeyConstraints = Lists.newArrayList(this.foreignKeyConstraints);
}
if (this.partitionInfo != null) {
olapTable.partitionInfo = (PartitionInfo) this.partitionInfo.clone();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,7 @@ private void updateMetaForOlapTable(MaterializedView.AsyncRefreshContext refresh
if (partitionTable != null && tableId != partitionTable.getId()) {
continue;
}
if (!currentVersionMap.containsKey(tableId)) {
currentVersionMap.put(tableId, Maps.newHashMap());
}
currentVersionMap.computeIfAbsent(tableId, (v) -> Maps.newConcurrentMap());
Map<String, MaterializedView.BasePartitionInfo> currentTablePartitionInfo =
currentVersionMap.get(tableId);
Map<String, MaterializedView.BasePartitionInfo> partitionInfoMap = tableEntry.getValue();
Expand Down Expand Up @@ -415,9 +413,7 @@ private void updateMetaForExternalTable(MaterializedView.AsyncRefreshContext ref
if (partitionTableInfo != null && !partitionTableInfo.equals(baseTableInfo)) {
continue;
}
if (!currentVersionMap.containsKey(baseTableInfo)) {
currentVersionMap.put(baseTableInfo, Maps.newHashMap());
}
currentVersionMap.computeIfAbsent(baseTableInfo, (v) -> Maps.newConcurrentMap());
Map<String, MaterializedView.BasePartitionInfo> currentTablePartitionInfo =
currentVersionMap.get(baseTableInfo);
Map<String, MaterializedView.BasePartitionInfo> partitionInfoMap = tableEntry.getValue();
Expand Down
31 changes: 26 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
<<<<<<< HEAD
=======
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.http.HttpConnectContext;
import com.starrocks.planner.OlapScanNode;
>>>>>>> 97745418cd ([Enhancement] skip optimizer lock for materialized view (#34569))
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.ResultSink;
import com.starrocks.qe.ConnectContext;
Expand Down Expand Up @@ -174,6 +181,7 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
for (int i = 0; i < Config.max_query_retry_time; ++i) {
long planStartTime = System.currentTimeMillis();

// TODO: double check relatedMvs for OlapTable
Set<OlapTable> olapTables = Sets.newHashSet();
Map<String, Database> dbs = AnalyzerUtils.collectAllDatabase(session, queryStmt);
session.setCurrentSqlDbIds(dbs.values().stream().map(Database::getId).collect(Collectors.toSet()));
Expand Down Expand Up @@ -221,11 +229,15 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory, colNames,
resultSinkType,
!session.getSessionVariable().isSingleNodeExecPlan());
isSchemaValid = olapTables.stream().noneMatch(t ->
t.lastSchemaUpdateTime.get() > planStartTime);
isSchemaValid = isSchemaValid && olapTables.stream().allMatch(t ->
t.lastVersionUpdateEndTime.get() < buildFragmentStartTime &&
t.lastVersionUpdateEndTime.get() >= t.lastVersionUpdateStartTime.get());

// Check rewritten tables in case of there are some materialized views
List<OlapTable> hitTables = plan.getScanNodes().stream()
.filter(scan -> scan instanceof OlapScanNode)
.map(scan -> ((OlapScanNode) scan).getOlapTable())
.collect(Collectors.toList());
isSchemaValid = hitTables.stream().noneMatch(t -> hasSchemaChange(t, planStartTime));
isSchemaValid = isSchemaValid && hitTables.stream().allMatch(
t -> noVersionChange(t, buildFragmentStartTime));
if (isSchemaValid) {
return plan;
}
Expand All @@ -236,6 +248,15 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
return null;
}

private static boolean hasSchemaChange(OlapTable table, long since) {
return table.lastSchemaUpdateTime.get() > since;
}

private static boolean noVersionChange(OlapTable table, long since) {
return (table.lastVersionUpdateEndTime.get() < since &&
table.lastVersionUpdateEndTime.get() >= table.lastVersionUpdateStartTime.get());
}

// Lock all database before analyze
private static void lock(Map<String, Database> dbs) {
if (dbs == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
import com.starrocks.catalog.HiveMetaStoreTable;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.ListPartitionInfo;
<<<<<<< HEAD
=======
import com.starrocks.catalog.MapType;
import com.starrocks.catalog.MaterializedView;
>>>>>>> 97745418cd ([Enhancement] skip optimizer lock for materialized view (#34569))
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PrimitiveType;
Expand Down Expand Up @@ -105,6 +110,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -520,31 +526,50 @@ public Void visitTable(TableRelation node, Void context) {
if (!tables.isEmpty()) {
return null;
}
// if olap table has MV, we remove it
if (!node.getTable().isOlapTable() ||
!node.getTable().getRelatedMaterializedViews().isEmpty()) {
if (!node.getTable().isNativeTableOrMaterializedView()) {
tables.put(node.getName(), node.getTable());
}
return null;
}
}

private static class OlapTableCollector extends TableCollector {
Set<OlapTable> olapTables;
private Set<OlapTable> olapTables;
private Map<Long, OlapTable> idMap;

public OlapTableCollector(Set<OlapTable> tables) {
this.olapTables = tables;
this.idMap = new HashMap<>();
}

@Override
public Void visitTable(TableRelation node, Void context) {
if (node.getTable().isOlapTable()) {
OlapTable table = (OlapTable) node.getTable();
olapTables.add(table);
// Only copy the necessary olap table meta to avoid the lock when plan query
OlapTable copied = new OlapTable();
table.copyOnlyForQuery(copied);
node.setTable(copied);
if (!idMap.containsKey(table.getId())) {
olapTables.add(table);
idMap.put(table.getId(), table);
// Only copy the necessary olap table meta to avoid the lock when plan query
OlapTable copied = new OlapTable();
table.copyOnlyForQuery(copied);
node.setTable(copied);
} else {
node.setTable(idMap.get(table.getId()));
}

} else if (node.getTable().isMaterializedView()) {
MaterializedView table = (MaterializedView) node.getTable();
if (!idMap.containsKey(table.getId())) {
olapTables.add(table);
idMap.put(table.getId(), table);
// Only copy the necessary olap table meta to avoid the lock when plan query
MaterializedView copied = new MaterializedView();
table.copyOnlyForQuery(copied);
node.setTable(copied);
} else {
node.setTable(idMap.get(table.getId()));
}

}
return null;
}
Expand Down

0 comments on commit a79268b

Please sign in to comment.