diff --git a/client/starwhale/api/_impl/dataset/builder/mapping_builder.py b/client/starwhale/api/_impl/dataset/builder/mapping_builder.py index 8f5a672fff..470b1e97be 100644 --- a/client/starwhale/api/_impl/dataset/builder/mapping_builder.py +++ b/client/starwhale/api/_impl/dataset/builder/mapping_builder.py @@ -15,13 +15,16 @@ from loguru import logger from starwhale.consts import STANDALONE_INSTANCE +from starwhale.base.uri import URI from starwhale.utils.fs import ( empty_dir, ensure_dir, blake2b_file, BLAKE2B_SIGNATURE_ALGO, ) -from starwhale.utils.error import NoSupportError +from starwhale.base.type import URIType +from starwhale.base.cloud import CloudRequestMixed +from starwhale.utils.retry import http_retry from starwhale.core.dataset.type import ( Link, BaseArtifact, @@ -311,11 +314,19 @@ def _handle_bin_sync(self, bin_path: Path) -> None: uri, _ = DatasetStorage.save_data_file(bin_path, remove_src=True) else: sign_name = blake2b_file(bin_path) - # TODO: do upload to cloud with new dataset bin storage api - uri = sign_name - raise NoSupportError( - "no support upload bin files into cloud instance directly" - ) + crm = CloudRequestMixed() + instance_uri = URI(self.instance_name, expected_type=URIType.INSTANCE) + + @http_retry + def _upload() -> str: + r = crm.do_multipart_upload_file( + url_path=f"/project/{self.project_name}/dataset/{self.dataset_name}/hashedBlob/{sign_name}", + file_path=bin_path, + instance_uri=instance_uri, + ) + return r.json()["data"] # type: ignore + + uri = _upload() self._signed_bins_meta.append( MappingDatasetBuilder._SignedBinMeta( diff --git a/client/tests/sdk/test_dataset.py b/client/tests/sdk/test_dataset.py index ee4234e9d0..0f80af9cbf 100644 --- a/client/tests/sdk/test_dataset.py +++ b/client/tests/sdk/test_dataset.py @@ -1,5 +1,6 @@ import io import os +import re import copy import json import math @@ -2130,19 +2131,27 @@ def test_close_empty(self) -> None: project_name=self.project_name, ).close() - @pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning") @Mocker() def test_put_for_cloud(self, rm: Mocker) -> None: instance_uri = "http://1.1.1.1" rm.request( HTTPMethod.GET, - f"{instance_uri}/api/v1/project/self", + f"{instance_uri}/api/v1/project/{self.project_name}", json={"data": {"id": 1, "name": "project"}}, ) update_req = rm.request( HTTPMethod.POST, f"{instance_uri}/api/v1/datastore/updateTable" ) + server_return_uri = "__server-uri-path__" + upload_req = rm.register_uri( + HTTPMethod.POST, + re.compile( + f"{instance_uri}/api/v1/project/{self.project_name}/dataset/{self.dataset_name}/hashedBlob/", + ), + json={"data": server_return_uri}, + ) + os.environ[SWEnv.instance_token] = "1234" mdb = MappingDatasetBuilder( @@ -2164,11 +2173,16 @@ def test_put_for_cloud(self, rm: Mocker) -> None: mdb.put(DataRow(index=1, features={"bin": Binary(b"abc")})) mdb.flush() - with self.assertRaisesRegex( - threading.ThreadError, - "no support upload bin files into cloud instance directly", - ): - mdb.put(DataRow(index=1, features={"label": 1})) - mdb.close() + mdb.put(DataRow(index=1, features={"label": 1})) + mdb.close() assert update_req.called + assert upload_req.call_count == 1 + + assert mdb._signed_bins_meta[0].name == server_return_uri + assert any( + [ + server_return_uri in history.text + for history in update_req.request_history + ] + ) diff --git a/client/tests/sdk/test_dataset_sdk.py b/client/tests/sdk/test_dataset_sdk.py index bbac30d4b2..22dd4d1dd9 100644 --- a/client/tests/sdk/test_dataset_sdk.py +++ b/client/tests/sdk/test_dataset_sdk.py @@ -2,6 +2,7 @@ import io import os +import re import sys import typing as t from http import HTTPStatus @@ -656,6 +657,18 @@ def test_create_for_cloud(self, rm: Mocker) -> None: json={"data": {"uploadId": "123"}}, ) + rm.register_uri( + HTTPMethod.HEAD, + re.compile("http://1.1.1.1/api/v1/project/self/dataset/mnist/hashedBlob/"), + status_code=HTTPStatus.NOT_FOUND, + ) + + rm.register_uri( + HTTPMethod.POST, + re.compile("http://1.1.1.1/api/v1/project/self/dataset/mnist/hashedBlob/"), + json={"data": "uri"}, + ) + cnt = 10 for i in range(0, cnt): ds.append( @@ -678,10 +691,7 @@ def test_create_for_cloud(self, rm: Mocker) -> None: assert file_request.call_count == 2 # TODO: when sdk supports to upload blobs into cloud, remove assertRasise - with self.assertRaisesRegex( - RuntimeError, "no support upload bin files into cloud instance directly" - ): - ds.close() + ds.close() @pytest.mark.skip( "enable this test when datastore wrapper supports timestamp version"