[Bug] Fix execute jar per-job (DataLinkDC#2918)
* fix_execute_jar_prejob

* fix_execute_jar_prejob

* fix_execute_jar_prejob

* fix_execute_jar_prejob
zackyoungh authored Jan 5, 2024
1 parent eb76414 commit de6317c
Showing 9 changed files with 118 additions and 82 deletions.
@@ -25,7 +25,6 @@
import org.dinky.data.result.SqlExplainResult;
import org.dinky.job.JobResult;

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -38,7 +37,7 @@ public FlinkJarSqlTask(TaskDTO task) {

@Override
public List<SqlExplainResult> explain() {
return new ArrayList<>();
return jobManager.explainSql(task.getStatement()).getSqlExplainResults();
}

@Override
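With explain() delegating to JobManager, explaining a jar task now yields real plan records instead of an empty list. A minimal usage sketch, assuming a TaskDTO (here called taskDTO, illustrative) whose statement is an EXECUTE JAR script:

    FlinkJarSqlTask task = new FlinkJarSqlTask(taskDTO);
    for (SqlExplainResult result : task.explain()) {
        // Before this fix the loop body never ran; now each record carries a plan.
        System.out.println(result.getExplain());
    }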
@@ -49,6 +49,8 @@
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.io.File;
@@ -262,6 +264,11 @@ public static void executeJarJob(String type, Executor executor, String[] statem
.configure(configuration, Thread.currentThread().getContextClassLoader());
streamGraph.getCheckpointConfig().configure(configuration);
streamGraph.setJobName(executor.getExecutorConfig().getJobName());
String savePointPath = executor.getExecutorConfig().getSavePointPath();
if (Asserts.isNotNullString(savePointPath)) {
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
savePointPath, configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
}
executor.getStreamExecutionEnvironment().executeAsync(streamGraph);
break;
}
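The added block restores the jar job from a savepoint only when a path is configured, and reads execution.savepoint.ignore-unclaimed-state from the effective Flink configuration rather than hard-coding it. The same pattern in isolation (a sketch; the helper class and method names are illustrative, not part of the commit):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    final class SavepointRestoreSupport {
        // Attach restore settings to a StreamGraph before submission. The flag decides
        // whether savepoint state that no operator claims may be dropped silently.
        static void apply(StreamGraph graph, String savePointPath, Configuration conf) {
            if (savePointPath != null && !savePointPath.isEmpty()) {
                boolean ignoreUnclaimed = conf.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
                graph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savePointPath, ignoreUnclaimed));
            }
        }
    }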
7 changes: 7 additions & 0 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -39,7 +39,9 @@
import org.dinky.job.builder.JobUDFBuilder;
import org.dinky.parser.SqlType;
import org.dinky.trans.Operations;
import org.dinky.trans.dml.ExecuteJarOperation;
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.IpUtil;
import org.dinky.utils.LogUtil;
@@ -49,6 +51,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -295,6 +298,10 @@ public ExplainResult explainSql(String statement) {
SqlExplainResult sqlExplainResult = executor.explainSqlRecord(item.getValue());
if (Asserts.isNull(sqlExplainResult)) {
sqlExplainResult = new SqlExplainResult();
} else if (ExecuteJarParseStrategy.INSTANCE.match(item.getValue())) {
StreamGraph streamGraph =
new ExecuteJarOperation(item.getValue()).explain(executor.getCustomTableEnvironment());
sqlExplainResult.setExplain(streamGraph.getStreamingPlanAsJSON());
} else {
executor.executeSql(item.getValue());
}
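The new branch matters because an EXECUTE JAR statement has no SQL plan: explainSqlRecord cannot produce one, so the explain text becomes the StreamGraph's JSON plan instead. A sketch of the dispatch for a single pretreated statement (the WITH keys follow Dinky's EXECUTE JAR syntax; the uri value is illustrative):

    String sql = "EXECUTE JAR WITH ('uri'='rs:/jar/demo.jar', 'main-class'='com.example.Main')";
    if (ExecuteJarParseStrategy.INSTANCE.match(sql)) {
        StreamGraph graph = new ExecuteJarOperation(sql).explain(executor.getCustomTableEnvironment());
        sqlExplainResult.setExplain(graph.getStreamingPlanAsJSON()); // a JSON stream plan, not a SQL plan
    }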
27 changes: 19 additions & 8 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -81,6 +81,7 @@
import java.lang.ref.WeakReference;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -245,24 +246,34 @@ public ObjectNode getJarStreamGraphJson(String statement) {
public JobResult executeJarSql(String statement) throws Exception {
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
ready();
StreamGraph streamGraph =
JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader());
JobJarStreamGraphBuilder jobJarStreamGraphBuilder = JobJarStreamGraphBuilder.build(this);
StreamGraph streamGraph = jobJarStreamGraphBuilder.getJarStreamGraph(statement, getDinkyClassLoader());
if (Asserts.isNotNullString(config.getSavePointPath())) {
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
config.getSavePointPath(),
executor.getStreamExecutionEnvironment()
.getConfiguration()
.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
}
try {
if (!useGateway) {
executor.getStreamExecutionEnvironment().executeAsync(streamGraph);
} else {
GatewayResult gatewayResult = null;
GatewayResult gatewayResult;
config.addGatewayConfig(executor.getSetConfig());
if (runMode.isApplicationMode()) {
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar(getUdfPathContextHolder());
} else {
streamGraph.setJobName(config.getJobName());
JobGraph jobGraph = streamGraph.getJobGraph();
if (Asserts.isNotNullString(config.getSavePointPath())) {
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
}
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
GatewayConfig gatewayConfig = config.getGatewayConfig();
List<String> uriList = jobJarStreamGraphBuilder.getUris(statement);
String[] jarPaths = uriList.stream()
.map(URLUtils::toFile)
.map(File::getAbsolutePath)
.toArray(String[]::new);
gatewayConfig.setJarPaths(jarPaths);
gatewayResult = Gateway.build(gatewayConfig).submitJobGraph(jobGraph);
}
job.setResult(InsertResult.success(gatewayResult.getId()));
job.setJobId(gatewayResult.getId());
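Two things change in the per-job path: the savepoint restore settings are applied to the StreamGraph up front, with the ignore-unclaimed-state flag taken from the environment configuration (the old JobGraph branch hard-coded true), and the user jar itself is now shipped to the gateway. The jar resolution step in isolation (URLUtils is Dinky's helper; the uri value is illustrative):

    List<String> uriList = jobJarStreamGraphBuilder.getUris(statement); // e.g. ["rs:/jar/demo.jar"]
    String[] jarPaths = uriList.stream()
            .map(URLUtils::toFile)         // materialize each uri as a local file
            .map(File::getAbsolutePath)
            .toArray(String[]::new);
    gatewayConfig.setJarPaths(jarPaths);   // sent along with the JobGraph to the YARN cluster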
@@ -36,6 +36,8 @@
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import cn.hutool.core.lang.Assert;
@@ -79,4 +81,17 @@ public StreamGraph getJarStreamGraph(String statement, DinkyClassLoader dinkyCla
Assert.notNull(executeJarOperation, () -> new DinkyException("Not found execute jar operation."));
return executeJarOperation.explain(executor.getCustomTableEnvironment());
}

public List<String> getUris(String statement) {
String[] statements = SqlUtil.getStatements(statement, sqlSeparator);
List<String> uriList = new ArrayList<>();
for (String sql : statements) {
String sqlStatement = executor.pretreatStatement(sql);
if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) {
uriList.add(ExecuteJarParseStrategy.getInfo(sqlStatement).getUri());
break;
}
}
return uriList;
}
}
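A hypothetical call, showing why getUris returns at most one entry — the loop breaks after the first EXECUTE JAR match (jobManager and the WITH keys are illustrative):

    JobJarStreamGraphBuilder builder = JobJarStreamGraphBuilder.build(jobManager);
    List<String> uris = builder.getUris(
            "EXECUTE JAR WITH ('uri'='rs:/jar/demo.jar', 'main-class'='com.example.Main');");
    // uris == ["rs:/jar/demo.jar"]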
@@ -21,35 +21,24 @@

import org.dinky.assertion.Asserts;
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.gateway.config.AppConfig;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.YarnResult;
import org.dinky.utils.FlinkJsonUtil;
import org.dinky.utils.LogUtil;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import cn.hutool.http.HttpUtil;

/**
* YarnApplicationGateway
*
@@ -89,41 +78,7 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
clusterSpecificationBuilder.createClusterSpecification(), applicationConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

int counts = SystemConfiguration.getInstances().getJobIdWait();
while (yarnClient.getApplicationReport(clusterClient.getClusterId()).getYarnApplicationState()
== YarnApplicationState.ACCEPTED
&& counts-- > 0) {
Thread.sleep(1000);
}
ApplicationReport applicationReport = yarnClient.getApplicationReport(clusterClient.getClusterId());
if (applicationReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
throw new RuntimeException("Yarn application state is not running, please check yarn cluster status.");
}
webUrl = applicationReport.getOriginalTrackingUrl();
final List<JobDetails> jobDetailsList = new ArrayList<>();
while (jobDetailsList.isEmpty() && counts-- > 0) {
Thread.sleep(1000);

String url = yarnClient
.getApplicationReport(clusterClient.getClusterId())
.getTrackingUrl()
+ JobsOverviewHeaders.URL.substring(1);

String json = HttpUtil.get(url);
MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance());
jobDetailsList.addAll(jobsDetails.getJobs());
if (!jobDetailsList.isEmpty()) {
break;
}
}

if (!jobDetailsList.isEmpty()) {
List<String> jobIds = new ArrayList<>();
for (JobDetails jobDetails : jobDetailsList) {
jobIds.add(jobDetails.getJobId().toHexString());
}
result.setJids(jobIds);
}
webUrl = getWebUrl(clusterClient, result);

ApplicationId applicationId = clusterClient.getClusterId();
result.setId(applicationId.toString());
@@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.gateway.AbstractGateway;
import org.dinky.gateway.config.ClusterConfig;
import org.dinky.gateway.config.FlinkConfig;
@@ -32,13 +33,19 @@
import org.dinky.gateway.model.CustomConfig;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.gateway.result.TestResult;
import org.dinky.gateway.result.YarnResult;
import org.dinky.utils.FlinkJsonUtil;
import org.dinky.utils.ThreadUtil;

import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
@@ -72,10 +79,13 @@
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ReUtil;
import cn.hutool.http.HttpUtil;

public abstract class YarnGateway extends AbstractGateway {

public static final String HADOOP_CONFIG = "fs.hdfs.hadoopconf";
private static final String HTML_TAG_REGEX = "<pre>(.*)</pre>";

protected YarnConfiguration yarnConfiguration;

@@ -319,4 +329,54 @@ protected YarnClusterDescriptor createInitYarnClusterDescriptor() {
true);
return yarnClusterDescriptor;
}

protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResult result)
throws YarnException, IOException, InterruptedException {
String webUrl;
int counts = SystemConfiguration.getInstances().getJobIdWait();
while (yarnClient.getApplicationReport(clusterClient.getClusterId()).getYarnApplicationState()
== YarnApplicationState.ACCEPTED
&& counts-- > 0) {
Thread.sleep(1000);
}
// Sleep for 2 seconds so a just-submitted application is not misread as failed
ThreadUtil.sleep(2000);
ApplicationReport applicationReport = yarnClient.getApplicationReport(clusterClient.getClusterId());
if (applicationReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
String logUrl = yarnClient
.getContainers(applicationReport.getCurrentApplicationAttemptId())
.get(0)
.getLogUrl();
String log = ReUtil.getGroup1(HTML_TAG_REGEX, HttpUtil.get(logUrl + "/jobmanager.log?start=-10000"));
logger.error("\n\nHistory log url is: {}\n\n ", logUrl);
throw new RuntimeException(
"Yarn application state is not running, please check yarn cluster status. Log content:\n" + log);
}
webUrl = applicationReport.getOriginalTrackingUrl();
final List<JobDetails> jobDetailsList = new ArrayList<>();
while (jobDetailsList.isEmpty() && counts-- > 0) {
Thread.sleep(1000);

String url = yarnClient
.getApplicationReport(clusterClient.getClusterId())
.getTrackingUrl()
+ JobsOverviewHeaders.URL.substring(1);

String json = HttpUtil.get(url);
MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance());
jobDetailsList.addAll(jobsDetails.getJobs());
if (!jobDetailsList.isEmpty()) {
break;
}
}

if (!jobDetailsList.isEmpty()) {
List<String> jobIds = new ArrayList<>();
for (JobDetails jobDetails : jobDetailsList) {
jobIds.add(jobDetails.getJobId().toHexString());
}
result.setJids(jobIds);
}
return webUrl;
}
}
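getWebUrl consolidates polling that submitJar and submitJobGraph previously duplicated: wait for the YARN application to leave ACCEPTED, then poll Flink's jobs-overview REST endpoint until job ids appear. When the application never reaches RUNNING, the method now surfaces the JobManager log instead of a bare error; that extraction step in isolation (URL suffix as used above):

    // Fetch the tail of the JobManager log from the YARN container log page and
    // strip the <pre> wrapper that YARN's web UI puts around plain-text logs.
    String html = HttpUtil.get(logUrl + "/jobmanager.log?start=-10000");
    String log = ReUtil.getGroup1("<pre>(.*)</pre>", html);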
@@ -20,23 +20,18 @@
package org.dinky.gateway.yarn;

import org.dinky.assertion.Asserts;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.YarnResult;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import cn.hutool.core.io.FileUtil;
@@ -75,26 +70,10 @@ public GatewayResult submitJobGraph(JobGraph jobGraph) {
clusterSpecificationBuilder.createClusterSpecification(), jobGraph, true);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId();
String webUrl = getWebUrl(clusterClient, result);
result.setId(applicationId.toString());
result.setWebURL(clusterClient.getWebInterfaceURL());
Collection<JobStatusMessage> jobStatusMessages =
clusterClient.listJobs().get();
int counts = SystemConfiguration.getInstances().getJobIdWait();
while (jobStatusMessages.size() == 0 && counts > 0) {
Thread.sleep(1000);
counts--;
jobStatusMessages = clusterClient.listJobs().get();
if (jobStatusMessages.size() > 0) {
break;
}
}
if (jobStatusMessages.size() > 0) {
List<String> jids = new ArrayList<>();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jids.add(jobStatusMessage.getJobId().toHexString());
}
result.setJids(jids);
}
result.setWebURL(webUrl);

result.success();
} catch (Exception e) {
throw new RuntimeException(e);