Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](plsql) Fix procedure key compatibility #31445

Merged
merged 6 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3333,7 +3333,7 @@ public Object visitCallProcedure(CallProcedureContext ctx) {
List<Expression> arguments = ctx.expression().stream()
.<Expression>map(this::typedVisit)
.collect(ImmutableList.toImmutableList());
UnboundFunction unboundFunction = new UnboundFunction(procedureName.getDb(), procedureName.getName(),
UnboundFunction unboundFunction = new UnboundFunction(procedureName.getDbName(), procedureName.getName(),
true, arguments);
return new CallCommand(unboundFunction, getOriginSql(ctx));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ public CreateProcedureCommand(FuncNameInfo procedureName, String source, boolean

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// TODO, removeCached needs to be synchronized to all Observer FEs.
// Even if it is always executed on the Master FE, it still has to deal with Master switching.
ctx.getPlSqlOperation().getExec().functions.removeCached(procedureName.toString());
client.addPlsqlStoredProcedure(procedureName.getName(), procedureName.getCtl(), procedureName.getDb(),
ctx.getQualifiedUser(), source, isForce);
ctx.getPlSqlOperation().getExec().functions.save(procedureName, source, isForce);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.nereids.trees.plans.commands.info;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.util.Utils;
Expand All @@ -28,15 +30,19 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* procedure, function, package name info
*/
public class FuncNameInfo {
private final List<String> nameParts;
private String ctl;
private String db;
private String ctl = "";
private long ctlId = -1;
private String db = "";
private long dbId = -1;
private final String name;
private boolean isAnalyzed = false;

/**
* FuncNameInfo
Expand Down Expand Up @@ -91,22 +97,32 @@ public FuncNameInfo(String name) {
* @param ctx ctx
*/
public void analyze(ConnectContext ctx) {
if (Strings.isNullOrEmpty(ctl)) {
ctl = ctx.getDefaultCatalog();
if (isAnalyzed) {
return;
}
try {
if (Strings.isNullOrEmpty(ctl)) {
ctl = InternalCatalog.INTERNAL_CATALOG_NAME;
ctl = ctx.getDefaultCatalog();
if (Strings.isNullOrEmpty(ctl)) {
ctl = InternalCatalog.INTERNAL_CATALOG_NAME;
}
}
}
if (Strings.isNullOrEmpty(db)) {
db = ctx.getDatabase();
ctlId = ctx.getCatalog(ctl).getId();
if (Strings.isNullOrEmpty(db)) {
throw new AnalysisException("procedure/function/package name no database selected");
db = ctx.getDatabase();
if (Strings.isNullOrEmpty(db)) {
db = FeConstants.INTERNAL_DB_NAME;
}
}
Optional<DatabaseIf> dbInstance = ctx.getCatalog(ctl).getDb(db);
dbId = dbInstance.map(DatabaseIf::getId).orElse(-1L);
if (Strings.isNullOrEmpty(name)) {
throw new AnalysisException("procedure/function/package name is null");
}
} catch (Exception e) {
throw new AnalysisException("failed to analyze procedure name", e);
}

if (Strings.isNullOrEmpty(name)) {
throw new AnalysisException("procedure/function/package name is null");
}
isAnalyzed = true;
}

/**
Expand All @@ -115,29 +131,51 @@ public void analyze(ConnectContext ctx) {
* @return ctlName
*/
public String getCtl() {
analyze(ConnectContext.get());
return ctl == null ? "" : ctl;
}

/**
* get catalog id
*
* @return ctlId
*/
public long getCtlId() {
analyze(ConnectContext.get());
return ctlId;
}

/**
* get db name
*
* @return dbName
*/
public String getDb() {
public String getDbName() {
analyze(ConnectContext.get());
return db == null ? "" : db;
}

/**
* get db id
*
* @return dbId
*/
public long getDbId() {
analyze(ConnectContext.get());
return dbId;
}

/**
* get table name
*
* @return tableName
*/
public String getName() {
analyze(ConnectContext.get());
return name == null ? "" : name;
}

public String toString() {
return nameParts.stream().map(Utils::quoteIfNeeded)
.reduce((left, right) -> left + "." + right).orElse("");
return nameParts.stream().map(Utils::quoteIfNeeded).reduce((left, right) -> left + "." + right).orElse("");
}
}
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/org/apache/doris/plsql/Exec.java
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,7 @@ private int functionCall(ParserRuleContext ctx, MultipartIdentifierContext ident
FuncNameInfo procedureName = new FuncNameInfo(nameParts);
Package packCallContext = exec.getPackageCallContext();
boolean executed = false;
Package pack = findPackage(procedureName.getDb());
Package pack = findPackage(procedureName.getDbName());
if (pack != null) {
executed = pack.execFunc(procedureName.getName(), params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ public boolean exists(FuncNameInfo procedureName) {
@Override
public void remove(FuncNameInfo procedureName) {
try {
client.dropPlsqlStoredProcedure(procedureName.getName(), procedureName.getCtl(),
procedureName.getDb());
client.dropPlsqlStoredProcedure(procedureName.getName(), procedureName.getCtlId(), procedureName.getDbId());
} catch (Exception e) {
throw new RuntimeException(e);
throw new RuntimeException("failed to remove procedure", e);
}
}

Expand Down Expand Up @@ -124,8 +123,8 @@ private void execProcOrFunc(Expr_func_paramsContext ctx, ParserRuleContext procC
}
}

private void callWithParameters(Expr_func_paramsContext ctx, ParserRuleContext procCtx,
HashMap<String, Var> out, ArrayList<Var> actualParams) {
private void callWithParameters(Expr_func_paramsContext ctx, ParserRuleContext procCtx, HashMap<String, Var> out,
ArrayList<Var> actualParams) {
if (procCtx instanceof Create_function_stmtContext) {
Create_function_stmtContext func = (Create_function_stmtContext) procCtx;
InMemoryFunctionRegistry.setCallParameters(func.multipartIdentifier().getText(), ctx, actualParams,
Expand All @@ -152,9 +151,8 @@ private ParserRuleContext parse(PlsqlStoredProcedure proc) {
}

private Optional<PlsqlStoredProcedure> getProc(FuncNameInfo procedureName) {
return Optional.ofNullable(
client.getPlsqlStoredProcedure(procedureName.getName(), procedureName.getCtl(),
procedureName.getDb()));
return Optional.ofNullable(client.getPlsqlStoredProcedure(procedureName.getName(), procedureName.getCtlId(),
procedureName.getDbId()));
}

private ArrayList<Var> getActualCallParameters(Expr_func_paramsContext actual) {
Expand All @@ -179,7 +177,7 @@ public void addUserFunction(Create_function_stmtContext ctx) {
}
trace(ctx, "CREATE FUNCTION " + procedureName.toString());
saveInCache(procedureName.toString(), ctx);
saveStoredProc(procedureName, Exec.getFormattedText(ctx), ctx.REPLACE() != null);
save(procedureName, Exec.getFormattedText(ctx), ctx.REPLACE() != null);
}

@Override
Expand All @@ -192,13 +190,19 @@ public void addUserProcedure(Create_procedure_stmtContext ctx) {
}
trace(ctx, "CREATE PROCEDURE " + procedureName.toString());
saveInCache(procedureName.toString(), ctx);
saveStoredProc(procedureName, Exec.getFormattedText(ctx), ctx.REPLACE() != null);
save(procedureName, Exec.getFormattedText(ctx), ctx.REPLACE() != null);
}

private void saveStoredProc(FuncNameInfo procedureName, String source, boolean isForce) {
client.addPlsqlStoredProcedure(procedureName.getName(), procedureName.getCtl(),
procedureName.getDb(),
ConnectContext.get().getQualifiedUser(), source, isForce);
@Override
public void save(FuncNameInfo procedureName, String source, boolean isForce) {
try {
// TODO support packageName
client.addPlsqlStoredProcedure(procedureName.getName(), procedureName.getCtlId(), procedureName.getDbId(),
"",
ConnectContext.get().getQualifiedUser(), source, isForce);
} catch (Exception e) {
throw new RuntimeException("failed to save procedure", e);
}
}

private void saveInCache(String name, ParserRuleContext procCtx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface FunctionRegistry {

void addUserProcedure(Create_procedure_stmtContext ctx);

void save(FuncNameInfo procedureName, String source, boolean isForce);

boolean exists(FuncNameInfo procedureName);

void remove(FuncNameInfo procedureName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public InMemoryFunctionRegistry(Exec e, BuiltinFunctions builtinFunctions) {
this.builtinFunctions = builtinFunctions;
}

@Override
public void save(FuncNameInfo procedureName, String source, boolean isForce) {
throw new RuntimeException("InMemoryFunctionRegistry no support save");
}

@Override
public boolean exists(FuncNameInfo procedureName) {
return funcMap.containsKey(procedureName.toString()) || procMap.containsKey(procedureName.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public PlsqlStoredProcedure getPlsqlStoredProcedure(PlsqlProcedureKey plsqlProce
}

public void addPlsqlStoredProcedure(PlsqlStoredProcedure procedure, boolean isForce) {
PlsqlProcedureKey plsqlProcedureKey = new PlsqlProcedureKey(procedure.getName(), procedure.getCatalogName(),
procedure.getDbName());
PlsqlProcedureKey plsqlProcedureKey = new PlsqlProcedureKey(procedure.getName(), procedure.getCatalogId(),
procedure.getDbId());
if (isForce) {
nameToStoredProcedures.put(plsqlProcedureKey, procedure);
} else if (nameToStoredProcedures.putIfAbsent(plsqlProcedureKey, procedure) != null) {
Expand All @@ -61,8 +61,8 @@ public void addPlsqlStoredProcedure(PlsqlStoredProcedure procedure, boolean isFo
}

public void replayAddPlsqlStoredProcedure(PlsqlStoredProcedure procedure) {
PlsqlProcedureKey plsqlProcedureKey = new PlsqlProcedureKey(procedure.getName(), procedure.getCatalogName(),
procedure.getDbName());
PlsqlProcedureKey plsqlProcedureKey = new PlsqlProcedureKey(procedure.getName(), procedure.getCatalogId(),
procedure.getDbId());
nameToStoredProcedures.put(plsqlProcedureKey, procedure);
LOG.info("Replay add stored procedure success: {}", plsqlProcedureKey);
}
Expand All @@ -83,8 +83,8 @@ public PlsqlPackage getPackage(PlsqlProcedureKey plsqlProcedureKey) {
}

public void addPackage(PlsqlPackage pkg, boolean isForce) {
PlsqlProcedureKey plsqlProcedureKey = new PlsqlProcedureKey(pkg.getName(), pkg.getCatalogName(),
pkg.getDbName());
PlsqlProcedureKey plsqlProcedureKey = new PlsqlProcedureKey(pkg.getName(), pkg.getCatalogId(),
pkg.getDbId());
nameToPackages.put(plsqlProcedureKey, pkg);
if (isForce) {
nameToPackages.put(plsqlProcedureKey, pkg);
Expand All @@ -96,8 +96,8 @@ public void addPackage(PlsqlPackage pkg, boolean isForce) {
}

public void replayAddPlsqlPackage(PlsqlPackage pkg) {
PlsqlProcedureKey plsqlProcedureKey = new PlsqlProcedureKey(pkg.getName(), pkg.getCatalogName(),
pkg.getDbName());
PlsqlProcedureKey plsqlProcedureKey = new PlsqlProcedureKey(pkg.getName(), pkg.getCatalogId(),
pkg.getDbId());
nameToPackages.put(plsqlProcedureKey, pkg);
LOG.info("Replay add plsql package success: {}", plsqlProcedureKey);
}
Expand Down
Loading
Loading