diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index 466a8d1947..eb783ef8bc 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -48,6 +48,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.python.PythonOptions; +import org.apache.flink.streaming.api.graph.StreamGraph; import java.io.File; import java.io.FileOutputStream; @@ -252,7 +253,9 @@ public static void executeJarJob(String type, Executor executor, String[] statem String sqlStatement = executor.pretreatStatement(statements[i]); if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) { ExecuteJarOperation executeJarOperation = new ExecuteJarOperation(sqlStatement); - executeJarOperation.execute(executor.getCustomTableEnvironment()); + StreamGraph streamGraph = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment()); + streamGraph.setJobName(executor.getExecutorConfig().getJobName()); + executor.getStreamExecutionEnvironment().executeAsync(streamGraph); break; } else if (Operations.getOperationType(sqlStatement) == SqlType.ADD) { File[] info = AddJarSqlParseStrategy.getInfo(sqlStatement); diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java index 7847791166..e8bf2ddcad 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java @@ -60,7 +60,7 @@ public Optional execute(CustomTableEnvironment tEnv) { return Optional.of(TABLE_RESULT_OK); } - protected StreamGraph getStreamGraph(CustomTableEnvironment tEnv) { + public StreamGraph getStreamGraph(CustomTableEnvironment tEnv) { JarSubmitParam submitParam = JarSubmitParam.build(statement); return getStreamGraph(submitParam, tEnv); }