Skip to content

Commit

Permalink
refactor(controller): add object storage for dataset (#1993)
Browse files (browse the repository at this point in the history)
  • Loading branch information
anda-ren authored Mar 29, 2023
1 parent 50e71d8 commit 9b50838
Show file tree
Hide file tree
Showing 29 changed files with 473 additions and 485 deletions.
2 changes: 1 addition & 1 deletion client/starwhale/api/_impl/dataset/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def _sign_uris(self, uris: t.List[str]) -> dict:
else DEFAULT_CONSUMPTION_BATCH_SIZE
)
r = CloudRequestMixed.do_http_request(
f"/project/{self.dataset_uri.project}/{self.dataset_uri.object.typ}/{self.dataset_uri.object.name}/version/{self.dataset_uri.object.version}/sign-links",
f"/project/{self.dataset_uri.project}/{self.dataset_uri.object.typ}/{self.dataset_uri.object.name}/uri/sign-links",
method=HTTPMethod.POST,
instance_uri=self.dataset_uri,
params={
Expand Down
2 changes: 1 addition & 1 deletion client/starwhale/core/dataset/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ def _make_file(

def sign_uri(self, uri: str) -> str:
r = self.do_http_request(
f"/project/{self.dataset_uri.project}/{self.dataset_uri.object.typ}/{self.dataset_uri.object.name}/version/{self.dataset_uri.object.version}/sign-links",
f"/project/{self.dataset_uri.project}/{self.dataset_uri.object.typ}/{self.dataset_uri.object.name}/uri/sign-links",
method=HTTPMethod.POST,
instance_uri=self.dataset_uri,
params={
Expand Down
2 changes: 1 addition & 1 deletion client/starwhale/core/dataset/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def build(
).run()
else:
ds.build(**kwargs)
return dataset_uri
return ds.uri

@classmethod
def copy(
Expand Down
2 changes: 1 addition & 1 deletion client/tests/core/test_dataset_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_signed_url_backend(
raw_content = string.ascii_lowercase.encode()
data_uri = "12345678abcdefg"
req_signed_url = rm.post(
"http://127.0.0.1:1234/api/v1/project/self/dataset/mnist/version/1122334455667788/sign-links",
"http://127.0.0.1:1234/api/v1/project/self/dataset/mnist/uri/sign-links",
json={"data": {data_uri: signed_url}},
)
req_file_download = rm.get(
Expand Down
2 changes: 1 addition & 1 deletion client/tests/sdk/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,7 @@ def test_link_cloud(self, rm: Mocker) -> None:

rm.request(
HTTPMethod.POST,
"http://127.0.0.1:8081/api/v1/project/test/dataset/mnist/version/latest/sign-links?expTimeMillis=60000",
"http://127.0.0.1:8081/api/v1/project/test/dataset/mnist/uri/sign-links?expTimeMillis=60000",
json={
"data": {
"s3://minioadmin:minioadmin@10.131.0.1:9000/users/path/to/file": "http://127.0.0.1:9001/signed_url"
Expand Down
4 changes: 2 additions & 2 deletions client/tests/sdk/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ def test_swds_bin_s3(

signed_url = "http://minio/signed/path/file"
rm.post(
"http://127.0.0.1:1234/api/v1/project/self/dataset/mnist/version/1122334455667788/sign-links",
"http://127.0.0.1:1234/api/v1/project/self/dataset/mnist/uri/sign-links",
json={"data": {fname: signed_url}},
)
rm.get(
Expand Down
Expand Up @@ -637,7 +637,7 @@ def test_remote_batch_sign(
raw_content = b"abcdefg"
req_get_file = rm.register_uri(HTTPMethod.GET, "/get-file", content=raw_content)
rm.post(
"http://localhost/api/v1/project/x/dataset/mnist/version/1122/sign-links",
"http://localhost/api/v1/project/x/dataset/mnist/uri/sign-links",
json={"data": _uri_dict},
)
rm.get(
Expand Down
4 changes: 1 addition & 3 deletions console/packages/starwhale-core/src/datastore/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ export function tableDataLink(
Authorization?: string
}
) {
return `/api/v1/project/${projectId}/dataset/${datasetName}/version/${datasetVersionName}/link?${qs.stringify(
query
)}`
return `/api/v1/project/${projectId}/dataset/${datasetName}/uri?${qs.stringify(query)}`
}

export function tableNameOfDataset(projectName: string, datasetName: string, datasetVersionName: string) {
Expand Down
46 changes: 30 additions & 16 deletions scripts/client_test/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future

from cmds import DatasetExpl
from cmds.eval_cmd import Evaluation
from cmds.base.invoke import invoke
from cmds.project_cmd import Project
Expand Down
Expand Up @@ -36,37 +37,42 @@
"mnist": {
"workdir": f"{ROOT_DIR}/example/mnist",
"datasets": [
"",
"mnist.dataset:RawDatasetProcessExecutor",
"mnist.dataset:LinkRawDatasetProcessExecutor",
DatasetExpl("mnist_bin", "mnist.dataset:iter_mnist_item"),
DatasetExpl(
"mnist_link_raw", "mnist.dataset:LinkRawDatasetProcessExecutor"
),
],
# "datasets": [""],
},
"cifar10": {
"workdir": f"{ROOT_DIR}/example/cifar10",
"datasets": [""],
"datasets": [DatasetExpl("cifar10", "")],
},
"nmt": {
"workdir": f"{ROOT_DIR}/example/nmt",
"datasets": [""],
"datasets": [DatasetExpl("nmt", "")],
},
"pfp": {
"workdir": f"{ROOT_DIR}/example/PennFudanPed",
"datasets": [""],
"datasets": [DatasetExpl("pfp", "")],
"device": "gpu",
},
"speech_command": {
"workdir": f"{ROOT_DIR}/example/speech_command",
"datasets": ["", "sc.dataset:LinkRawDatasetBuildExecutor"],
"datasets": [
DatasetExpl("speech_command", ""),
DatasetExpl(
"speech_command_link", "sc.dataset:LinkRawDatasetBuildExecutor"
),
],
"device": "gpu",
},
"ag_news": {
"workdir": f"{ROOT_DIR}/example/text_cls_AG_NEWS",
"datasets": [""],
"datasets": [DatasetExpl("ag_news", "")],
},
"ucf101": {
"workdir": f"{ROOT_DIR}/example/ucf101",
"datasets": [""],
"datasets": [DatasetExpl("ucf101", "")],
},
}
RUNTIMES: t.Dict[str, t.Dict[str, t.Union[str, t.List[str]]]] = {
Expand Down
Expand Up @@ -124,14 +130,20 @@ def __init__(
}
)

def build_dataset(self, _workdir: str, handler: str = "") -> t.Any:
def build_dataset(self, _workdir: str, ds_expl: DatasetExpl) -> t.Any:
self.select_local_instance()
_uri = Dataset.build_with_api(workdir=_workdir, handler=handler)
ret_uri = Dataset.build_with_api(workdir=_workdir, ds_expl=ds_expl)
_uri = URI.capsulate_uri(
instance=ret_uri.instance,
project=ret_uri.project,
obj_type=ret_uri.object.typ,
obj_name=ret_uri.object.name,
obj_ver=ret_uri.object.version,
)
if self.server_url:
assert self.dataset_api.copy(
src_uri=_uri.full_uri,
target_project=f"cloud://server/project/{self.server_project}",
force=True,
)
dss_ = self.datasets.get(_uri.object.name, [])
dss_.append(_uri)
Expand Down
Expand Up @@ -280,7 +292,9 @@ def test_simple(self) -> None:
_model_uri = self.build_model(f"{self._work_dir}/scripts/example")

# 2.dataset build
_ds_uri = self.build_dataset(f"{self._work_dir}/scripts/example")
_ds_uri = self.build_dataset(
f"{self._work_dir}/scripts/example", DatasetExpl("", "")
)

# 3.runtime build
_rt_uri = self.build_runtime(f"{self._work_dir}/scripts/example")
Expand Down
Expand Up @@ -368,8 +382,8 @@ def test_expl(self, expl_name: str) -> None:
raise

# download data
for d_type in expl["datasets"]:
self.build_dataset(workdir_, d_type)
for ds_expl in expl["datasets"]:
self.build_dataset(workdir_, ds_expl)
self.build_model(workdir_)

# run_eval
Expand Down
3 changes: 3 additions & 0 deletions scripts/client_test/cmds/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
from collections import namedtuple

CLI = "swcli"
DatasetExpl = namedtuple("DatasetExpl", ["name", "handler"])
11 changes: 5 additions & 6 deletions scripts/client_test/cmds/artifacts_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from starwhale.core.runtime.view import RuntimeTermView
from starwhale.api._impl.data_store import LocalDataStore

from . import CLI
from . import CLI, DatasetExpl
from .base.invoke import invoke, invoke_with_react


Expand Down
Expand Up @@ -208,14 +208,15 @@ def __init__(self) -> None:
@staticmethod
def build_with_api(
workdir: str,
ds_expl: DatasetExpl,
dataset_yaml: str = "dataset.yaml",
handler: str = "",
) -> t.Any:
yaml_path = Path(workdir) / dataset_yaml
config = DatasetConfig()
if yaml_path.exists():
config = DatasetConfig.create_by_yaml(yaml_path)
config.handler = import_object(workdir, handler or config.handler)
config.name = ds_expl.name or config.name
config.handler = import_object(workdir, ds_expl.handler or config.handler)
_uri = DatasetTermView.build(workdir, config)
LocalDataStore.get_instance().dump()
return _uri
Expand Down
Expand Up @@ -264,10 +265,8 @@ def summary(self, uri: str) -> t.Any:
_ret_code, _res = invoke([CLI, "-o", "json", self.name, "summary", uri])
return json.loads(_res) if _ret_code == 0 else {}

def copy(self, src_uri: str, target_project: str, force: bool) -> bool:
def copy(self, src_uri: str, target_project: str) -> bool:
_args = [CLI, self.name, "copy", src_uri, target_project]
if force:
_args.append("--force")
_ret_code, _res = invoke(_args)
return bool(_ret_code == 0)

Expand Down
Loading

0 comments on commit 9b50838

Please sign in to comment.