Skip to content

Commit

Permalink
feat(dataset): MappingDatasetBuilder supports to upload swds_bin to…
Browse files Browse the repository at this point in the history
… cloud instance (#1996)

MappingDatasetBuilder now supports uploading swds_bin files to a cloud instance directly
  • Loading branch information
tianweidut authored Mar 27, 2023
1 parent a7b40ad commit d346966
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 18 deletions.
23 changes: 17 additions & 6 deletions client/starwhale/api/_impl/dataset/builder/mapping_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
from loguru import logger

from starwhale.consts import STANDALONE_INSTANCE
from starwhale.base.uri import URI
from starwhale.utils.fs import (
empty_dir,
ensure_dir,
blake2b_file,
BLAKE2B_SIGNATURE_ALGO,
)
from starwhale.utils.error import NoSupportError
from starwhale.base.type import URIType
from starwhale.base.cloud import CloudRequestMixed
from starwhale.utils.retry import http_retry
from starwhale.core.dataset.type import (
Link,
BaseArtifact,
Expand Down Expand Up @@ -311,11 +314,19 @@ def _handle_bin_sync(self, bin_path: Path) -> None:
uri, _ = DatasetStorage.save_data_file(bin_path, remove_src=True)
else:
sign_name = blake2b_file(bin_path)
# TODO: do upload to cloud with new dataset bin storage api
uri = sign_name
raise NoSupportError(
"no support upload bin files into cloud instance directly"
)
crm = CloudRequestMixed()
instance_uri = URI(self.instance_name, expected_type=URIType.INSTANCE)

@http_retry
def _upload() -> str:
    """Upload the bin file as a hashed blob and return the server's ``data`` field.

    The request targets the dataset's ``hashedBlob`` endpoint, keyed by the
    file's signature (``sign_name``). The ``data`` entry of the JSON response
    is returned to the caller, which uses it as the bin's uri.
    """
    blob_url_path = f"/project/{self.project_name}/dataset/{self.dataset_name}/hashedBlob/{sign_name}"
    response = crm.do_multipart_upload_file(
        url_path=blob_url_path,
        file_path=bin_path,
        instance_uri=instance_uri,
    )
    payload = response.json()
    return payload["data"]  # type: ignore

uri = _upload()

self._signed_bins_meta.append(
MappingDatasetBuilder._SignedBinMeta(
Expand Down
30 changes: 22 additions & 8 deletions client/tests/sdk/test_dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import os
import re
import copy
import json
import math
Expand Down Expand Up @@ -2130,19 +2131,27 @@ def test_close_empty(self) -> None:
project_name=self.project_name,
).close()

@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
@Mocker()
def test_put_for_cloud(self, rm: Mocker) -> None:
instance_uri = "http://1.1.1.1"
rm.request(
HTTPMethod.GET,
f"{instance_uri}/api/v1/project/self",
f"{instance_uri}/api/v1/project/{self.project_name}",
json={"data": {"id": 1, "name": "project"}},
)
update_req = rm.request(
HTTPMethod.POST, f"{instance_uri}/api/v1/datastore/updateTable"
)

server_return_uri = "__server-uri-path__"
upload_req = rm.register_uri(
HTTPMethod.POST,
re.compile(
f"{instance_uri}/api/v1/project/{self.project_name}/dataset/{self.dataset_name}/hashedBlob/",
),
json={"data": server_return_uri},
)

os.environ[SWEnv.instance_token] = "1234"

mdb = MappingDatasetBuilder(
Expand All @@ -2164,11 +2173,16 @@ def test_put_for_cloud(self, rm: Mocker) -> None:
mdb.put(DataRow(index=1, features={"bin": Binary(b"abc")}))
mdb.flush()

with self.assertRaisesRegex(
threading.ThreadError,
"no support upload bin files into cloud instance directly",
):
mdb.put(DataRow(index=1, features={"label": 1}))
mdb.close()
mdb.put(DataRow(index=1, features={"label": 1}))
mdb.close()

assert update_req.called
assert upload_req.call_count == 1

assert mdb._signed_bins_meta[0].name == server_return_uri
assert any(
[
server_return_uri in history.text
for history in update_req.request_history
]
)
18 changes: 14 additions & 4 deletions client/tests/sdk/test_dataset_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io
import os
import re
import sys
import typing as t
from http import HTTPStatus
Expand Down Expand Up @@ -656,6 +657,18 @@ def test_create_for_cloud(self, rm: Mocker) -> None:
json={"data": {"uploadId": "123"}},
)

rm.register_uri(
HTTPMethod.HEAD,
re.compile("http://1.1.1.1/api/v1/project/self/dataset/mnist/hashedBlob/"),
status_code=HTTPStatus.NOT_FOUND,
)

rm.register_uri(
HTTPMethod.POST,
re.compile("http://1.1.1.1/api/v1/project/self/dataset/mnist/hashedBlob/"),
json={"data": "uri"},
)

cnt = 10
for i in range(0, cnt):
ds.append(
Expand All @@ -678,10 +691,7 @@ def test_create_for_cloud(self, rm: Mocker) -> None:
assert file_request.call_count == 2

# TODO: when the SDK supports uploading blobs into cloud, remove assertRaises
with self.assertRaisesRegex(
RuntimeError, "no support upload bin files into cloud instance directly"
):
ds.close()
ds.close()

@pytest.mark.skip(
"enable this test when datastore wrapper supports timestamp version"
Expand Down

0 comments on commit d346966

Please sign in to comment.