Skip to content

Commit

Permalink
fix(controller): fix job status is unknown occasionally (#1347)
Browse files Browse the repository at this point in the history
* stash

* fix job unknown status bug
  • Loading branch information
anda-ren authored Oct 12, 2022
1 parent 1a78b41 commit 50e34eb
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 570 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public ResponseEntity<ResponseMessage<String>> modifyJobComment(String projectUr

@Override
public ResponseEntity<ResponseMessage<Graph>> getJobDag(String projectUrl, String jobUrl) {
return ResponseEntity.ok(Code.success.asResponse(dagQuerier.dagOfJob(jobUrl, true)));
return ResponseEntity.ok(Code.success.asResponse(dagQuerier.dagOfJob(jobUrl)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import ai.starwhale.mlops.domain.job.JobType;
import ai.starwhale.mlops.domain.job.bo.Job;
import ai.starwhale.mlops.domain.job.cache.HotJobHolder;
import ai.starwhale.mlops.domain.job.cache.JobLoader;
import ai.starwhale.mlops.domain.job.converter.JobBoConverter;
import ai.starwhale.mlops.domain.job.mapper.JobMapper;
import ai.starwhale.mlops.domain.job.po.JobEntity;
import ai.starwhale.mlops.domain.job.status.JobStatus;
import ai.starwhale.mlops.domain.job.step.StepHelper;
import ai.starwhale.mlops.domain.job.step.bo.Step;
Expand All @@ -35,7 +34,6 @@
import ai.starwhale.mlops.domain.task.status.TaskStatus;
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.exception.SwValidationException.ValidSubject;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -50,39 +48,23 @@ public class DagQuerier {

final JobManager jobManager;

final HotJobHolder jobHolder;

final JobMapper jobMapper;

final StepHelper stepHelper;

final JobLoader jobLoader;
final JobBoConverter jobBoConverter;

public DagQuerier(JobManager jobManager,
HotJobHolder jobHolder, JobMapper jobMapper,
StepHelper stepHelper, JobLoader jobLoader) {
StepHelper stepHelper,
JobBoConverter jobBoConverter) {
this.jobManager = jobManager;
this.jobHolder = jobHolder;
this.jobMapper = jobMapper;
this.stepHelper = stepHelper;
this.jobLoader = jobLoader;
this.jobBoConverter = jobBoConverter;
}

public Graph dagOfJob(String jobUrl, Boolean withTask) {
return dagOfJob(jobManager.getJobId(jobUrl), withTask);
public Graph dagOfJob(String jobUrl) {
return buildGraph(jobBoConverter.fromEntity(jobManager.findJob(jobManager.fromUrl(jobUrl))));
}

private Graph dagOfJob(Long jobId, Boolean withTask) {

Collection<Job> jobs = jobHolder.ofIds(List.of(jobId));
if (null == jobs || jobs.isEmpty()) {
return buildGraphFromDb(jobId);
}
Job job = jobs.stream().findAny().get();
return buildGraphFromCache(job);
}

private Graph buildGraphFromCache(Job job) {
private Graph buildGraph(Job job) {
if (job.getStatus() == JobStatus.CREATED) {
throw new SwValidationException(ValidSubject.JOB).tip("Job is still creating");
}
Expand Down Expand Up @@ -121,15 +103,6 @@ private Graph buildGraphFromCache(Job job) {
return graph;
}

private Graph buildGraphFromDb(Long jobId) {
JobEntity jobEntity = jobMapper.findJobById(jobId);
if (null == jobEntity) {
throw new SwValidationException(ValidSubject.JOB).tip("Job doesn't exists ");
}
List<Job> jobs = jobLoader.loadEntities(List.of(jobEntity), false, false);
return buildGraphFromCache(jobs.get(0));
}

GraphNode taskNode(TaskNodeContent task, AtomicLong idx) {
return GraphNode.builder().id(idx.incrementAndGet()).type(Task.class.getSimpleName()).content(
task).entityId(task.getId()).group(Task.class.getSimpleName()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import ai.starwhale.mlops.domain.job.po.JobEntity;
import ai.starwhale.mlops.domain.job.split.JobSpliterator;
import ai.starwhale.mlops.domain.job.status.JobStatus;
import ai.starwhale.mlops.domain.job.status.JobUpdateHelper;
import ai.starwhale.mlops.domain.job.step.bo.Step;
import ai.starwhale.mlops.domain.project.ProjectManager;
import ai.starwhale.mlops.domain.runtime.RuntimeManager;
Expand Down Expand Up @@ -89,13 +90,14 @@ public class JobService {
private final SwdsManager swdsManager;
private final RuntimeManager runtimeManager;
private final ResourcePoolManager resourcePoolManager;
private final JobUpdateHelper jobUpdateHelper;

public JobService(JobBoConverter jobBoConverter, JobMapper jobMapper, JobSwdsVersionMapper jobSwdsVersionMapper,
TaskMapper taskMapper, JobConvertor jobConvertor, RuntimeManager runtimeManager,
JobSpliterator jobSpliterator, ResourcePoolManager resourcePoolManager, HotJobHolder hotJobHolder,
ProjectManager projectManager, JobManager jobManager, JobLoader jobLoader, SwmpManager swmpManager,
ResultQuerier resultQuerier, SwdsManager swdsManager, StoragePathCoordinator storagePathCoordinator,
UserService userService) {
UserService userService, JobUpdateHelper jobUpdateHelper) {
this.jobBoConverter = jobBoConverter;
this.jobMapper = jobMapper;
this.jobSwdsVersionMapper = jobSwdsVersionMapper;
Expand All @@ -113,6 +115,7 @@ public JobService(JobBoConverter jobBoConverter, JobMapper jobMapper, JobSwdsVer
this.swdsManager = swdsManager;
this.storagePathCoordinator = storagePathCoordinator;
this.userService = userService;
this.jobUpdateHelper = jobUpdateHelper;
}

public PageInfo<JobVo> listJobs(String projectUrl, Long swmpId, PageParams pageParams) {
Expand Down Expand Up @@ -212,25 +215,22 @@ public Long createJob(String projectUrl,
*/
@Scheduled(initialDelay = 10000, fixedDelay = 10000)
public void splitNewCreatedJobs() {
final Stream<Job> allNewJobs = findAllNewJobs();
allNewJobs.parallel().forEach(job -> {
//one transaction
try {
jobSpliterator.split(job);
} catch (StarwhaleException e) {
log.error("parsing step specification error", e);
jobMapper.updateJobStatus(List.of(job.getId()), JobStatus.FAIL);
return;
}
jobMapper.findJobByStatusIn(List.of(JobStatus.CREATED))
.forEach(jobEntity -> {
//one transaction
try {
jobSpliterator.split(jobEntity);
} catch (StarwhaleException e) {
log.error("parsing step specification error", e);
jobMapper.updateJobStatus(List.of(jobEntity.getId()), JobStatus.FAIL);
return;
}

jobLoader.loadEntities(List.of(jobMapper.findJobById(job.getId())), false, true);
});

}
Job job = jobBoConverter.fromEntity(jobEntity);
jobLoader.load(job, false);
jobUpdateHelper.updateJob(job);
});

Stream<Job> findAllNewJobs() {
final List<JobEntity> newJobs = jobMapper.findJobByStatusIn(List.of(JobStatus.CREATED));
return newJobs.stream().map(jobBoConverter::fromEntity);
}

/**
Expand Down Expand Up @@ -282,7 +282,7 @@ public void pauseJob(String jobUrl) {
&& task.getStatus() != TaskStatus.FAIL
&& task.getStatus() != TaskStatus.CREATED)
.collect(Collectors.toList());
if (null == notRunningTasks || notRunningTasks.isEmpty()) {
if (notRunningTasks.isEmpty()) {
return;
}
batchPersistTaskStatus(notRunningTasks, TaskStatus.PAUSED);
Expand Down Expand Up @@ -331,7 +331,9 @@ public void resumeJob(String jobUrl) {
&& jobEntity.getJobStatus() != JobStatus.CANCELED) {
throw new SwValidationException(ValidSubject.JOB).tip("only failed/paused/canceled job can be resumed ");
}
jobLoader.loadEntities(List.of(jobEntity), true, true);
Job job = jobBoConverter.fromEntity(jobEntity);
job = jobLoader.load(job, true);
jobUpdateHelper.updateJob(job);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package ai.starwhale.mlops.domain.job.cache;

import ai.starwhale.mlops.domain.job.converter.JobBoConverter;
import ai.starwhale.mlops.domain.job.mapper.JobMapper;
import ai.starwhale.mlops.domain.job.po.JobEntity;
import ai.starwhale.mlops.domain.job.status.JobStatus;
Expand Down Expand Up @@ -43,13 +44,16 @@ public class HotJobsLoader implements CommandLineRunner {

final JobStatusMachine jobStatusMachine;

final JobBoConverter jobBoConverter;

public HotJobsLoader(
JobMapper jobMapper,
JobLoader jobLoader,
JobStatusMachine jobStatusMachine) {
JobStatusMachine jobStatusMachine, JobBoConverter jobBoConverter) {
this.jobMapper = jobMapper;
this.jobLoader = jobLoader;
this.jobStatusMachine = jobStatusMachine;
this.jobBoConverter = jobBoConverter;
}


Expand All @@ -69,7 +73,9 @@ private List<JobEntity> hotJobsFromDb() {

@Override
public void run(String... args) throws Exception {
jobLoader.loadEntities(hotJobsFromDb(), false, true);
hotJobsFromDb().forEach(jobEntity -> {
jobLoader.load(jobBoConverter.fromEntity(jobEntity), false);
});
log.info("hot jobs loaded");
}
}
Loading

0 comments on commit 50e34eb

Please sign in to comment.