Skip to content

Commit

Permalink
[feature](nereids)Support show queued auto analyze jobs in Nereids. (#…
Browse files Browse the repository at this point in the history
…47696)

### What problem does this PR solve?

Support show queued auto analyze jobs in Nereids.
  • Loading branch information
Jibing-Li authored Feb 13, 2025
1 parent 16af223 commit fc689e3
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ PROPERTY: 'PROPERTY';
QUANTILE_STATE: 'QUANTILE_STATE';
QUANTILE_UNION: 'QUANTILE_UNION';
QUERY: 'QUERY';
QUEUED: 'QUEUED';
QUOTA: 'QUOTA';
QUALIFY: 'QUALIFY';
QUARTER: 'QUARTER';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@ unsupportedDropStatement
supportedStatsStatement
: SHOW AUTO? ANALYZE (jobId=INTEGER_VALUE | tableName=multipartIdentifier)?
(WHERE (stateKey=identifier) EQ (stateValue=STRING_LITERAL))? #showAnalyze
| SHOW QUEUED ANALYZE JOBS tableName=multipartIdentifier?
(WHERE (stateKey=identifier) EQ (stateValue=STRING_LITERAL))? #showQueuedAnalyzeJobs
;

unsupportedStatsStatement
Expand All @@ -748,7 +750,6 @@ unsupportedStatsStatement
columnList=identifierList? partitionSpec? #showColumnStats
| SHOW COLUMN HISTOGRAM tableName=multipartIdentifier
columnList=identifierList #showColumnHistogramStats
| SHOW AUTO JOBS tableName=multipartIdentifier? wildWhere? #showAutoAnalyzeJobs
| SHOW ANALYZE TASK STATUS jobId=INTEGER_VALUE #showAnalyzeTask
;

Expand Down Expand Up @@ -2012,6 +2013,7 @@ nonReserved
| QUERY
| QUOTA
| QUALIFY
| QUEUED
| RANDOM
| RECENT
| RECOVER
Expand Down
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ terminal String
KW_QUANTILE_UNION,
KW_AGG_STATE,
KW_QUERY,
KW_QUEUED,
KW_QUOTA,
KW_RANDOM,
KW_RANGE,
Expand Down Expand Up @@ -4721,7 +4722,7 @@ show_param ::=
:}
| KW_AUTO KW_JOBS opt_table_name:tbl opt_wild_where
{:
RESULT = new ShowAutoAnalyzeJobsStmt(tbl, parser.where);
RESULT = new ShowQueuedAnalyzeJobsStmt(tbl, parser.where);
:}
| KW_ANALYZE KW_TASK KW_STATUS INTEGER_LITERAL:jobId
{:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@
import com.google.common.collect.ImmutableList;

/**
* ShowAutoAnalyzeJobsStmt is used to show pending auto analysis jobs.
* ShowQueuedAnalyzeJobsStmt is used to show pending auto analysis jobs.
* syntax:
* SHOW AUTO ANALYZE JOBS
* SHOW QUEUED ANALYZE JOBS
* [TABLE]
* [
* WHERE
* [PRIORITY = ["HIGH"|"MID"|"LOW"|"VERY_LOW"]]
* ]
*/
public class ShowAutoAnalyzeJobsStmt extends ShowStmt implements NotFallbackInParser {
public class ShowQueuedAnalyzeJobsStmt extends ShowStmt implements NotFallbackInParser {
private static final String PRIORITY = "priority";
private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("catalog_name")
Expand All @@ -56,7 +56,7 @@ public class ShowAutoAnalyzeJobsStmt extends ShowStmt implements NotFallbackInPa
private final TableName tableName;
private final Expr whereClause;

public ShowAutoAnalyzeJobsStmt(TableName tableName, Expr whereClause) {
public ShowQueuedAnalyzeJobsStmt(TableName tableName, Expr whereClause) {
this.tableName = tableName;
this.whereClause = whereClause;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@
import org.apache.doris.nereids.DorisParser.ShowProcedureStatusContext;
import org.apache.doris.nereids.DorisParser.ShowProcessListContext;
import org.apache.doris.nereids.DorisParser.ShowQueryProfileContext;
import org.apache.doris.nereids.DorisParser.ShowQueuedAnalyzeJobsContext;
import org.apache.doris.nereids.DorisParser.ShowReplicaDistributionContext;
import org.apache.doris.nereids.DorisParser.ShowRepositoriesContext;
import org.apache.doris.nereids.DorisParser.ShowRolesContext;
Expand Down Expand Up @@ -613,6 +614,7 @@
import org.apache.doris.nereids.trees.plans.commands.ShowProcedureStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowProcessListCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowQueryProfileCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowQueuedAnalyzeJobsCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowReplicaDistributionCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowRepositoriesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowRolesCommand;
Expand Down Expand Up @@ -5627,5 +5629,13 @@ public LogicalPlan visitShowAnalyze(ShowAnalyzeContext ctx) {
String stateValue = ctx.stateValue == null ? null : stripQuotes(ctx.stateValue.getText());
return new ShowAnalyzeCommand(tableName, jobId, stateKey, stateValue, isAuto);
}

@Override
public LogicalPlan visitShowQueuedAnalyzeJobs(ShowQueuedAnalyzeJobsContext ctx) {
List<String> tableName = ctx.tableName == null ? null : visitMultipartIdentifier(ctx.tableName);
String stateKey = ctx.stateKey == null ? null : stripQuotes(ctx.stateKey.getText());
String stateValue = ctx.stateValue == null ? null : stripQuotes(ctx.stateValue.getText());
return new ShowQueuedAnalyzeJobsCommand(tableName, stateKey, stateValue);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public enum PlanType {
ALTER_SQL_BLOCK_RULE_COMMAND,
ALTER_REPOSITORY_COMMAND,
SHOW_ANALYZE_COMMAND,
SHOW_QUEUED_ANALYZE_JOBS_COMMAND,
SHOW_BACKENDS_COMMAND,
SHOW_BLOCK_RULE_COMMAND,
SHOW_BROKER_COMMAND,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AutoAnalysisPendingJob;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
* Used to show queued auto analysis jobs.
* syntax:
* SHOW QUEUED ANALYZE JOBS
* [TABLE]
* [
* WHERE
* [PRIORITY = ["HIGH"|"MID"|"LOW"|"VERY_LOW"]]
* ]
*/
public class ShowQueuedAnalyzeJobsCommand extends ShowCommand {
private static final Logger LOG = LogManager.getLogger(ShowQueuedAnalyzeJobsCommand.class);

private static final String STATE_NAME = "priority";
private static final ShowResultSetMetaData META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("catalog_name", ScalarType.createStringType()))
.addColumn(new Column("db_name", ScalarType.createStringType()))
.addColumn(new Column("tbl_name", ScalarType.createStringType()))
.addColumn(new Column("col_list", ScalarType.createStringType()))
.addColumn(new Column("priority", ScalarType.createStringType()))
.build();

private final TableNameInfo tableNameInfo;
private final String stateKey;
private final String stateValue;
private String ctl;
private String db;
private String table;

/**
* Constructor.
* @param tableName catalog.db.table
* @param stateKey Filter column name, Only support "priority" for now.
* @param stateValue Filter column value. Only support PRIORITY="HIGH"|"MID"|"LOW"|"VERY_LOW"
*/
public ShowQueuedAnalyzeJobsCommand(List<String> tableName, String stateKey, String stateValue) {
super(PlanType.SHOW_QUEUED_ANALYZE_JOBS_COMMAND);
this.tableNameInfo = tableName == null ? null : new TableNameInfo(tableName);
this.stateKey = stateKey;
this.stateValue = stateValue;
ctl = null;
db = null;
table = null;
}

private void validate(ConnectContext ctx) throws AnalysisException {
checkShowQueuedAnalyzeJobsPriv(ctx);
if (tableNameInfo != null) {
tableNameInfo.analyze(ctx);
ctl = tableNameInfo.getCtl();
db = tableNameInfo.getDb();
table = tableNameInfo.getTbl();
}
if (stateKey == null && stateValue != null || stateKey != null && stateValue == null) {
throw new AnalysisException("Invalid where clause, should be PRIORITY = \"HIGH|MID|LOW|VERY_LOW\"");
}
if (stateKey != null) {
if (!stateKey.equalsIgnoreCase(STATE_NAME)
|| !stateValue.equalsIgnoreCase("HIGH")
&& !stateValue.equalsIgnoreCase("MID")
&& !stateValue.equalsIgnoreCase("LOW")
&& !stateValue.equalsIgnoreCase("VERY_LOW")) {
throw new AnalysisException("Where clause should be PRIORITY = \"HIGH|MID|LOW|VERY_LOW\"");
}
}
}

private void checkShowQueuedAnalyzeJobsPriv(ConnectContext ctx) throws AnalysisException {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ctx, PrivPredicate.SHOW)) {
ErrorReport.reportAnalysisException(
ErrorCode.ERR_ACCESS_DENIED_ERROR,
"SHOW QUEUED ANALYZE JOBS",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP());
}
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitShowQueuedAnalyzeJobsCommand(this, context);
}

@Override
public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate(ctx);
return handleShowQueuedAnalyzeJobs();
}

private ShowResultSet handleShowQueuedAnalyzeJobs() {
List<AutoAnalysisPendingJob> jobs = Env.getCurrentEnv().getAnalysisManager().showAutoPendingJobs(
new TableName(ctl, db, table), stateValue);
List<List<String>> resultRows = Lists.newArrayList();
for (AutoAnalysisPendingJob job : jobs) {
try {
List<String> row = new ArrayList<>();
CatalogIf<? extends DatabaseIf<? extends TableIf>> c = StatisticsUtil.findCatalog(job.catalogName);
row.add(c.getName());
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(job.dbName);
row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
if (databaseIf.isPresent()) {
Optional<? extends TableIf> table = databaseIf.get().getTable(job.tableName);
row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
} else {
row.add("DB may get deleted");
}
row.add(job.getColumnNames());
row.add(String.valueOf(job.priority));
resultRows.add(row);
} catch (Exception e) {
LOG.warn("Failed to get pending jobs for table {}.{}.{}, reason: {}",
job.catalogName, job.dbName, job.tableName, e.getMessage());
}
}
return new ShowResultSet(META_DATA, resultRows);
}

@Override
public RedirectStatus toRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.apache.doris.nereids.trees.plans.commands.ShowProcedureStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowProcessListCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowQueryProfileCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowQueuedAnalyzeJobsCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowReplicaDistributionCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowRepositoriesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowRolesCommand;
Expand Down Expand Up @@ -793,4 +794,8 @@ default R visitAlterRepositoryCommand(AlterRepositoryCommand alterRepositoryComm
default R visitShowAnalyzeCommand(ShowAnalyzeCommand showAnalyzeCommand, C context) {
return visitCommand(showAnalyzeCommand, context);
}

default R visitShowQueuedAnalyzeJobsCommand(ShowQueuedAnalyzeJobsCommand showQueuedAnalyzeJobsCommand, C context) {
return visitCommand(showQueuedAnalyzeJobsCommand, context);
}
}
13 changes: 7 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.doris.analysis.ShowAnalyzeStmt;
import org.apache.doris.analysis.ShowAnalyzeTaskStatus;
import org.apache.doris.analysis.ShowAuthorStmt;
import org.apache.doris.analysis.ShowAutoAnalyzeJobsStmt;
import org.apache.doris.analysis.ShowBackendsStmt;
import org.apache.doris.analysis.ShowBackupStmt;
import org.apache.doris.analysis.ShowBrokerStmt;
Expand Down Expand Up @@ -83,6 +82,7 @@
import org.apache.doris.analysis.ShowProcesslistStmt;
import org.apache.doris.analysis.ShowQueryProfileStmt;
import org.apache.doris.analysis.ShowQueryStatsStmt;
import org.apache.doris.analysis.ShowQueuedAnalyzeJobsStmt;
import org.apache.doris.analysis.ShowReplicaDistributionStmt;
import org.apache.doris.analysis.ShowReplicaStatusStmt;
import org.apache.doris.analysis.ShowRepositoriesStmt;
Expand Down Expand Up @@ -482,8 +482,8 @@ public ShowResultSet execute() throws AnalysisException {
handleShowCreateCatalog();
} else if (stmt instanceof ShowAnalyzeStmt) {
handleShowAnalyze();
} else if (stmt instanceof ShowAutoAnalyzeJobsStmt) {
handleShowAutoAnalyzePendingJobs();
} else if (stmt instanceof ShowQueuedAnalyzeJobsStmt) {
handleShowQueuedAnalyzeJobs();
} else if (stmt instanceof ShowTabletsBelongStmt) {
handleShowTabletsBelong();
} else if (stmt instanceof AdminCopyTabletStmt) {
Expand Down Expand Up @@ -3111,9 +3111,10 @@ private void handleShowAnalyze() {
resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows);
}

private void handleShowAutoAnalyzePendingJobs() {
ShowAutoAnalyzeJobsStmt showStmt = (ShowAutoAnalyzeJobsStmt) stmt;
List<AutoAnalysisPendingJob> jobs = Env.getCurrentEnv().getAnalysisManager().showAutoPendingJobs(showStmt);
private void handleShowQueuedAnalyzeJobs() {
ShowQueuedAnalyzeJobsStmt showStmt = (ShowQueuedAnalyzeJobsStmt) stmt;
List<AutoAnalysisPendingJob> jobs = Env.getCurrentEnv().getAnalysisManager().showAutoPendingJobs(
showStmt.getTableName(), showStmt.getPriority());
List<List<String>> resultRows = Lists.newArrayList();
for (AutoAnalysisPendingJob job : jobs) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.analysis.KillAnalysisJobStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ShowAnalyzeStmt;
import org.apache.doris.analysis.ShowAutoAnalyzeJobsStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
Expand Down Expand Up @@ -541,9 +540,7 @@ public void updateTableStatsForAlterStats(AnalysisInfo jobInfo, TableIf tbl) {
}
}

public List<AutoAnalysisPendingJob> showAutoPendingJobs(ShowAutoAnalyzeJobsStmt stmt) {
TableName tblName = stmt.getTableName();
String priority = stmt.getPriority();
public List<AutoAnalysisPendingJob> showAutoPendingJobs(TableName tblName, String priority) {
List<AutoAnalysisPendingJob> result = Lists.newArrayList();
if (priority == null || priority.isEmpty()) {
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
Expand All @@ -568,7 +565,9 @@ protected List<AutoAnalysisPendingJob> getPendingJobs(Map<TableName, Set<Pair<St
synchronized (jobMap) {
for (Entry<TableName, Set<Pair<String, String>>> entry : jobMap.entrySet()) {
TableName table = entry.getKey();
if (tblName == null || tblName.equals(table)) {
if (tblName == null
|| tblName.getCtl() == null && tblName.getDb() == null && tblName.getTbl() == null
|| tblName.equals(table)) {
result.add(new AutoAnalysisPendingJob(table.getCtl(),
table.getDb(), table.getTbl(), entry.getValue(), priority));
}
Expand Down
Loading

0 comments on commit fc689e3

Please sign in to comment.