Skip to content

Commit

Permalink
KYLIN-3936 MR/Spark task will still run after the job is stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
guangxuCheng authored and nichunen committed Apr 24, 2019
1 parent cfabb81 commit cf2de69
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ public void discardJob(String jobId) {
} else {
logger.warn("The job " + jobId + " has been discarded.");
}
return;
throw new IllegalStateException(
"The job " + job.getId() + " has already been finished and cannot be discarded.");
}
if (job instanceof DefaultChainedExecutable) {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
Expand Down Expand Up @@ -415,6 +416,22 @@ public void pauseJob(String jobId) {
return;
}

if (!(job.getStatus() == ExecutableState.READY
|| job.getStatus() == ExecutableState.RUNNING)) {
logger.warn("The status of job " + jobId + " is " + job.getStatus().toString()
+ ". It's final state and cannot be transfer to be stopped!!!");
throw new IllegalStateException(
"The job " + job.getId() + " has already been finished and cannot be stopped.");
}
if (job instanceof DefaultChainedExecutable) {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
for (AbstractExecutable task : tasks) {
if (!task.getStatus().isFinalState()) {
updateJobOutput(task.getId(), ExecutableState.STOPPED, null, null);
break;
}
}
}
updateJobOutput(jobId, ExecutableState.STOPPED, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public JobInstance pause(@PathVariable String jobId) {

try {
final JobInstance jobInstance = jobService.getJobInstance(jobId);
return jobService.pauseJob(jobInstance);
jobService.pauseJob(jobInstance);
return jobService.getJobInstance(jobId);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,11 +658,15 @@ private void setRelatedIdList(CheckpointExecutable checkpointExecutable, List<St
}
}

public JobInstance pauseJob(JobInstance job) {
public void pauseJob(JobInstance job) {
aclEvaluate.checkProjectOperationPermission(job);
logger.info("Pause job [" + job.getId() + "] trigger by "
+ SecurityContextHolder.getContext().getAuthentication().getName());
if (job.getStatus().isComplete()) {
throw new IllegalStateException(
"The job " + job.getId() + " has already been finished and cannot be stopped.");
}
getExecutableManager().pauseJob(job.getId());
job.setStatus(JobStatusEnum.STOPPED);
return job;
}

public void dropJob(JobInstance job) {
Expand Down

0 comments on commit cf2de69

Please sign in to comment.