Skip to content

Commit

Permalink
修复perjob的savepoint不生效问题
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Jan 15, 2022
1 parent 237e05c commit a318721
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.utils.RunTimeUtil;
import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down
7 changes: 7 additions & 0 deletions dlink-core/src/main/java/com/dlink/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
Expand Down Expand Up @@ -253,6 +254,9 @@ public JobResult executeSql(String statement) {
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
} else {
config.addGatewayConfig(executor.getSetConfig());
if(Asserts.isNotNullString(config.getSavePointPath())) {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath()));
}
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
job.setResult(InsertResult.success(gatewayResult.getAppId()));
Expand Down Expand Up @@ -290,6 +294,9 @@ public JobResult executeSql(String statement) {
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
} else {
config.addGatewayConfig(executor.getSetConfig());
if(Asserts.isNotNullString(config.getSavePointPath())) {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath()));
}
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
job.setResult(InsertResult.success(gatewayResult.getAppId()));
Expand Down

0 comments on commit a318721

Please sign in to comment.