feat(sdk): support cloud remote dataset build by dataset sdk #1606

Merged (1 commit) on Dec 14, 2022
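In short: the dataset SDK gains an instance_name argument (defaulting to STANDALONE_INSTANCE) that is threaded through RowWriter and TabularDataset in builder.py; model.py stages interactive-code builds in a local tmp dir and, on exit, either moves it into the standalone snapshot workdir or uploads it to the cloud via a datastore-disabled DatasetCopy; the blanket cloud rejection in build() is narrowed to the handler-build path; and wrapper.py renames its instance_uri parameter to instance_name.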
28 changes: 25 additions & 3 deletions client/starwhale/api/_impl/dataset/builder.py
@@ -16,10 +16,20 @@
import jsonlines
from loguru import logger

from starwhale.consts import AUTH_ENV_FNAME, DEFAULT_PROJECT, SWDS_DATA_FNAME_FMT
from starwhale.consts import (
AUTH_ENV_FNAME,
DEFAULT_PROJECT,
STANDALONE_INSTANCE,
SWDS_DATA_FNAME_FMT,
)
from starwhale.base.uri import URI
from starwhale.utils.fs import empty_dir, ensure_dir
from starwhale.base.type import DataFormatType, DataOriginType, ObjectStoreType
from starwhale.base.type import (
InstanceType,
DataFormatType,
DataOriginType,
ObjectStoreType,
)
from starwhale.utils.error import FormatError, NoSupportError
from starwhale.core.dataset import model
from starwhale.core.dataset.type import (
@@ -61,6 +71,7 @@ def __init__(
append_from_version: str = "",
append_from_uri: t.Optional[URI] = None,
data_mime_type: MIMEType = MIMEType.UNDEFINED,
instance_name: str = STANDALONE_INSTANCE,
) -> None:
# TODO: add more docstring for args
# TODO: validate group upper and lower?
@@ -80,11 +91,20 @@ def __init__(
self.dataset_name = dataset_name
self.dataset_version = dataset_version
self.tabular_dataset = TabularDataset(
dataset_name, dataset_version, project_name
dataset_name,
dataset_version,
project_name,
instance_name=instance_name,
)

self._forked_summary: t.Optional[DatasetSummary]
if append and append_from_uri:
# TODO: controller supports cloud dataset fork api
if append_from_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError(
f"Can't build dataset from existed cloud dataset: {append_from_uri}"
)

self._forked_last_seq_id, self._forked_rows = self.tabular_dataset.fork(
append_from_version
)
@@ -546,6 +566,7 @@ def __init__(
append_from_version: str = "",
append_from_uri: t.Optional[URI] = None,
append_with_swds_bin: bool = True,
instance_name: str = STANDALONE_INSTANCE,
) -> None:
super().__init__(
name=f"RowWriter-{dataset_name}-{dataset_version}-{project_name}"
@@ -561,6 +582,7 @@ def __init__(
"append": append,
"append_from_version": append_from_version,
"append_from_uri": append_from_uri,
"instance_name": instance_name,
}

self._queue: queue.Queue[t.Optional[DataRow]] = queue.Queue()
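The net effect in builder.py is that RowWriter (and the TabularDataset underneath it) can now be pointed at a non-standalone instance. A minimal sketch of constructing one with the new argument; all concrete values are hypothetical, and this is an internal (_impl) API:

```python
import tempfile
from pathlib import Path

from starwhale.api._impl.dataset.builder import RowWriter

# `instance_name` is the new argument; before this change the writer always
# targeted the standalone instance implicitly.
writer = RowWriter(
    dataset_name="mnist",
    dataset_version="abcdefg123",                 # hypothetical version id
    project_name="self",
    workdir=Path(tempfile.mkdtemp()),             # builds now stage into a tmp dir
    instance_name="http://console.example.com",   # cloud instance; default is standalone
)
```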
140 changes: 101 additions & 39 deletions client/starwhale/api/_impl/dataset/model.py
@@ -6,12 +6,19 @@
from types import TracebackType
from pathlib import Path
from functools import wraps
from contextlib import ExitStack

from loguru import logger

from starwhale.utils import gen_uniq_version
from starwhale.consts import HTTPMethod, DEFAULT_PAGE_IDX, DEFAULT_PAGE_SIZE
from starwhale.consts import (
HTTPMethod,
DEFAULT_PAGE_IDX,
DEFAULT_PAGE_SIZE,
STANDALONE_INSTANCE,
)
from starwhale.base.uri import URI, URIType
from starwhale.utils.fs import move_dir, empty_dir
from starwhale.base.type import InstanceType
from starwhale.base.cloud import CloudRequestMixed
from starwhale.utils.error import ExistedError, NotFoundError, NoSupportError
@@ -102,8 +109,15 @@ def __init__(

self._append_use_swds_bin = False
_summary = None
if self._check_uri_exists(_origin_uri):
origin_uri_exists = self._check_uri_exists(_origin_uri)
if origin_uri_exists:
if create:
# TODO: support build cloud dataset from the existed dataset
if self.project_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError(
f"Can't build dataset from the existed cloud dataset uri:{_origin_uri}"
)

self._append_from_version = version
self._create_by_append = True
self._fork_dataset()
@@ -122,7 +136,14 @@ def __init__(
else:
raise ExistedError(f"{self.uri} was not found for load")

self._summary = _summary or self.__core_dataset.summary()
self._summary: t.Optional[DatasetSummary]
if _summary:
self._summary = _summary
else:
if origin_uri_exists:
self._summary = self.__core_dataset.summary()
else:
self._summary = DatasetSummary()

self._rows_cnt = self._summary.rows if self._summary else 0
self._consumption: t.Optional[TabularDatasetSessionConsumption] = None
@@ -139,10 +160,21 @@ def __init__(
def _fork_dataset(self) -> None:
# TODO: support cloud dataset prepare in the tmp dir
# TODO: lazy fork dataset
self.__core_dataset._prepare_snapshot()
self.__core_dataset._fork_swds(
self._create_by_append, self._append_from_version
)
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"only support standalone dataset fork: {self.__core_dataset}"
)

def _when_exit() -> None:
self.__core_dataset.store.building = False

with ExitStack() as stack:
stack.callback(_when_exit)
self.__core_dataset.store.building = True
self.__core_dataset._prepare_snapshot()
self.__core_dataset._fork_swds(
self._create_by_append, self._append_from_version
)
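_fork_dataset now wraps the whole fork in an ExitStack callback, so store.building is cleared even if snapshot preparation throws. The same pattern in isolation (names here are illustrative):

```python
from contextlib import ExitStack

class Store:
    building = False

def fork(store: Store) -> None:
    with ExitStack() as stack:
        # Registered callbacks run on normal exit and on exception alike,
        # so the flag can never be left stuck at True.
        stack.callback(lambda: setattr(store, "building", False))
        store.building = True
        # ... prepare snapshot, fork rows ...
```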

def _auto_complete_version(self, version: str) -> str:
version = version.strip()
@@ -205,6 +237,10 @@ def make_distributed_consumption(
)
return self

def _clear_data_loader(self) -> None:
with self._lock:
self.__data_loaders = {}

def _get_data_loader(
self, recreate: bool = False, disable_consumption: bool = False
) -> DataLoader:
@@ -272,6 +308,7 @@ def _run() -> _GItemType:
return _run()
except RuntimeError as e:
if str(e).startswith("table is empty"):
self._clear_data_loader()
return None
raise
except StopIteration:
@@ -340,7 +377,7 @@ def _check_uri_exists(uri: t.Optional[URI]) -> bool:
if uri.instance_type == InstanceType.CLOUD:
crm = CloudRequestMixed()
ok, _ = crm.do_http_request_simple_ret(
path=f"/project/{uri.project}/{URIType.DATASET}/{uri.object.name}/version/{uri.object.version}/file",
path=f"/project/{uri.project}/{URIType.DATASET}/{uri.object.name}/version/{uri.object.version}",
method=HTTPMethod.HEAD,
instance_uri=uri,
ignore_status_codes=[HTTPStatus.NOT_FOUND],
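The cloud existence probe also changed: it now HEADs the dataset-version resource itself instead of its /file sub-path. For illustration, an equivalent check with plain requests; the /api/v1 prefix and auth header are assumptions, not taken from this diff:

```python
from http import HTTPStatus

import requests

def dataset_version_exists(
    base_url: str, project: str, name: str, version: str, token: str
) -> bool:
    # HEAD the version resource; a 404 means the dataset version is absent,
    # mirroring ignore_status_codes=[HTTPStatus.NOT_FOUND] above.
    resp = requests.head(
        f"{base_url}/api/v1/project/{project}/dataset/{name}/version/{version}",
        headers={"Authorization": token},
        timeout=10,
    )
    return resp.status_code != HTTPStatus.NOT_FOUND and resp.ok
```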
@@ -431,11 +468,6 @@ def __setitem__(
) -> None:
# TODO: tune the performance of getitem by cache
self._trigger_icode_build = True
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"setitem only supports for standalone dataset: {self.__core_dataset}"
)

_row_writer = self._get_row_writer()

if not isinstance(key, (int, str)):
@@ -487,20 +519,16 @@ def _get_row_writer(self) -> RowWriter:
append_from_version = ""

# TODO: support alignment_bytes_size, volume_bytes_size arguments
if not isinstance(self.__core_dataset, StandaloneDataset):
raise NoSupportError(
f"setitem only supports for standalone dataset: {self.__core_dataset}"
)

self._row_writer = RowWriter(
dataset_name=self.name,
dataset_version=self.version,
project_name=self.project_uri.project,
workdir=self.__core_dataset.store.snapshot_workdir, # TODO: use tmpdir which is same as dataset build command
workdir=self.__core_dataset.store.tmp_dir,
append=self._create_by_append,
append_from_version=append_from_version,
append_from_uri=append_from_uri,
append_with_swds_bin=self._append_use_swds_bin,
instance_name=self.project_uri.instance,
)
return self._row_writer

@@ -573,28 +601,66 @@ def build_with_copy_src(
@_check_readonly
@_forbid_handler_build
def _do_build_from_interactive_code(self) -> None:
ds = self.__core_dataset
if isinstance(ds, StandaloneDataset):
if self._row_writer is None:
raise RuntimeError("row writer is none, no data was written")
if self._row_writer is None:
raise RuntimeError("row writer is none, no data was written")

self.flush()
self._row_writer.close()
# TODO: use the elegant method to refactor manifest update
self._summary = self._row_writer.summary
self._summary.rows = len(self)
ds._manifest["dataset_summary"] = self._summary.asdict()
ds._calculate_signature()
ds._render_manifest()
ds._make_swds_meta_tar()
ds._make_auto_tags()
self.flush()
self._row_writer.close()
self._summary = self._row_writer.summary

# TODO: use the elegant method to refactor manifest update
self._summary.rows = len(self)

if isinstance(self.__core_dataset, StandaloneDataset):
local_ds = self.__core_dataset
local_uri = self.uri
else:
# TODO: support cloud dataset build
raise NoSupportError("only support standalone dataset build")
local_uri = URI.capsulate_uri(
instance=STANDALONE_INSTANCE,
project=self.uri.project,
obj_type=self.uri.object.typ,
obj_name=self.uri.object.name,
obj_ver=self.uri.object.version,
)
local_ds = StandaloneDataset(local_uri)
local_ds.store._tmp_dir = self.__core_dataset.store.tmp_dir
setattr(local_ds, "_version", self.version)

def _when_standalone_exit() -> None:
local_ds._make_auto_tags()
move_dir(local_ds.store.tmp_dir, local_ds.store.snapshot_workdir)

def _when_cloud_exit() -> None:
from starwhale.core.dataset.copy import DatasetCopy

dc = DatasetCopy(
str(local_uri), str(self.uri), URIType.DATASET
).with_disable_datastore()
dc._do_upload_bundle_dir(workdir=local_ds.store.tmp_dir)
empty_dir(local_ds.store.tmp_dir)

def _when_exit() -> None:
local_ds.store.building = False
if isinstance(self.__core_dataset, StandaloneDataset):
_when_standalone_exit()
else:
_when_cloud_exit()

with ExitStack() as stack:
stack.callback(_when_exit)
local_ds.store.building = True
local_ds._manifest["dataset_summary"] = self._summary.asdict()
local_ds._calculate_signature()
local_ds._render_manifest()
local_ds._make_swds_meta_tar()

@_check_readonly
@_forbid_icode_build
def _do_build_from_handler(self) -> None:
# TODO: support build dataset for cloud uri directly
if self.project_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError("no support to build cloud dataset directly")

self._trigger_icode_build = True
config = DatasetConfig(
name=self.name,
@@ -620,10 +686,6 @@ def _do_build_from_handler(self) -> None:

@_check_readonly
def build(self) -> None:
# TODO: support build dataset for cloud uri directly
if self.project_uri.instance_type == InstanceType.CLOUD:
raise NoSupportError("no support to build cloud dataset directly")

if self._trigger_icode_build:
self._do_build_from_interactive_code()
elif self._trigger_handler_build and self.build_handler:
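Worth noting: the blanket "no cloud build" guard was deleted from build() and re-added only inside _do_build_from_handler(), so interactive-code builds now reach the cloud path while handler builds still refuse it. Schematically (condensed from the diff, not verbatim):

```python
def build(self) -> None:
    if self._trigger_icode_build:
        self._do_build_from_interactive_code()  # cloud targets now supported
    elif self._trigger_handler_build and self.build_handler:
        self._do_build_from_handler()           # still raises NoSupportError for cloud URIs
```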
4 changes: 2 additions & 2 deletions client/starwhale/api/_impl/wrapper.py
@@ -161,7 +161,7 @@ def flush(self, table_name: str) -> None:

class Dataset(Logger):
def __init__(
self, dataset_id: str, project: str, instance_uri: str = "", token: str = ""
self, dataset_id: str, project: str, instance_name: str = "", token: str = ""
) -> None:
if not dataset_id:
raise RuntimeError("id should not be None")
@@ -172,7 +172,7 @@ def __init__(
self.dataset_id = dataset_id
self.project = project
self._meta_table_name = f"project/{self.project}/dataset/{self.dataset_id}/meta"
self._data_store = data_store.get_data_store(instance_uri, token)
self._data_store = data_store.get_data_store(instance_name, token)
self._init_writers([self._meta_table_name])

def put(self, data_id: Union[str, int], **kwargs: Any) -> None:
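Finally, wrapper.py renames the constructor parameter instance_uri to instance_name and forwards it to get_data_store. A sketch of constructing the wrapper after the rename; the argument values are hypothetical, and an empty instance_name is assumed to fall back to the local standalone data store:

```python
from starwhale.api._impl.wrapper import Dataset

# `instance_name` replaces the old `instance_uri` keyword.
ds = Dataset(dataset_id="mnist-abcdefg", project="self", instance_name="", token="")
ds.put(0, label=7)  # buffers a row into the dataset's meta table
```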