Skip to content

Commit

Permalink
[BugFix] Should always get db lock when analyze
Browse files Browse the repository at this point in the history
Signed-off-by: kangkaisen <kangkaisen@apache.org>
  • Loading branch information
kangkaisen committed Apr 20, 2023
1 parent dff9fec commit 753b9fb
Showing 1 changed file with 34 additions and 16 deletions.
50 changes: 34 additions & 16 deletions fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static ExecPlan plan(StatementBase stmt, ConnectContext session,
}

Map<String, Database> dbs = AnalyzerUtils.collectAllDatabase(session, stmt);
boolean needWholePhaseLock = true;

// 1. For all queries, we need db lock when analyze phase
try {
Expand All @@ -76,19 +77,17 @@ public static ExecPlan plan(StatementBase stmt, ConnectContext session,
}

session.setCurrentSqlDbIds(dbs.values().stream().map(Database::getId).collect(Collectors.toSet()));
} finally {
unLock(dbs);
}

boolean isOnlyOlapTableQueries = AnalyzerUtils.isOnlyHasOlapTables(stmt);
if (isOnlyOlapTableQueries && stmt instanceof QueryStatement) {
return planQuery(stmt, resultSinkType, session, true);
}
// Note: we only could get the olap table after Analyzing phase
boolean isOnlyOlapTableQueries = AnalyzerUtils.isOnlyHasOlapTables(stmt);
if (isOnlyOlapTableQueries && stmt instanceof QueryStatement) {
unLock(dbs);
needWholePhaseLock = false;
return planQuery(stmt, resultSinkType, session, dbs, true);
}

try {
lock(dbs);
if (stmt instanceof QueryStatement) {
return planQuery(stmt, resultSinkType, session, false);
return planQuery(stmt, resultSinkType, session, dbs, false);
} else if (stmt instanceof InsertStmt) {
return new InsertPlanner().plan((InsertStmt) stmt, session);
} else if (stmt instanceof UpdateStmt) {
Expand All @@ -97,22 +96,26 @@ public static ExecPlan plan(StatementBase stmt, ConnectContext session,
return new DeletePlanner().plan((DeleteStmt) stmt, session);
}
} finally {
unLock(dbs);
if (needWholePhaseLock) {
unLock(dbs);
}
}

return null;
}

private static ExecPlan planQuery(StatementBase stmt,
TResultSinkType resultSinkType,
ConnectContext session,
Map<String, Database> dbs,
boolean isOnlyOlapTable) {
QueryStatement queryStmt = (QueryStatement) stmt;
resultSinkType = queryStmt.hasOutFileClause() ? TResultSinkType.FILE : resultSinkType;
ExecPlan plan;
if (!isOnlyOlapTable) {
plan = createQueryPlan(queryStmt.getQueryRelation(), session, resultSinkType);
} else {
plan = createQueryPlanWithReTry(queryStmt, session, resultSinkType);
plan = createQueryPlanWithReTry(queryStmt, session, resultSinkType, dbs);
}
setOutfileSink(queryStmt, plan);
return plan;
Expand Down Expand Up @@ -156,10 +159,10 @@ private static ExecPlan createQueryPlan(Relation relation,
}
}


public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
ConnectContext session,
TResultSinkType resultSinkType) {
TResultSinkType resultSinkType,
Map<String, Database> dbs) {
QueryRelation query = queryStmt.getQueryRelation();
List<String> colNames = query.getColumnOutputNames();

Expand All @@ -179,11 +182,26 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
long planStartTime = System.currentTimeMillis();

Set<OlapTable> olapTables = Sets.newHashSet();
AnalyzerUtils.copyOlapTable(queryStmt, olapTables);

try {
// Need lock to avoid olap table metas ConcurrentModificationException
lock(dbs);
AnalyzerUtils.copyOlapTable(queryStmt, olapTables);
} finally {
unLock(dbs);
}

// Only need to re analyze and re transform when schema isn't valid
if (i > 0 && !isSchemaValid) {
Analyzer.analyze(queryStmt, session);

try {
// We always need db lock when analyze phase
lock(dbs);
Analyzer.analyze(queryStmt, session);
} finally {
unLock(dbs);
}

try (PlannerProfile.ScopedTimer ignored = PlannerProfile.getScopedTimer("Transformer")) {
logicalPlan = new RelationTransformer(columnRefFactory, session).transformWithSelectLimit(query);
}
Expand Down

0 comments on commit 753b9fb

Please sign in to comment.