support cloud (remote) dataset build via the dataset SDK
tianweidut committed Dec 13, 2022
1 parent 1f67118 commit c107940
Showing 9 changed files with 307 additions and 99 deletions.
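The net effect of this change set is that a dataset can be built against a cloud project from the SDK, not only against a standalone instance. A minimal usage sketch follows; the `dataset` factory import, the URI string, the `create` flag, and the row payload are assumptions for illustration and are not part of this commit.

```python
# Illustrative sketch only: how the cloud build path added by this commit is
# expected to be driven from user code. The factory name, URI, and payload
# below are assumptions, not code from the commit.
from starwhale import dataset

ds = dataset("http://cloud.example.com/project/self/dataset/mnist", create=True)
ds["row-0"] = {"label": 0}  # __setitem__ routes rows through RowWriter / TabularDataset
ds.flush()
ds.build()  # the bundle is assembled in a local tmp dir, then uploaded to the cloud instance
```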
28 changes: 25 additions & 3 deletions client/starwhale/api/_impl/dataset/builder.py
@@ -16,10 +16,20 @@
import jsonlines
from loguru import logger

from starwhale.consts import AUTH_ENV_FNAME, DEFAULT_PROJECT, SWDS_DATA_FNAME_FMT
from starwhale.consts import (
AUTH_ENV_FNAME,
DEFAULT_PROJECT,
STANDALONE_INSTANCE,
SWDS_DATA_FNAME_FMT,
)
from starwhale.base.uri import URI
from starwhale.utils.fs import empty_dir, ensure_dir
from starwhale.base.type import DataFormatType, DataOriginType, ObjectStoreType
from starwhale.base.type import (
InstanceType,
DataFormatType,
DataOriginType,
ObjectStoreType,
)
from starwhale.utils.error import FormatError, NoSupportError
from starwhale.core.dataset import model
from starwhale.core.dataset.type import (
@@ -61,6 +71,7 @@ def __init__(
append_from_version: str = "",
append_from_uri: t.Optional[URI] = None,
data_mime_type: MIMEType = MIMEType.UNDEFINED,
instance_name: str = STANDALONE_INSTANCE,
) -> None:
# TODO: add more docstring for args
# TODO: validate group upper and lower?
@@ -80,11 +91,20 @@ def __init__(
self.dataset_name = dataset_name
self.dataset_version = dataset_version
self.tabular_dataset = TabularDataset(
dataset_name, dataset_version, project_name
dataset_name,
dataset_version,
project_name,
instance_name=instance_name,
)

self._forked_summary: t.Optional[DatasetSummary]
if append and append_from_uri:
# TODO: controller supports cloud dataset fork api
if append_from_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError(
f"Can't build dataset from existed cloud dataset: {append_from_uri}"
)

self._forked_last_seq_id, self._forked_rows = self.tabular_dataset.fork(
append_from_version
)
@@ -546,6 +566,7 @@ def __init__(
append_from_version: str = "",
append_from_uri: t.Optional[URI] = None,
append_with_swds_bin: bool = True,
instance_name: str = STANDALONE_INSTANCE,
) -> None:
super().__init__(
name=f"RowWriter-{dataset_name}-{dataset_version}-{project_name}"
@@ -561,6 +582,7 @@ def __init__(
"append": append,
"append_from_version": append_from_version,
"append_from_uri": append_from_uri,
"instance_name": instance_name,
}

self._queue: queue.Queue[t.Optional[DataRow]] = queue.Queue()
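The `instance_name` argument added in this file is forwarded to `TabularDataset`, so rows written during a build can target the remote datastore of a cloud instance instead of the local one. A hedged construction sketch; all argument values are placeholders, and constructing a `RowWriter` outside the SDK like this is for illustration only.

```python
# Sketch, not committed code: argument values are placeholders.
from pathlib import Path

from starwhale.consts import STANDALONE_INSTANCE
from starwhale.api._impl.dataset.builder import RowWriter

writer = RowWriter(
    dataset_name="mnist",
    dataset_version="abcdefg1234",
    project_name="self",
    workdir=Path("/tmp/sw-build"),      # the SDK now passes the store's tmp dir here
    instance_name=STANDALONE_INSTANCE,  # or a cloud instance name for remote builds
)
```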
140 changes: 101 additions & 39 deletions client/starwhale/api/_impl/dataset/model.py
@@ -6,12 +6,19 @@
from types import TracebackType
from pathlib import Path
from functools import wraps
from contextlib import ExitStack

from loguru import logger

from starwhale.utils import gen_uniq_version
from starwhale.consts import HTTPMethod, DEFAULT_PAGE_IDX, DEFAULT_PAGE_SIZE
from starwhale.consts import (
HTTPMethod,
DEFAULT_PAGE_IDX,
DEFAULT_PAGE_SIZE,
STANDALONE_INSTANCE,
)
from starwhale.base.uri import URI, URIType
from starwhale.utils.fs import move_dir, empty_dir
from starwhale.base.type import InstanceType
from starwhale.base.cloud import CloudRequestMixed
from starwhale.utils.error import ExistedError, NotFoundError, NoSupportError
@@ -102,8 +109,15 @@ def __init__(

self._append_use_swds_bin = False
_summary = None
if self._check_uri_exists(_origin_uri):
origin_uri_exists = self._check_uri_exists(_origin_uri)
if origin_uri_exists:
if create:
# TODO: support build cloud dataset from the existed dataset
if self.project_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError(
f"Can't build dataset from the existed cloud dataset uri:{_origin_uri}"
)

self._append_from_version = version
self._create_by_append = True
self._fork_dataset()
@@ -122,7 +136,14 @@ def __init__(
else:
raise ExistedError(f"{self.uri} was not found fo load")

self._summary = _summary or self.__core_dataset.summary()
self._summary: t.Optional[DatasetSummary]
if _summary:
self._summary = _summary
else:
if origin_uri_exists:
self._summary = self.__core_dataset.summary()
else:
self._summary = DatasetSummary()

self._rows_cnt = self._summary.rows if self._summary else 0
self._consumption: t.Optional[TabularDatasetSessionConsumption] = None
@@ -139,10 +160,21 @@ def __init__(
def _fork_dataset(self) -> None:
# TODO: support cloud dataset prepare in the tmp dir
# TODO: lazy fork dataset
self.__core_dataset._prepare_snapshot()
self.__core_dataset._fork_swds(
self._create_by_append, self._append_from_version
)
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"only support standalone dataset fork: {self.__core_dataset}"
)

def _when_exit() -> None:
self.__core_dataset.store.building = False

with ExitStack() as stack:
stack.callback(_when_exit)
self.__core_dataset.store.building = True
self.__core_dataset._prepare_snapshot()
self.__core_dataset._fork_swds(
self._create_by_append, self._append_from_version
)

def _auto_complete_version(self, version: str) -> str:
version = version.strip()
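The `ExitStack` usage in `_fork_dataset` above guarantees the store's `building` flag is cleared even if snapshot preparation or the fork raises. The same pattern in isolation, as a self-contained sketch:

```python
# Standalone illustration of the cleanup pattern used in _fork_dataset above.
from contextlib import ExitStack


class _Store:
    building = False


store = _Store()


def _when_exit() -> None:
    store.building = False


with ExitStack() as stack:
    stack.callback(_when_exit)  # runs on normal exit and on exceptions
    store.building = True
    # ... prepare snapshot / fork rows here ...

assert store.building is False
```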
@@ -205,6 +237,10 @@ def make_distributed_consumption(
)
return self

def _clear_data_loader(self) -> None:
with self._lock:
self.__data_loaders = {}

def _get_data_loader(
self, recreate: bool = False, disable_consumption: bool = False
) -> DataLoader:
@@ -272,6 +308,7 @@ def _run() -> _GItemType:
return _run()
except RuntimeError as e:
if str(e).startswith("table is empty"):
self._clear_data_loader()
return None
raise
except StopIteration:
@@ -340,7 +377,7 @@ def _check_uri_exists(uri: t.Optional[URI]) -> bool:
if uri.instance_type == InstanceType.CLOUD:
crm = CloudRequestMixed()
ok, _ = crm.do_http_request_simple_ret(
path=f"/project/{uri.project}/{URIType.DATASET}/{uri.object.name}/version/{uri.object.version}/file",
path=f"/project/{uri.project}/{URIType.DATASET}/{uri.object.name}/version/{uri.object.version}",
method=HTTPMethod.HEAD,
instance_uri=uri,
ignore_status_codes=[HTTPStatus.NOT_FOUND],
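The existence probe above now HEADs the dataset version resource itself rather than its `/file` sub-path. Roughly the equivalent raw request; the host, API prefix, and token below are placeholders, not values from the commit.

```python
# Roughly equivalent raw HTTP call (illustrative; host, API prefix, and token
# are placeholder assumptions).
from http import HTTPStatus

import requests

resp = requests.head(
    "https://cloud.example.com/api/v1/project/self/dataset/mnist/version/abcdefg1234",
    headers={"Authorization": "<token>"},
    timeout=10,
)
exists = resp.status_code != HTTPStatus.NOT_FOUND
```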
@@ -431,11 +468,6 @@ def __setitem__(
) -> None:
# TODO: tune the performance of getitem by cache
self._trigger_icode_build = True
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"setitem only supports for standalone dataset: {self.__core_dataset}"
)

_row_writer = self._get_row_writer()

if not isinstance(key, (int, str)):
@@ -487,20 +519,16 @@ def _get_row_writer(self) -> RowWriter:
append_from_version = ""

# TODO: support alignment_bytes_size, volume_bytes_size arguments
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"setitem only supports for standalone dataset: {self.__core_dataset}"
)

self._row_writer = RowWriter(
dataset_name=self.name,
dataset_version=self.version,
project_name=self.project_uri.project,
workdir=self.__core_dataset.store.snapshot_workdir, # TODO: use tmpdir which is same as dataset build command
workdir=self.__core_dataset.store.tmp_dir,
append=self._create_by_append,
append_from_version=append_from_version,
append_from_uri=append_from_uri,
append_with_swds_bin=self._append_use_swds_bin,
instance_name=self.project_uri.instance,
)
return self._row_writer

@@ -573,28 +601,66 @@ def build_with_copy_src(
@_check_readonly
@_forbid_handler_build
def _do_build_from_interactive_code(self) -> None:
ds = self.__core_dataset
if isinstance(ds, StandaloneDataset):
if self._row_writer is None:
raise RuntimeError("row writer is none, no data was written")
if self._row_writer is None:
raise RuntimeError("row writer is none, no data was written")

self.flush()
self._row_writer.close()
# TODO: use the elegant method to refactor manifest update
self._summary = self._row_writer.summary
self._summary.rows = len(self)
ds._manifest["dataset_summary"] = self._summary.asdict()
ds._calculate_signature()
ds._render_manifest()
ds._make_swds_meta_tar()
ds._make_auto_tags()
self.flush()
self._row_writer.close()
self._summary = self._row_writer.summary

# TODO: use the elegant method to refactor manifest update
self._summary.rows = len(self)

if isinstance(self.__core_dataset, StandaloneDataset):
local_ds = self.__core_dataset
local_uri = self.uri
else:
# TODO: support cloud dataset build
raise NoSupportError("only support standalone dataset build")
local_uri = URI.capsulate_uri(
instance=STANDALONE_INSTANCE,
project=self.uri.project,
obj_type=self.uri.object.typ,
obj_name=self.uri.object.name,
obj_ver=self.uri.object.version,
)
local_ds = StandaloneDataset(local_uri)
local_ds.store._tmp_dir = self.__core_dataset.store.tmp_dir
setattr(local_ds, "_version", self.version)

def _when_standalone_exit() -> None:
local_ds._make_auto_tags()
move_dir(local_ds.store.tmp_dir, local_ds.store.snapshot_workdir)

def _when_cloud_exit() -> None:
from starwhale.core.dataset.copy import DatasetCopy

dc = DatasetCopy(
str(local_uri), str(self.uri), URIType.DATASET
).with_disable_datastore()
dc._do_upload_bundle_dir(workdir=local_ds.store.tmp_dir)
empty_dir(local_ds.store.tmp_dir)

def _when_exit() -> None:
local_ds.store.building = False
if isinstance(self.__core_dataset, StandaloneDataset):
_when_standalone_exit()
else:
_when_cloud_exit()

with ExitStack() as stack:
stack.callback(_when_exit)
local_ds.store.building = True
local_ds._manifest["dataset_summary"] = self._summary.asdict()
local_ds._calculate_signature()
local_ds._render_manifest()
local_ds._make_swds_meta_tar()

@_check_readonly
@_forbid_icode_build
def _do_build_from_handler(self) -> None:
# TODO: support build dataset for cloud uri directly
if self.project_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError("no support to build cloud dataset directly")

self._trigger_icode_build = True
config = DatasetConfig(
name=self.name,
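After this refactor the build always materializes the bundle under the local store's tmp dir against a standalone-shaped URI; the exit callback then either moves it into the standalone snapshot workdir or, for a cloud project, uploads it with `DatasetCopy`. A condensed sketch of the cloud upload leg; the helper name and its parameters are illustrative, while the `DatasetCopy` calls mirror the diff.

```python
# Sketch of the cloud finalize step shown above; the helper name and arguments
# are illustrative, the DatasetCopy usage mirrors the diff.
from starwhale.base.uri import URI, URIType
from starwhale.core.dataset.copy import DatasetCopy
from starwhale.utils.fs import empty_dir


def upload_built_bundle(local_uri: URI, cloud_uri: URI, tmp_dir) -> None:
    # Rows were already written to the cloud datastore via TabularDataset, so the
    # datastore sync is disabled and only the bundle directory is uploaded.
    dc = DatasetCopy(str(local_uri), str(cloud_uri), URIType.DATASET).with_disable_datastore()
    dc._do_upload_bundle_dir(workdir=tmp_dir)
    empty_dir(tmp_dir)
```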
@@ -620,10 +686,6 @@ def _do_build_from_handler(self) -> None:

@_check_readonly
def build(self) -> None:
# TODO: support build dataset for cloud uri directly
if self.project_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError("no support to build cloud dataset directly")

if self._trigger_icode_build:
self._do_build_from_interactive_code()
elif self._trigger_handler_build and self.build_handler:
4 changes: 2 additions & 2 deletions client/starwhale/api/_impl/wrapper.py
@@ -161,7 +161,7 @@ def flush(self, table_name: str) -> None:

class Dataset(Logger):
def __init__(
self, dataset_id: str, project: str, instance_uri: str = "", token: str = ""
self, dataset_id: str, project: str, instance_name: str = "", token: str = ""
) -> None:
if not dataset_id:
raise RuntimeError("id should not be None")
Expand All @@ -172,7 +172,7 @@ def __init__(
self.dataset_id = dataset_id
self.project = project
self._meta_table_name = f"project/{self.project}/dataset/{self.dataset_id}/meta"
self._data_store = data_store.get_data_store(instance_uri, token)
self._data_store = data_store.get_data_store(instance_name, token)
self._init_writers([self._meta_table_name])

def put(self, data_id: Union[str, int], **kwargs: Any) -> None:
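In `wrapper.py` the keyword is only renamed from `instance_uri` to `instance_name`, matching the name used across the dataset SDK; the value is still passed straight to `get_data_store`. A hedged usage sketch with placeholder values:

```python
# Sketch only: argument values are placeholders, not taken from the commit.
from starwhale.api._impl.wrapper import Dataset

meta = Dataset(
    dataset_id="abcdefg1234",
    project="self",
    instance_name="",  # empty keeps the default data store selection (assumption)
)
meta.put("row-0", data_size=1024)
meta.flush("project/self/dataset/abcdefg1234/meta")
```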