From c10794098b570b88de5966ef214655eafc4f1568 Mon Sep 17 00:00:00 2001 From: tianwei Date: Mon, 12 Dec 2022 13:32:52 +0800 Subject: [PATCH] support cloud remote dataset build by dataset sdk --- client/starwhale/api/_impl/dataset/builder.py | 28 +++- client/starwhale/api/_impl/dataset/model.py | 140 +++++++++++++----- client/starwhale/api/_impl/wrapper.py | 4 +- client/starwhale/base/bundle_copy.py | 45 ++++-- client/starwhale/core/dataset/copy.py | 70 +++++---- client/starwhale/core/dataset/model.py | 22 ++- client/starwhale/core/dataset/tabular.py | 6 +- client/starwhale/core/model/copy.py | 3 + client/tests/sdk/test_dataset_sdk.py | 88 ++++++++++- 9 files changed, 307 insertions(+), 99 deletions(-) diff --git a/client/starwhale/api/_impl/dataset/builder.py b/client/starwhale/api/_impl/dataset/builder.py index ef7aa56c7a..33d087e68f 100644 --- a/client/starwhale/api/_impl/dataset/builder.py +++ b/client/starwhale/api/_impl/dataset/builder.py @@ -16,10 +16,20 @@ import jsonlines from loguru import logger -from starwhale.consts import AUTH_ENV_FNAME, DEFAULT_PROJECT, SWDS_DATA_FNAME_FMT +from starwhale.consts import ( + AUTH_ENV_FNAME, + DEFAULT_PROJECT, + STANDALONE_INSTANCE, + SWDS_DATA_FNAME_FMT, +) from starwhale.base.uri import URI from starwhale.utils.fs import empty_dir, ensure_dir -from starwhale.base.type import DataFormatType, DataOriginType, ObjectStoreType +from starwhale.base.type import ( + InstanceType, + DataFormatType, + DataOriginType, + ObjectStoreType, +) from starwhale.utils.error import FormatError, NoSupportError from starwhale.core.dataset import model from starwhale.core.dataset.type import ( @@ -61,6 +71,7 @@ def __init__( append_from_version: str = "", append_from_uri: t.Optional[URI] = None, data_mime_type: MIMEType = MIMEType.UNDEFINED, + instance_name: str = STANDALONE_INSTANCE, ) -> None: # TODO: add more docstring for args # TODO: validate group upper and lower? 
@@ -80,11 +91,20 @@ def __init__( self.dataset_name = dataset_name self.dataset_version = dataset_version self.tabular_dataset = TabularDataset( - dataset_name, dataset_version, project_name + dataset_name, + dataset_version, + project_name, + instance_name=instance_name, ) self._forked_summary: t.Optional[DatasetSummary] if append and append_from_uri: + # TODOļ¼š controller supports cloud dataset fork api + if append_from_uri.instance_type == InstanceType.CLOUD: + raise NoSupportError( + f"Can't build dataset from existed cloud dataset: {append_from_uri}" + ) + self._forked_last_seq_id, self._forked_rows = self.tabular_dataset.fork( append_from_version ) @@ -546,6 +566,7 @@ def __init__( append_from_version: str = "", append_from_uri: t.Optional[URI] = None, append_with_swds_bin: bool = True, + instance_name: str = STANDALONE_INSTANCE, ) -> None: super().__init__( name=f"RowWriter-{dataset_name}-{dataset_version}-{project_name}" @@ -561,6 +582,7 @@ def __init__( "append": append, "append_from_version": append_from_version, "append_from_uri": append_from_uri, + "instance_name": instance_name, } self._queue: queue.Queue[t.Optional[DataRow]] = queue.Queue() diff --git a/client/starwhale/api/_impl/dataset/model.py b/client/starwhale/api/_impl/dataset/model.py index 15da09b024..75c923cdb6 100644 --- a/client/starwhale/api/_impl/dataset/model.py +++ b/client/starwhale/api/_impl/dataset/model.py @@ -6,12 +6,19 @@ from types import TracebackType from pathlib import Path from functools import wraps +from contextlib import ExitStack from loguru import logger from starwhale.utils import gen_uniq_version -from starwhale.consts import HTTPMethod, DEFAULT_PAGE_IDX, DEFAULT_PAGE_SIZE +from starwhale.consts import ( + HTTPMethod, + DEFAULT_PAGE_IDX, + DEFAULT_PAGE_SIZE, + STANDALONE_INSTANCE, +) from starwhale.base.uri import URI, URIType +from starwhale.utils.fs import move_dir, empty_dir from starwhale.base.type import InstanceType from starwhale.base.cloud import CloudRequestMixed from starwhale.utils.error import ExistedError, NotFoundError, NoSupportError @@ -102,8 +109,15 @@ def __init__( self._append_use_swds_bin = False _summary = None - if self._check_uri_exists(_origin_uri): + origin_uri_exists = self._check_uri_exists(_origin_uri) + if origin_uri_exists: if create: + # TODO: support build cloud dataset from the existed dataset + if self.project_uri.instance_type == InstanceType.CLOUD: + raise NoSupportError( + f"Can't build dataset from the existed cloud dataset uri:{_origin_uri}" + ) + self._append_from_version = version self._create_by_append = True self._fork_dataset() @@ -122,7 +136,14 @@ def __init__( else: raise ExistedError(f"{self.uri} was not found fo load") - self._summary = _summary or self.__core_dataset.summary() + self._summary: t.Optional[DatasetSummary] + if _summary: + self._summary = _summary + else: + if origin_uri_exists: + self._summary = self.__core_dataset.summary() + else: + self._summary = DatasetSummary() self._rows_cnt = self._summary.rows if self._summary else 0 self._consumption: t.Optional[TabularDatasetSessionConsumption] = None @@ -139,10 +160,21 @@ def __init__( def _fork_dataset(self) -> None: # TODO: support cloud dataset prepare in the tmp dir # TODO: lazy fork dataset - self.__core_dataset._prepare_snapshot() - self.__core_dataset._fork_swds( - self._create_by_append, self._append_from_version - ) + if not isinstance(self.__core_dataset, StandaloneDataset): + raise NoSupportError( + f"only support standalone dataset fork: {self.__core_dataset}" + ) + + def 
_when_exit() -> None: + self.__core_dataset.store.building = False + + with ExitStack() as stack: + stack.callback(_when_exit) + self.__core_dataset.store.building = True + self.__core_dataset._prepare_snapshot() + self.__core_dataset._fork_swds( + self._create_by_append, self._append_from_version + ) def _auto_complete_version(self, version: str) -> str: version = version.strip() @@ -205,6 +237,10 @@ def make_distributed_consumption( ) return self + def _clear_data_loader(self) -> None: + with self._lock: + self.__data_loaders = {} + def _get_data_loader( self, recreate: bool = False, disable_consumption: bool = False ) -> DataLoader: @@ -272,6 +308,7 @@ def _run() -> _GItemType: return _run() except RuntimeError as e: if str(e).startswith("table is empty"): + self._clear_data_loader() return None raise except StopIteration: @@ -340,7 +377,7 @@ def _check_uri_exists(uri: t.Optional[URI]) -> bool: if uri.instance_type == InstanceType.CLOUD: crm = CloudRequestMixed() ok, _ = crm.do_http_request_simple_ret( - path=f"/project/{uri.project}/{URIType.DATASET}/{uri.object.name}/version/{uri.object.version}/file", + path=f"/project/{uri.project}/{URIType.DATASET}/{uri.object.name}/version/{uri.object.version}", method=HTTPMethod.HEAD, instance_uri=uri, ignore_status_codes=[HTTPStatus.NOT_FOUND], @@ -431,11 +468,6 @@ def __setitem__( ) -> None: # TODO: tune the performance of getitem by cache self._trigger_icode_build = True - if not isinstance(self.__core_dataset, StandaloneDataset): - raise NoSupportError( - f"setitem only supports for standalone dataset: {self.__core_dataset}" - ) - _row_writer = self._get_row_writer() if not isinstance(key, (int, str)): @@ -487,20 +519,16 @@ def _get_row_writer(self) -> RowWriter: append_from_version = "" # TODO: support alignment_bytes_size, volume_bytes_size arguments - if not isinstance(self.__core_dataset, StandaloneDataset): - raise NoSupportError( - f"setitem only supports for standalone dataset: {self.__core_dataset}" - ) - self._row_writer = RowWriter( dataset_name=self.name, dataset_version=self.version, project_name=self.project_uri.project, - workdir=self.__core_dataset.store.snapshot_workdir, # TODO: use tmpdir which is same as dataset build command + workdir=self.__core_dataset.store.tmp_dir, append=self._create_by_append, append_from_version=append_from_version, append_from_uri=append_from_uri, append_with_swds_bin=self._append_use_swds_bin, + instance_name=self.project_uri.instance, ) return self._row_writer @@ -573,28 +601,66 @@ def build_with_copy_src( @_check_readonly @_forbid_handler_build def _do_build_from_interactive_code(self) -> None: - ds = self.__core_dataset - if isinstance(ds, StandaloneDataset): - if self._row_writer is None: - raise RuntimeError("row writer is none, no data was written") + if self._row_writer is None: + raise RuntimeError("row writer is none, no data was written") - self.flush() - self._row_writer.close() - # TODO: use the elegant method to refactor manifest update - self._summary = self._row_writer.summary - self._summary.rows = len(self) - ds._manifest["dataset_summary"] = self._summary.asdict() - ds._calculate_signature() - ds._render_manifest() - ds._make_swds_meta_tar() - ds._make_auto_tags() + self.flush() + self._row_writer.close() + self._summary = self._row_writer.summary + + # TODO: use the elegant method to refactor manifest update + self._summary.rows = len(self) + + if isinstance(self.__core_dataset, StandaloneDataset): + local_ds = self.__core_dataset + local_uri = self.uri else: - # TODO: support 
cloud dataset build - raise NoSupportError("only support standalone dataset build") + local_uri = URI.capsulate_uri( + instance=STANDALONE_INSTANCE, + project=self.uri.project, + obj_type=self.uri.object.typ, + obj_name=self.uri.object.name, + obj_ver=self.uri.object.version, + ) + local_ds = StandaloneDataset(local_uri) + local_ds.store._tmp_dir = self.__core_dataset.store.tmp_dir + setattr(local_ds, "_version", self.version) + + def _when_standalone_exit() -> None: + local_ds._make_auto_tags() + move_dir(local_ds.store.tmp_dir, local_ds.store.snapshot_workdir) + + def _when_cloud_exit() -> None: + from starwhale.core.dataset.copy import DatasetCopy + + dc = DatasetCopy( + str(local_uri), str(self.uri), URIType.DATASET + ).with_disable_datastore() + dc._do_upload_bundle_dir(workdir=local_ds.store.tmp_dir) + empty_dir(local_ds.store.tmp_dir) + + def _when_exit() -> None: + local_ds.store.building = False + if isinstance(self.__core_dataset, StandaloneDataset): + _when_standalone_exit() + else: + _when_cloud_exit() + + with ExitStack() as stack: + stack.callback(_when_exit) + local_ds.store.building = True + local_ds._manifest["dataset_summary"] = self._summary.asdict() + local_ds._calculate_signature() + local_ds._render_manifest() + local_ds._make_swds_meta_tar() @_check_readonly @_forbid_icode_build def _do_build_from_handler(self) -> None: + # TODO: support build dataset for cloud uri directly + if self.project_uri.instance_type == InstanceType.CLOUD: + raise NoSupportError("no support to build cloud dataset directly") + self._trigger_icode_build = True config = DatasetConfig( name=self.name, @@ -620,10 +686,6 @@ def _do_build_from_handler(self) -> None: @_check_readonly def build(self) -> None: - # TODO: support build dataset for cloud uri directly - if self.project_uri.instance_type == InstanceType.CLOUD: - raise NoSupportError("no support to build cloud dataset directly") - if self._trigger_icode_build: self._do_build_from_interactive_code() elif self._trigger_handler_build and self.build_handler: diff --git a/client/starwhale/api/_impl/wrapper.py b/client/starwhale/api/_impl/wrapper.py index bd04bb5102..e46f1641ff 100644 --- a/client/starwhale/api/_impl/wrapper.py +++ b/client/starwhale/api/_impl/wrapper.py @@ -161,7 +161,7 @@ def flush(self, table_name: str) -> None: class Dataset(Logger): def __init__( - self, dataset_id: str, project: str, instance_uri: str = "", token: str = "" + self, dataset_id: str, project: str, instance_name: str = "", token: str = "" ) -> None: if not dataset_id: raise RuntimeError("id should not be None") @@ -172,7 +172,7 @@ def __init__( self.dataset_id = dataset_id self.project = project self._meta_table_name = f"project/{self.project}/dataset/{self.dataset_id}/meta" - self._data_store = data_store.get_data_store(instance_uri, token) + self._data_store = data_store.get_data_store(instance_name, token) self._init_writers([self._meta_table_name]) def put(self, data_id: Union[str, int], **kwargs: Any) -> None: diff --git a/client/starwhale/base/bundle_copy.py b/client/starwhale/base/bundle_copy.py index a4c5d77620..ca106b3bac 100644 --- a/client/starwhale/base/bundle_copy.py +++ b/client/starwhale/base/bundle_copy.py @@ -302,15 +302,19 @@ def _download(_tid: TaskID, fd: FileDesc) -> None: def _do_ubd_bundle_prepare( self, - progress: Progress, + progress: t.Optional[Progress], workdir: Path, url_path: str, ) -> t.Any: manifest_path = workdir / DEFAULT_MANIFEST_NAME - task_id = progress.add_task( - f":arrow_up: {manifest_path.name}", - 
total=manifest_path.stat().st_size, - ) + if progress is None: + task_id = TaskID(0) + else: + task_id = progress.add_task( + f":arrow_up: {manifest_path.name}", + total=manifest_path.stat().st_size, + ) + # TODO: use rich progress r = self.do_multipart_upload_file( url_path=url_path, @@ -331,7 +335,7 @@ def _do_ubd_bundle_prepare( def _do_ubd_blobs( self, - progress: Progress, + progress: t.Optional[Progress], workdir: Path, upload_id: str, url_path: str, @@ -348,7 +352,9 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None: _upload_headers["X-SW-UPLOAD-TYPE"] = fd.file_type.name _upload_headers["X-SW-UPLOAD-OBJECT-HASH"] = fd.signature - progress.update(_tid, visible=True) + if progress is not None: + progress.update(_tid, visible=True) + self.do_multipart_upload_file( url_path=url_path, file_path=fd.path, @@ -364,14 +370,18 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None: ) _p_map = {} - for _f in self.upload_files(workdir=workdir): + for _id, _f in enumerate(self.upload_files(workdir=workdir)): if existed_files and _f.signature in existed_files: continue - _tid = progress.add_task( - f":arrow_up: {_f.path.name}", - total=float(_f.size), - visible=False, - ) + + if progress is None: + _tid = TaskID(_id) + else: + _tid = progress.add_task( + f":arrow_up: {_f.path.name}", + total=float(_f.size), + visible=False, + ) _p_map[_tid] = _f with ThreadPoolExecutor( @@ -383,6 +393,9 @@ def _upload_blob(_tid: TaskID, fd: FileDesc) -> None: ] wait(futures) + def _do_ubd_datastore(self) -> None: + raise NotImplementedError + def _do_ubd_end(self, upload_id: str, url_path: str, ok: bool) -> None: phase = _UploadPhase.END if ok else _UploadPhase.CANCEL self.do_http_request( @@ -401,9 +414,10 @@ def _do_ubd_end(self, upload_id: str, url_path: str, ok: bool) -> None: def _do_upload_bundle_dir( self, - progress: Progress, + progress: t.Optional[Progress] = None, + workdir: t.Optional[Path] = None, ) -> None: - workdir: Path = self._get_target_path(self.src_uri) + workdir = workdir or self._get_target_path(self.src_uri) url_path = self._get_remote_instance_rc_url() res_data = self._do_ubd_bundle_prepare( @@ -416,6 +430,7 @@ def _do_upload_bundle_dir( raise Exception("upload_id is empty") exists_files: list = res_data.get("existed", []) try: + self._do_ubd_datastore() self._do_ubd_blobs( progress=progress, workdir=workdir, diff --git a/client/starwhale/core/dataset/copy.py b/client/starwhale/core/dataset/copy.py index f2ed0dc8a5..e5b06426c5 100644 --- a/client/starwhale/core/dataset/copy.py +++ b/client/starwhale/core/dataset/copy.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import os -from typing import List, Iterator, Optional +from typing import Iterator from pathlib import Path from rich.progress import Progress @@ -20,6 +22,18 @@ class DatasetCopy(BundleCopy): + def with_disable_datastore(self) -> DatasetCopy: + self._disable_datastore = True + return self + + def with_enable_datastore(self) -> DatasetCopy: + self._disable_datastore = False + return self + + @property + def datastore_disabled(self) -> bool: + return getattr(self, "_disable_datastore", False) + def upload_files(self, workdir: Path) -> Iterator[FileDesc]: _manifest = load_yaml(workdir / DEFAULT_MANIFEST_NAME) for _k in _manifest["signature"]: @@ -79,24 +93,20 @@ def download_files(self, workdir: Path) -> Iterator[FileDesc]: file_type=FileType.SRC_TAR, ) - def _do_ubd_blobs( - self, - progress: Progress, - workdir: Path, - upload_id: str, - url_path: str, - existed_files: Optional[List] = None, - ) -> None: + def 
_do_ubd_datastore(self) -> None: + if self.datastore_disabled: + return + with TabularDataset( name=self.bundle_name, version=self.bundle_version, project=self.src_uri.project, - instance_uri=STANDALONE_INSTANCE, + instance_name=STANDALONE_INSTANCE, ) as local, TabularDataset( name=self.bundle_name, version=self.bundle_version, project=self.dest_uri.project, - instance_uri=self.dest_uri.instance, + instance_name=self.dest_uri.instance, ) as remote: console.print( f":bear_face: dump dataset meta from standalone to cloud({remote._ds_wrapper._meta_table_name})" @@ -105,26 +115,24 @@ def _do_ubd_blobs( for row in local.scan(): remote.put(row) - super()._do_ubd_blobs(progress, workdir, upload_id, url_path, existed_files) - def _do_download_bundle_dir(self, progress: Progress) -> None: - - with TabularDataset( - name=self.bundle_name, - version=self.bundle_version, - project=self.dest_uri.project, - instance_uri=STANDALONE_INSTANCE, - ) as local, TabularDataset( - name=self.bundle_name, - version=self.bundle_version, - project=self.src_uri.project, - instance_uri=self.src_uri.instance, - ) as remote: - console.print( - f":bird: load dataset meta from cloud({remote._ds_wrapper._meta_table_name}) to standalone" - ) - # TODO: add progressbar - for row in remote.scan(): - local.put(row) + if not self.datastore_disabled: + with TabularDataset( + name=self.bundle_name, + version=self.bundle_version, + project=self.dest_uri.project, + instance_name=STANDALONE_INSTANCE, + ) as local, TabularDataset( + name=self.bundle_name, + version=self.bundle_version, + project=self.src_uri.project, + instance_name=self.src_uri.instance, + ) as remote: + console.print( + f":bird: load dataset meta from cloud({remote._ds_wrapper._meta_table_name}) to standalone" + ) + # TODO: add progressbar + for row in remote.scan(): + local.put(row) super()._do_download_bundle_dir(progress) diff --git a/client/starwhale/core/dataset/model.py b/client/starwhale/core/dataset/model.py index 24833d3b62..3a55b6a223 100644 --- a/client/starwhale/core/dataset/model.py +++ b/client/starwhale/core/dataset/model.py @@ -4,6 +4,7 @@ import inspect import tarfile from abc import ABCMeta, abstractmethod +from http import HTTPStatus from pathlib import Path from collections import defaultdict @@ -39,6 +40,10 @@ class Dataset(BaseBundle, metaclass=ABCMeta): + def __init__(self, uri: URI) -> None: + self.store = DatasetStorage(uri) + super().__init__(uri) + def __str__(self) -> str: return f"Starwhale Dataset: {self.uri}" @@ -125,7 +130,6 @@ class StandaloneDataset(Dataset, LocalStorageBundleMixin): def __init__(self, uri: URI) -> None: super().__init__(uri) self.typ = InstanceType.STANDALONE - self.store = DatasetStorage(uri) self.tag = StandaloneTag(uri) self._manifest: t.Dict[ str, t.Any @@ -522,13 +526,23 @@ def list( return crm._fetch_bundle_all_list(project_uri, URIType.DATASET, page, size) def summary(self) -> t.Optional[DatasetSummary]: - r = self.do_http_request( + resp = self.do_http_request( f"/project/{self.uri.project}/{self.uri.object.typ}/{self.uri.object.name}", method=HTTPMethod.GET, instance_uri=self.uri, params={"versionUrl": self.uri.object.version}, - ).json() - _manifest: t.Dict[str, t.Any] = yaml.safe_load(r["data"].get("versionMeta", {})) + ignore_status_codes=[ + HTTPStatus.NOT_FOUND, + HTTPStatus.INTERNAL_SERVER_ERROR, + ], + ) + if resp.status_code != HTTPStatus.OK: + return None + + content = resp.json() + _manifest: t.Dict[str, t.Any] = yaml.safe_load( + content["data"].get("versionMeta", {}) + ) _summary = 
_manifest.get("dataset_summary", {}) return DatasetSummary(**_summary) if _summary else None diff --git a/client/starwhale/core/dataset/tabular.py b/client/starwhale/core/dataset/tabular.py index 122bc76738..32e9bcd00e 100644 --- a/client/starwhale/core/dataset/tabular.py +++ b/client/starwhale/core/dataset/tabular.py @@ -176,7 +176,7 @@ def __init__( project: str, start: t.Optional[t.Any] = None, end: t.Optional[t.Any] = None, - instance_uri: str = "", + instance_name: str = "", token: str = "", ) -> None: self.name = name @@ -184,7 +184,7 @@ def __init__( self.project = project self.table_name = f"{name}/{version[:VERSION_PREFIX_CNT]}/{version}" self._ds_wrapper = DatastoreWrapperDataset( - self.table_name, project, instance_uri=instance_uri, token=token + self.table_name, project, instance_name=instance_name, token=token ) self.start = start @@ -306,7 +306,7 @@ def from_uri( uri.project, start=start, end=end, - instance_uri=uri.instance, + instance_name=uri.instance, ) diff --git a/client/starwhale/core/model/copy.py b/client/starwhale/core/model/copy.py index 2a6b15547d..8b0b5e0cfa 100644 --- a/client/starwhale/core/model/copy.py +++ b/client/starwhale/core/model/copy.py @@ -68,3 +68,6 @@ def download_files(self, workdir: Path) -> Iterator[FileDesc]: # Path(workdir / _m["path"]).symlink_to( # _dest # the unify dir # ) + + def _do_ubd_datastore(self) -> None: + ... diff --git a/client/tests/sdk/test_dataset_sdk.py b/client/tests/sdk/test_dataset_sdk.py index 06a66d6127..04fa160a3b 100644 --- a/client/tests/sdk/test_dataset_sdk.py +++ b/client/tests/sdk/test_dataset_sdk.py @@ -575,7 +575,7 @@ def test_tags(self) -> None: def test_cloud_init(self, rm: Mocker) -> None: rm.request( HTTPMethod.HEAD, - "http://1.1.1.1/api/v1/project/self/dataset/not_found/version/1234/file", + "http://1.1.1.1/api/v1/project/self/dataset/not_found/version/1234", json={"message": "not found"}, status_code=HTTPStatus.NOT_FOUND, ) @@ -585,7 +585,7 @@ def test_cloud_init(self, rm: Mocker) -> None: rm.request( HTTPMethod.HEAD, - "http://1.1.1.1/api/v1/project/self/dataset/mnist/version/1234/file", + "http://1.1.1.1/api/v1/project/self/dataset/mnist/version/1234", json={"message": "existed"}, status_code=HTTPStatus.OK, ) @@ -609,6 +609,90 @@ def test_cloud_init(self, rm: Mocker) -> None: assert _summary is not None assert _summary.rows == 101 + @Mocker() + def test_cloud_build_from_icode(self, rm: Mocker) -> None: + sw = SWCliConfigMixed() + sw.update_instance( + uri="http://1.1.1.1", user_name="test", sw_token="123", alias="test" + ) + + manifest_req = rm.request( + HTTPMethod.GET, + "http://1.1.1.1/api/v1/project/self/dataset/mnist", + status_code=HTTPStatus.NOT_FOUND, + ) + + scan_table_req = rm.request( + HTTPMethod.POST, + "http://1.1.1.1/api/v1/datastore/scanTable", + json={"data": {}}, + ) + + update_table_req = rm.request( + HTTPMethod.POST, + "http://1.1.1.1/api/v1/datastore/updateTable", + ) + + ds = dataset("http://1.1.1.1/project/self/dataset/mnist", create=True) + assert manifest_req.call_count == 0 + + upload_file_req = rm.request( + HTTPMethod.POST, + f"http://1.1.1.1/api/v1/project/self/dataset/mnist/version/{ds.version}/file", + json={"data": {"upload_id": "123"}}, + ) + + _store = ds._Dataset__core_dataset.store # type: ignore + tmp_dir = _store.tmp_dir + snapshot_workdir = _store.snapshot_workdir + + cnt = 10 + for i in range(0, cnt): + ds.append( + DataRow( + index=i, data=Binary(f"data-{i}".encode()), annotations={"label": i} + ) + ) + + assert scan_table_req.call_count == cnt + + ds.flush() + 
assert tmp_dir.exists() + assert len(list(tmp_dir.iterdir())) != 0 + assert not snapshot_workdir.exists() + assert update_table_req.called + assert not upload_file_req.called + + ds.commit() + assert not tmp_dir.exists() + assert not snapshot_workdir.exists() + assert upload_file_req.called + + ds.close() + + @Mocker() + def test_cloud_build_no_support(self, rm: Mocker) -> None: + with self.assertRaisesRegex( + NoSupportError, "no support to build cloud dataset directly" + ): + ds = dataset("http://1.1.1.1/project/self/dataset/mnist", create=True) + ds.build_handler = MagicMock() + ds.commit() + + rm.request( + HTTPMethod.HEAD, + "http://1.1.1.1/api/v1/project/self/dataset/mnist/version/1234", + json={"message": "existed"}, + status_code=HTTPStatus.OK, + ) + + with self.assertRaisesRegex( + NoSupportError, "Can't build dataset from the existed cloud dataset uri" + ): + dataset( + "http://1.1.1.1/project/self/dataset/mnist/version/1234", create=True + ) + def test_consumption(self) -> None: existed_ds_uri = self._init_simple_dataset_with_str_id() ds = dataset(existed_ds_uri)
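
Illustrative usage (not part of the patch): a minimal sketch of the workflow this change enables — building a dataset against a cloud instance directly from the SDK — mirroring the new test_cloud_build_from_icode test above. The instance address, project and dataset name are placeholders copied from the tests, and the import paths are assumptions (only the dataset(), DataRow and Binary call sites are taken verbatim from the test code); as the test asserts, rows are staged under the dataset store's tmp dir, row annotations are written to the remote datastore on flush, and the bundle directory is only uploaded to the cloud instance on commit (via DatasetCopy with the datastore sync disabled).

from starwhale import dataset                             # assumed public entry point
from starwhale.api._impl.dataset.builder import DataRow   # assumed import path
from starwhale.core.dataset.type import Binary            # assumed import path

# Open (create) a dataset against a cloud project URI, as in test_cloud_build_from_icode.
ds = dataset("http://1.1.1.1/project/self/dataset/mnist", create=True)

# Append rows interactively; binary payloads are written into the local tmp dir,
# while row annotations go to the remote datastore tables.
for i in range(10):
    ds.append(
        DataRow(index=i, data=Binary(f"data-{i}".encode()), annotations={"label": i})
    )

ds.flush()   # data lands under store.tmp_dir; no bundle upload happens yet
ds.commit()  # renders manifest/signature locally, then uploads the bundle dir to the cloud instance
ds.close()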