Skip to content

Commit

Permalink
feat: add new job impl for controller (#907)
Browse files Browse the repository at this point in the history
* add new job impl

* add new comment

* rename job ddl

Co-authored-by: gaoxinxing <15931259256@163.com>
  • Loading branch information
goldenxinxing and gaoxinxing authored Aug 15, 2022
1 parent 9e668e2 commit 847d498
Show file tree
Hide file tree
Showing 20 changed files with 412 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package ai.starwhale.mlops.common;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.lang3.StringUtils;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

@Slf4j
public final class TarFileUtil {
/**
* Find the contents of a specified file in a tar file.
*/
public static byte[] getContentFromTarFile(InputStream tarFileInputStream, String targetFilePath, String targetFileName) {
ArchiveInputStream archiveInputStream = null;
try {
archiveInputStream = getArchiveInputStream(tarFileInputStream);
TarArchiveEntry entry = null;
while ((entry = (TarArchiveEntry) archiveInputStream.getNextEntry()) != null) {
if (entry.getSize() <= 0) {
continue;
}
if (!StringUtils.isEmpty(targetFilePath) && !entry.getName().startsWith(targetFilePath)) {
continue;
}
if (!StringUtils.isEmpty(targetFileName) && !entry.getName().endsWith(targetFileName)) {
continue;
}
return getContent(archiveInputStream);
}

} catch (Exception e) {
log.error("get tar file failed!", e);
} finally {
if (null != archiveInputStream) {
try {
archiveInputStream.close();
} catch (IOException e) {
log.error("file close error!", e);
}
}
}

return null;
}

private static ArchiveInputStream getArchiveInputStream(InputStream in) throws ArchiveException {
return new ArchiveStreamFactory()
.createArchiveInputStream("tar", new BufferedInputStream(in));
}

public static byte[] getContent(InputStream is) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
byte[] buffer = new byte[1024];
while (true) {
int len = is.read(buffer);

if (len == -1) {
break;
}

baos.write(buffer, 0, len);
}
} catch (Exception e) {
e.printStackTrace();
}

return baos.toByteArray();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,13 @@ public void splitNewCreatedJobs(){
allNewJobs.parallel().forEach(job->{
//one transaction
jobSpliterator.split(job);
hotJobHolder.adopt(job);
jobLoader.loadEntities(List.of(jobMapper.findJobById(job.getId())),false,true);
/*hotJobHolder.adopt(job);
List<Task> readyToScheduleTasks = job.getSteps().parallelStream().map(Step::getTasks)
.flatMap(Collection::stream)
.filter(t -> t.getStatus() == TaskStatus.READY)
.collect(Collectors.toList());
swTaskScheduler.adopt(readyToScheduleTasks,job.getJobRuntime().getDeviceClass());
swTaskScheduler.adopt(readyToScheduleTasks,job.getJobRuntime().getDeviceClass());*/
});

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class Job extends TimeConcern {

String uuid;

String evalJobDDL;

Step currentStep;

/**
Expand All @@ -68,7 +70,9 @@ public class Job extends TimeConcern {

/**
* job result holding dir
* Deprecated reason: use storage unaware of resultDir
*/
@Deprecated
String resultDir;

List<Step> steps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public Job fromEntity(JobEntity jobEntity){
.name(modelPackageEntity.getSwmpName())
.version(jobEntity.getSwmpVersion().getVersionName())
.path(jobEntity.getSwmpVersion().getStoragePath()).build())
.evalJobDDL(jobEntity.getSwmpVersion().getEvalJobs())
.swDataSets(swDataSets)
.resultDir(jobEntity.getResultOutputPath())
.uuid(jobEntity.getJobUuid())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.domain.job.parser;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;

@Slf4j
public class JobParser {
private static final String DEFAULT_JOB_NAME = "default";
private static final YAMLMapper yamlMapper = new YAMLMapper();

static {
yamlMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}

public static List<StepMetaData> parseStepFromYaml(String yamlContent) {
return parseStepFromYaml(yamlContent, DEFAULT_JOB_NAME);
}

public static List<StepMetaData> parseStepFromYaml(String yamlContent, String job) {
Map<String, List<StepMetaData>> map;
try {
map = JobParser.yamlMapper.readValue(yamlContent, new TypeReference<>() {
});
} catch (JsonProcessingException e) {
log.error("parse job yaml error.", e);
return List.of();
}
return map.getOrDefault(job, List.of());
}

public static void main(String[] args) throws JsonProcessingException {

String content = "default:\n" +
"- !!python/object:starwhale.core.job.model.Step\n" +
" concurrency: 1\n" +
" dependency:\n" +
" - ''\n" +
" job_name: default\n" +
" resources:\n" +
" - cpu=1\n" +
" status: ''\n" +
" step_name: DefaultPipeline.ppl\n" +
" task_num: 1\n" +
" tasks: []\n" +
"- !!python/object:starwhale.core.job.model.Step\n" +
" concurrency: 1\n" +
" dependency:\n" +
" - DefaultPipeline.ppl\n" +
" job_name: default\n" +
" resources:\n" +
" - cpu=1\n" +
" status: ''\n" +
" step_name: DefaultPipeline.cmp\n" +
" task_num: 1\n" +
" tasks: []";
Map<String, List<StepMetaData>> map = JobParser.yamlMapper.readValue(content, new TypeReference<Map<String, List<StepMetaData>>>() {
});

System.out.println(map.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.domain.job.parser;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.Converter;
import lombok.Builder;
import lombok.Data;

import java.util.List;

@Data
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class StepMetaData {
/**
* the name of job which would be executed at job yaml during running time
*/
@JsonProperty("job_name")
private String jobName;
@JsonProperty("step_name")
private String stepName;
private Integer concurrency = 1;
private List<String> dependency;
@JsonDeserialize(contentConverter = ResourceConverter.class)
private List<Resource> resources;
@JsonProperty("task_num")
private Integer taskNum = 1;
}

@Data
@Builder
class Resource {
// TODO:replaced by ai.starwhale.mlops.domain.node.Device.Clazz
private String type;
private Integer num;
}

class ResourceConverter implements Converter<String, Resource> {

@Override
public Resource convert(String value) {
String[] res = value.split("=");
return Resource.builder().type(res[0]).num(Integer.valueOf(res[1])).build();
}

@Override
public JavaType getInputType(TypeFactory typeFactory) {
return typeFactory.constructType(String.class);
}

@Override
public JavaType getOutputType(TypeFactory typeFactory) {
return typeFactory.constructType(Resource.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import ai.starwhale.mlops.domain.job.bo.Job;
import ai.starwhale.mlops.domain.job.step.bo.Step;
import ai.starwhale.mlops.domain.job.step.po.StepEntity;

import java.util.List;

/**
* split job to Steps. One job shall not to be split multiple times
*/
public interface JobSpliterator {

List<Step> split(Job job);
List<StepEntity> split(Job job);
}
Loading

0 comments on commit 847d498

Please sign in to comment.