feat(job): custom job scheduler #880

Merged: 119 commits into main from feat/job_3 (Aug 18, 2022)
Changes from 109 commits

Commits:
c72ce28 draft new job scheduler (goldenxinxing, Jul 25, 2022)
adada06 tune example for mnist (goldenxinxing, Jul 27, 2022)
0832876 update mnist model file (Aug 2, 2022)
5d3d41e refactor(eval):job dag (Jul 27, 2022)
c2f7653 fix yaml dump problem (Jul 28, 2022)
7522875 run eval with existing version; single task execute (Jul 28, 2022)
53fa451 dataset uri sent by param (Jul 29, 2022)
58c9402 optimise entrypoint (Aug 1, 2022)
d8d9c3a code format (goldenxinxing, Aug 1, 2022)
9849301 feat:add default handler for evaluation (Aug 2, 2022)
e7c2408 update ignore (Aug 2, 2022)
d1db3fd add default handler for eval (Aug 2, 2022)
78a51e4 change job values to list (Aug 3, 2022)
525ad32 job parser for controller (Aug 3, 2022)
594439e job parser for controller (Aug 3, 2022)
639ce3f controller:extract job content during upload;tune task entity(doing) (Aug 4, 2022)
f867dd7 change eval handler to model (Aug 4, 2022)
23a2f76 change eval handler to model (Aug 5, 2022)
c42f496 default pipeline use datastore(doing) (Aug 5, 2022)
236ffb9 draft new job scheduler (goldenxinxing, Jul 25, 2022)
40fec7c tune example for mnist (goldenxinxing, Jul 27, 2022)
8c5c5d5 model change (Aug 5, 2022)
0620c74 refactor(eval):job dag (Jul 27, 2022)
47beade fix yaml dump problem (Jul 28, 2022)
18cc832 run eval with existing version; single task execute (Jul 28, 2022)
0ed6e20 dataset uri sent by param (Jul 29, 2022)
011c583 optimise entrypoint (Aug 1, 2022)
d626837 code format (goldenxinxing, Aug 1, 2022)
22ddec6 feat:add default handler for evaluation (Aug 2, 2022)
9f49100 update ignore (Aug 2, 2022)
240a3c7 add default handler for eval (Aug 2, 2022)
743724e change job values to list (Aug 3, 2022)
42f927a job parser for controller (Aug 3, 2022)
556e5b3 job parser for controller (Aug 3, 2022)
b328f50 controller:extract job content during upload;tune task entity(doing) (Aug 4, 2022)
ba8e0c9 change eval handler to model (Aug 4, 2022)
bcd4fb4 change eval handler to model (Aug 5, 2022)
aa9b3c7 default pipeline use datastore(doing) (Aug 5, 2022)
fe2f362 Merge remote-tracking branch 'gxx-fork/feat/job_3' into feat/job_3 (Aug 5, 2022)
c508398 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 8, 2022)
862725c default pipeline support datastore (Aug 8, 2022)
271f9f9 fix:datastore dir bug; add manual dump method (Aug 8, 2022)
e56a687 fix:datastore scan can't found results (Aug 9, 2022)
f214d8f feat:default ppl handler support datastore (Aug 9, 2022)
1d4ac2d Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 9, 2022)
cf562b2 code format (Aug 9, 2022)
8d50d78 optimise code style (Aug 9, 2022)
7598aeb add multi processing communicate:pipe (Aug 10, 2022)
e2c1159 code format (Aug 10, 2022)
f5dfcdc multiprocessing use duplex pipe communication (Aug 11, 2022)
ca82147 default eval step use simple way to coding (Aug 11, 2022)
f4b3ce3 tune default ppl handler impl (Aug 12, 2022)
e9c39ca skip eval unittest (Aug 12, 2022)
4a359b0 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 12, 2022)
14b4a1c fix controller var name bug (Aug 12, 2022)
0e33165 controller unit test ignore (Aug 12, 2022)
154e129 conflict resolve (Aug 12, 2022)
e8d90b6 fix mypy check error (Aug 12, 2022)
6009ffa fix code check style problem (Aug 12, 2022)
36a62d3 skip unit test (goldenxinxing, Aug 12, 2022)
a850a21 unit test code fix (Aug 15, 2022)
1e5ebdd code format (Aug 15, 2022)
3cf3126 rollback mnist model file (Aug 15, 2022)
d1027f7 datastore key support blank (Aug 15, 2022)
8710e67 rollback metric key (Aug 15, 2022)
ef3a4ca change comment (Aug 15, 2022)
b110d6e Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 15, 2022)
78df99a remove some comment (Aug 15, 2022)
2f80fd9 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 15, 2022)
085fcb8 update e2e test script (Aug 15, 2022)
f4f2894 update e2e test script (Aug 15, 2022)
febd85b update comment (Aug 15, 2022)
3a9dfae update some usage advice (Aug 15, 2022)
35dd002 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 15, 2022)
33e0d62 skip test for ppl (Aug 15, 2022)
b3f7496 fix code check problem (Aug 15, 2022)
47fb644 fix code check problem (Aug 15, 2022)
9c84f9e fix code check problem (Aug 15, 2022)
59ad081 fix code check problem (Aug 15, 2022)
6f79405 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 16, 2022)
49a7b4c fix code check problem (Aug 16, 2022)
d447d56 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 16, 2022)
ae4149c add logger for datastore (Aug 16, 2022)
3ee1cc4 fit new datastore scan api (Aug 16, 2022)
6d4f9be support new dataset (Aug 16, 2022)
1e10c73 tune schedule way (Aug 17, 2022)
5a4759f fix:datastore dump problem at multiprocessing pool (Aug 17, 2022)
68a3f62 standalone debug completed (Aug 17, 2022)
52c6cbc Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 17, 2022)
34606ec add ppl unit test (Aug 17, 2022)
4ab200c fix bug for get metrics api (Aug 17, 2022)
b6ad82d add ppl unit test (Aug 17, 2022)
154e13d use multithread (Aug 17, 2022)
331bc7e fix unit test bug (Aug 17, 2022)
6016193 fix unit test bug (Aug 18, 2022)
617b87f code format (Aug 18, 2022)
9587ccc skip test (Aug 18, 2022)
0048e2c set concurrency for pool (Aug 18, 2022)
d253ad5 fix share object bug (Aug 18, 2022)
cd780ec use Protocol for Callback (Aug 18, 2022)
98fe7f2 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 18, 2022)
7f9ba31 code format (Aug 18, 2022)
9cf6ad6 @step use consts var (Aug 18, 2022)
8e8571a code format (Aug 18, 2022)
517d94b step dependency -> needs (Aug 18, 2022)
762b376 remove unused init method (Aug 18, 2022)
00df15e extract common function to util package (Aug 18, 2022)
0a66293 rename for entrypoint (Aug 18, 2022)
7aecd17 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 18, 2022)
1dba50c simply check (Aug 18, 2022)
200c6f7 use util's function for dump (Aug 18, 2022)
0029a33 Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 18, 2022)
623ff84 update requirement (Aug 18, 2022)
4bf606a code format (Aug 18, 2022)
f42beed Merge branch 'main' into feat/job_3 (goldenxinxing, Aug 18, 2022)
7ae9fab job yaml optimise (Aug 18, 2022)
ad19dca log format (Aug 18, 2022)
77fd490 optimise list (Aug 18, 2022)
b990ba5 code format (Aug 18, 2022)
1 change: 1 addition & 0 deletions .gitignore
@@ -100,3 +100,4 @@ docker/charts/starwhale-*.tgz

# ansible
bootstrap/inventory
/client/.idea/
28 changes: 12 additions & 16 deletions client/scripts/sw-docker-entrypoint
@@ -9,6 +9,7 @@ PIP_CACHE_DIR=${SW_PIP_CACHE_DIR:=/root/.cache/pip}
_MANIFEST_RUNTIME=$(cat ${WORKDIR}/_manifest.yaml| grep "python:" | awk '{print $2}' | awk -F '.' '{print $1"."$2}') || true
_MODEL_RUNTIME=$(cat ${WORKDIR}/model.yaml | grep 'runtime' | awk '{print $2}') || true
VERBOSE="-vvvv"
TASK_INDEX=${SW_TASK_INDEX:=0}

_update_python_alter() {
    echo "--> set python/python3 to $1 ..."
@@ -59,11 +60,6 @@ pre_check() {
        echo "${WORKDIR} is not starwhale target dir, will exit"
        exit 1
    fi

    if [ ! -f "${SW_TASK_INPUT_CONFIG}" ]; then
        echo "${SW_TASK_INPUT_CONFIG} not found, please set env and volume file into container"
        exit 1
    fi
}

set_python() {
@@ -80,7 +76,7 @@ set_python() {
        _update_python_alter "python3.8"
    fi
}

# TODO: restore when processing evaluation (eval run uses the 'runtime URI' param, which can be a URI or a dir path)
restore_activate_runtime() {
    echo '--> restore python env ...'
    export PYTHONWARNINGS="ignore:Unverified HTTPS request"
@@ -91,14 +87,14 @@ restore_activate_runtime() {
    eval "$(./activate.sw)"
}

run_ppl() {
    echo "--> start to run swmp ppl, use $(which swcli) cli..."
    swcli ${VERBOSE} model ppl ${WORKDIR}/src
run_all() {
    echo "--> start to run swmp all, use $(which swcli) cli..."
    swcli ${VERBOSE} model eval ${WORKDIR} --type=all --dataset=${SW_DATASET_URI} --version=${SW_EVALUATION_VERSION}
}

run_cmp() {
    echo "--> start to run swmp cmp, use $(which swcli) cli..."
    swcli ${VERBOSE} model cmp ${WORKDIR}/src
run_single() {
    echo "--> start to run swmp single, use $(which swcli) cli..."
    swcli ${VERBOSE} model eval ${WORKDIR} --type=single --dataset=${SW_DATASET_URI} --step=${SW_TASK_STEP} --task-index=${TASK_INDEX} --version=${SW_EVALUATION_VERSION}
}

welcome() {
@@ -124,11 +120,11 @@ case "$1" in
    pre_config)
        pre_config
        ;;
    cmp)
        eval_task_prepare $1 && run_cmp
    run_all)
        eval_task_prepare $1 && run_all
        ;;
    ppl)
        eval_task_prepare $1 && run_ppl
    run_single)
        eval_task_prepare $1 && run_single
        ;;
    *)
        exec "$@"
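For reference, here is a minimal sketch of the environment contract the new entrypoint consumes. The variable names come from the script above; the values and workdir are made up:

```python
# Illustrative only: compose the `swcli model eval` call that run_single performs.
workdir = "/opt/starwhale/swmp"  # assumed container workdir
env = {
    "SW_DATASET_URI": "mnist/version/latest",  # example value
    "SW_TASK_STEP": "ppl",                     # example value
    "SW_TASK_INDEX": "0",                      # defaults to 0 in the script
    "SW_EVALUATION_VERSION": "abcdef1234",     # example value
}
cmd = (
    f"swcli -vvvv model eval {workdir} --type=single"
    f" --dataset={env['SW_DATASET_URI']}"
    f" --step={env['SW_TASK_STEP']}"
    f" --task-index={env['SW_TASK_INDEX']}"
    f" --version={env['SW_EVALUATION_VERSION']}"
)
print(cmd)
```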
28 changes: 21 additions & 7 deletions client/starwhale/api/_impl/data_store.py
@@ -15,9 +15,11 @@
import pyarrow as pa # type: ignore
import requests
import pyarrow.parquet as pq # type: ignore
from loguru import logger
from typing_extensions import Protocol

from starwhale.utils.fs import ensure_dir
from starwhale.consts.env import SWEnv
from starwhale.utils.config import SWCliConfigMixed

try:
@@ -714,7 +716,7 @@ def get_instance() -> "LocalDataStore":

def __init__(self, root_path: str) -> None:
self.root_path = root_path
self.name_pattern = re.compile(r"^[A-Za-z0-9-_/]+$")
self.name_pattern = re.compile(r"^[A-Za-z0-9-_/ ]+$")
tianweidut marked this conversation as resolved.
Show resolved Hide resolved
self.tables: Dict[str, MemoryTable] = {}

def update_table(
Expand Down Expand Up @@ -778,6 +780,7 @@ def __init__(
        self.columns = columns
        self.keep_none = keep_none

        logger.debug(f"scan enter, table size:{len(tables)}")
        infos: List[TableInfo] = []
        for table_desc in tables:
            table = self.tables.get(table_desc.table_name, None)
@@ -837,6 +840,7 @@ def __init__(
                    )
                )
            else:
                logger.debug(f"scan by disk table{info.name}")
                iters.append(
                    _scan_table(
                        table_path,
@@ -851,7 +855,9 @@
                yield record

    def dump(self) -> None:
        for table in self.tables.values():
        logger.debug(f"start dump, tables size:{len(self.tables.values())}")
        for table in list(self.tables.values()):
            logger.debug(f"dump {table.table_name} to {self.root_path}")
            table.dump(self.root_path)


@@ -977,17 +983,19 @@ def scan_tables(


def get_data_store() -> DataStore:
    instance = os.getenv("SW_INSTANCE")
    if instance is None or instance == "local":
    instance_uri = os.getenv(SWEnv.instance_uri)
    if instance_uri is None:
        instance_uri = SWCliConfigMixed()._current_instance_obj["uri"]
    if instance_uri == "local":
        return LocalDataStore.get_instance()
    else:
        return RemoteDataStore(instance)
        return RemoteDataStore(instance_uri)
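Store selection after this change, as a minimal sketch (the concrete env key behind SWEnv.instance_uri is an assumption here):

```python
import os

# assumption: SWEnv.instance_uri resolves to an env key such as "SW_INSTANCE_URI"
os.environ["SW_INSTANCE_URI"] = "local"
assert isinstance(get_data_store(), LocalDataStore)
# with the env var unset, the uri falls back to SWCliConfigMixed()'s current
# instance; any non-"local" uri selects RemoteDataStore instead
```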


def _flatten(record: Dict[str, Any]) -> Dict[str, Any]:
    def _new(key_prefix: str, src: Dict[str, Any], dest: Dict[str, Any]) -> None:
        for k, v in src.items():
            k = key_prefix + k
            k = key_prefix + str(k)
            if type(v) is dict:
                _new(k + "/", v, dest)
            dest[k] = v
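A quick illustration of the flattening rule; the tail of _flatten is truncated in this hunk, so this assumes it applies _new to the record and returns the result. The str(k) change is what lets non-string keys survive:

```python
# hypothetical input: nested dicts expand into slash-joined column names
record = {"id": 7, "result": {"label": 5, 0: 0.9}}
flat = _flatten(record)
print(flat["result/label"])  # 5
print(flat["result/0"])      # 0.9 -- the int key was coerced by str(k)
```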
@@ -1031,7 +1039,13 @@ def insert(self, record: Dict[str, Any]) -> None:
        record = _flatten(record)
        for k in record:
            for ch in k:
                if not ch.isalnum() and ch != "-" and ch != "_" and ch != "/":
                if (
                    not ch.isalnum()
                    and ch != "-"
                    and ch != "_"
                    and ch != "/"
                    and not ch.isspace()
                ):
                    raise RuntimeError(f"invalid field {k}")
        self._insert(record)

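A small sketch of what the relaxed validation admits; the writer class owning insert() is not shown in this hunk, so the rule is mirrored directly in a hypothetical helper:

```python
# mirrors the per-character rule above: alphanumerics, "-", "_", "/", and now spaces
def _valid_field(name: str) -> bool:
    return all(ch.isalnum() or ch in "-_/" or ch.isspace() for ch in name)

assert _valid_field("confusion matrix/cell 0")  # spaces are now accepted
assert not _valid_field("label:5")              # ":" is still rejected
```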
1 change: 1 addition & 0 deletions client/starwhale/api/_impl/dataset.py
@@ -126,6 +126,7 @@ def __init__(
        self.name = name
        self.version = version
        self.table_name = f"{name}/{version[:VERSION_PREFIX_CNT]}/{version}"
        logger.debug(f"dataset table name:{self.table_name}")
        self._ds_wrapper = DatastoreWrapperDataset(self.table_name, project)

        self.start = start
208 changes: 208 additions & 0 deletions client/starwhale/api/_impl/job.py
@@ -0,0 +1,208 @@
import copy
import typing as t
from pathlib import Path

import yaml
from loguru import logger

from starwhale.consts import DEFAULT_EVALUATION_JOB_NAME, DEFAULT_EVALUATION_RESOURCE
from starwhale.utils.load import load_module


def step(
    job_name: str = DEFAULT_EVALUATION_JOB_NAME,
    resources: t.Optional[t.List[str]] = None,
    concurrency: int = 1,
    task_num: int = 1,
    needs: t.Optional[t.List[str]] = None,
) -> t.Any:
    _resources = resources or [
        DEFAULT_EVALUATION_RESOURCE,
    ]
    _needs = needs or []

    def decorator(func: t.Any) -> t.Any:
        if Parser.is_parse_stage():
            _step = Step(
                job_name=job_name,
                step_name=func.__qualname__,
                resources=_resources,
                concurrency=concurrency,
                task_num=task_num,
                needs=_needs,
            )
            Parser.add_job(job_name, _step)

        return func

    return decorator
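A sketch of how an evaluation module might declare its DAG with this decorator; the class, methods, and import path are illustrative. Note that needs entries must match func.__qualname__ of the upstream step:

```python
from starwhale.api._impl.job import step

class MNISTEvaluation:
    @step(concurrency=2, task_num=2)
    def ppl(self) -> None:
        ...  # run inference over one dataset shard per task

    @step(needs=["MNISTEvaluation.ppl"])
    def cmp(self) -> None:
        ...  # scheduled only after every ppl task finishes
```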


# Runtime concept
class Context:
    def __init__(
        self,
        workdir: Path,
        src_dir: Path,
        step: str = "",
        total: int = 1,
        index: int = 0,
        dataset_uris: t.List[str] = [],
        version: str = "",
        project: str = "",
        kw: t.Dict[str, t.Any] = {},
    ):
        self.project = project
        self.version = version
        self.step = step
        self.total = total
        self.index = index
        self.dataset_uris = dataset_uris
        self.workdir = workdir
        self.src_dir = src_dir
        self.kw = copy.deepcopy(kw)

    def get_param(self, name: str) -> t.Any:
        return self.kw.get(name)

    def put_param(self, name: str, value: t.Any) -> None:
        if not self.kw:
            self.kw = {}
        self.kw.setdefault(name, value)

    def __repr__(self) -> str:
        return "step:{}, total:{}, index:{}".format(self.step, self.total, self.index)


class Step:
    def __init__(
        self,
        job_name: str,
        step_name: str,
        resources: t.List[str],
        needs: t.List[str],
        concurrency: int = 1,
        task_num: int = 1,
    ):
        self.job_name = job_name
        self.step_name = step_name
        self.resources = resources
        self.concurrency = concurrency
        self.task_num = task_num
        self.needs = needs
        self.status = ""

    def __repr__(self) -> str:
        return (
            f"job_name:{self.job_name}, step_name:{self.step_name}, "
            f"needs:{self.needs}, status: {self.status}"
        )


class ParseConfig:
    def __init__(self, is_parse_stage: bool, jobs: t.Dict[str, t.List[Step]]):
        self.parse_stage = is_parse_stage
        self.jobs = jobs

    def clear(self) -> None:
        self.jobs = {}


# shared memory, not thread safe
# parse_config = {"parse_stage": False, "jobs": {}}
parse_config = ParseConfig(False, {})


class Parser:
    @staticmethod
    def set_parse_stage(parse_stage: bool) -> None:
        parse_config.parse_stage = parse_stage

    @staticmethod
    def is_parse_stage() -> bool:
        return parse_config.parse_stage

    @staticmethod
    def add_job(job_name: str, step: Step) -> None:
        _jobs = parse_config.jobs
        if job_name not in _jobs:
            parse_config.jobs[job_name] = []

        parse_config.jobs[job_name].append(step)

    @staticmethod
    def get_jobs() -> t.Dict[str, t.List[Step]]:
        return parse_config.jobs

    # module loading happens once, so there is no need to handle repeated loads and cleanup
    @staticmethod
    def clear_config() -> None:
        global parse_config
        parse_config.clear()

    @staticmethod
    def parse_job_from_module(module: str, path: Path) -> t.Dict[str, t.List[Step]]:
        """
        parse @step decorators from a module
        :param module: module name
        :param path: abs path
        :return: jobs
        """
        Parser.set_parse_stage(True)
        # parse DAG
        logger.debug("parse @step for module:{}", module)
        load_module(module, path)
        _jobs = Parser.get_jobs().copy()
        Parser.clear_config()
        return _jobs

    @staticmethod
    def generate_job_yaml(module: str, path: Path, target_file: Path) -> None:
        """
        generate job yaml
        :param target_file: yaml target path
        :param module: module name
        :param path: abs path
        :return: None
        """
        _jobs = Parser.parse_job_from_module(module, path)
        # generate DAG
        logger.debug("generate DAG")
        if Parser.check(_jobs):
            # dump to target
            # ensure_file(target_file, yaml.safe_dump(_jobs, default_flow_style=False))
            with open(target_file, "w") as file:
                yaml.dump(_jobs, file)
            logger.debug("generate DAG success!")
        else:
            logger.error("generate DAG error! reason: check failed.")

    @staticmethod
    def check(jobs: t.Dict[str, t.List[Step]]) -> bool:
        # for each job, every declared dependency must name a known step
        checks = []
        logger.debug(f"jobs:{jobs}")
        for job_name, steps in jobs.items():
            all_steps = []
            needs = []
            for _step in steps:
                all_steps.append(_step.step_name)
                for d in _step.needs:
                    if d:
                        needs.append(d)
            logger.debug("all steps:{}, length:{}", all_steps, len(all_steps))
            _check = all(item in all_steps for item in needs)
            if not _check:
                logger.error("job:{} check error!", job_name)
            checks.append(_check)
        # all jobs passed
        if all(c is True for c in checks):
            logger.debug("check success! \n{}", yaml.dump(jobs))
            return True
        else:
            return False

    @staticmethod
    def parse_job_from_yaml(file_path: str) -> t.Any:
        with open(file_path, "r") as file:
            return yaml.unsafe_load(file)
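End to end, generating and reloading a job definition might look like this sketch (module name and paths are illustrative):

```python
from pathlib import Path

# collect @step declarations from a user module and dump the DAG to yaml
Parser.generate_job_yaml("mnist.evaluator", Path("/workdir/src"), Path("/workdir/job.yaml"))

# later, e.g. in the scheduler, load the Step objects back
jobs = Parser.parse_job_from_yaml("/workdir/job.yaml")
for job_name, steps in jobs.items():
    print(job_name, [(s.step_name, s.needs) for s in steps])
```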
17 changes: 17 additions & 0 deletions client/starwhale/api/_impl/loader.py
@@ -136,6 +136,23 @@ class DataField(t.NamedTuple):
    ext_attr: t.Dict[str, t.Any]


class ResultLoader:
    def __init__(
        self,
        datas: t.List[t.Any],
        deserializer: t.Optional[t.Callable] = None,
    ) -> None:
        self.datas = datas
        self.deserializer = deserializer

    def __iter__(self) -> t.Any:
        for _data in self.datas:
            if self.deserializer:
                yield self.deserializer(_data)
                continue
            yield _data


class DataLoader(metaclass=ABCMeta):
    def __init__(
        self,
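Finally, a hedged sketch of the new ResultLoader; the raw values and the deserializer choice are made up:

```python
import json

# wraps raw task results and decodes each record lazily during iteration
loader = ResultLoader(
    datas=['{"label": 5}', '{"label": 3}'],
    deserializer=json.loads,  # without one, items are yielded as-is
)
for rec in loader:
    print(rec["label"])
```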