Skip to content

Commit

Permalink
tune datastore storage dir by swcli config
Browse files Browse the repository at this point in the history
  • Loading branch information
tianweidut authored and xuchuan committed Aug 11, 2022
1 parent d418acc commit 8553c14
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 41 deletions.
50 changes: 30 additions & 20 deletions client/starwhale/api/_impl/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import pyarrow.parquet as pq # type: ignore
from typing_extensions import Protocol

from starwhale.utils.fs import ensure_dir
from starwhale.utils.config import SWCliConfigMixed


class Type:
def __init__(
Expand Down Expand Up @@ -286,12 +289,12 @@ def nextItem(self) -> None:
self.item = None
self.key = ""

n = len(iters)
nodes = []
for i in range(n):
node = Node(i, iters[i])
if not node.exhausted:
nodes.append(node)
for _i, _iter in enumerate(iters):
_node = Node(_i, _iter)
if not _node.exhausted:
nodes.append(_node)

while len(nodes) > 0:
key = min(nodes, key=lambda x: x.key).key
d: Dict[str, Any] = {}
Expand All @@ -315,6 +318,9 @@ def nextItem(self) -> None:
def _get_table_files(path: str) -> List[str]:
if not os.path.exists(path):
return []
if not os.path.isdir(path):
raise RuntimeError(f"{path} is not a directory")

patches = []
base_index = -1
for file in os.listdir(path):
Expand All @@ -339,15 +345,19 @@ def _read_table_schema(path: str) -> TableSchema:
raise RuntimeError(f"path not found: {path}")
if not os.path.isdir(path):
raise RuntimeError(f"{path} is not a directory")

files = _get_table_files(path)
if len(files) == 0:
raise RuntimeError(f"table is empty, path:{path}")

schema = pq.read_schema(files[-1])
if schema.metadata is None:
raise RuntimeError(f"no metadata for file {files[-1]}")

schema_data = schema.metadata.get(b"schema", None)
if schema_data is None:
raise RuntimeError(f"no schema for file {files[-1]}")

return TableSchema.parse(schema_data.decode())


Expand All @@ -361,6 +371,7 @@ def _scan_table(
iters = []
for file in _get_table_files(path):
iters.append(_scan_parquet_file(file, columns, start, end))
column_names = []
if len(iters) > 0:
schema = _read_table_schema(path)
column_names = [
Expand Down Expand Up @@ -413,7 +424,7 @@ def _records_to_table(

def _get_size(d: Any) -> int:
ret = sys.getsizeof(d)
if type(d) is dict:
if isinstance(d, dict):
for v in d.values():
ret += sys.getsizeof(v)
return ret
Expand Down Expand Up @@ -525,10 +536,8 @@ def delete(self, keys: List[Any]) -> None:

def dump(self, root_path: str) -> None:
path = _get_table_path(root_path, self.table_name)
if not os.path.exists(path):
os.mkdir(path)
if not os.path.isdir(path):
raise RuntimeError(f"{path} is not a directory")
ensure_dir(path)

max_index = -1
for file in os.listdir(path):
type, index = _parse_parquet_name(file)
Expand Down Expand Up @@ -556,12 +565,11 @@ class LocalDataStore:
def get_instance() -> "LocalDataStore":
with LocalDataStore._lock:
if LocalDataStore._instance is None:
root_path = os.getenv("SW_ROOT_PATH", None)
if root_path is None:
raise RuntimeError(
"data store root path is not defined for standalone instance"
)
LocalDataStore._instance = LocalDataStore(root_path)

ds_path = SWCliConfigMixed().datastore_dir
ensure_dir(ds_path)

LocalDataStore._instance = LocalDataStore(str(ds_path))
atexit.register(LocalDataStore._instance.dump)
return LocalDataStore._instance

Expand Down Expand Up @@ -630,7 +638,7 @@ def __init__(
self,
name: str,
key_column_type: pa.DataType,
columns: Dict[str, str],
columns: Optional[Dict[str, str]],
explicit_none: bool,
) -> None:
self.name = name
Expand All @@ -648,6 +656,8 @@ def __init__(
key_column_type = schema.columns[schema.key_column].type.pa_type
column_names = schema.columns.keys()
col_prefix = table_alias + "."

cols: Optional[Dict[str, str]]
if columns is None or col_prefix + "*" in columns:
cols = None
else:
Expand All @@ -659,15 +669,15 @@ def __init__(
cols[name] = alias
infos.append(TableInfo(table_name, key_column_type, cols, explicit_none))

# check for key type conflication
# check for key type conflictions
for info in infos:
if info is infos[0]:
continue
if info.key_column_type != infos[0].key_column_type:
raise RuntimeError(
"conflicting key field type. "
+ f"{info.name} has a key of type {info.key_column_type},"
+ f" while {infos[0].name} has a key of type {infos[0].key_column_type}"
f"{info.name} has a key of type {info.key_column_type},"
f" while {infos[0].name} has a key of type {infos[0].key_column_type}"
)
iters = []
for info in infos:
Expand Down
16 changes: 2 additions & 14 deletions client/starwhale/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,8 @@ def rootdir(self) -> Path:
return Path(self._config["storage"]["root"])

@property
def workdir(self) -> Path:
return self.rootdir / "workdir"

@property
def pkgdir(self) -> Path:
return self.rootdir / "pkg"

@property
def dataset_dir(self) -> Path:
return self.rootdir / "dataset"

@property
def eval_run_dir(self) -> Path:
return self.rootdir / "run" / "eval"
def datastore_dir(self) -> Path:
return self.rootdir / ".datastore"

@property
def sw_remote_addr(self) -> str:
Expand Down
21 changes: 16 additions & 5 deletions client/tests/sdk/test_base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
import os
import shutil
import tempfile
import unittest

from starwhale.utils import config as sw_config
from starwhale.consts import ENV_SW_CLI_CONFIG, ENV_SW_LOCAL_STORAGE
from starwhale.utils.fs import empty_dir, ensure_dir


class BaseTestCase(unittest.TestCase):
def setUp(self) -> None:
self.root = os.path.join(tempfile.gettempdir(), "datastore_test")
os.makedirs(self.root, exist_ok=True)
os.environ["SW_ROOT_PATH"] = self.root
self._test_local_storage = tempfile.mkdtemp(prefix="sw-test-mock-")
os.environ[ENV_SW_CLI_CONFIG] = os.path.join(
self._test_local_storage, "config.yaml"
)
os.environ[ENV_SW_LOCAL_STORAGE] = self._test_local_storage
sw_config._config = {}

self.root = str(sw_config.SWCliConfigMixed().datastore_dir)
ensure_dir(self.root)

def tearDown(self) -> None:
shutil.rmtree(self.root)
empty_dir(self._test_local_storage)
os.environ.pop(ENV_SW_CLI_CONFIG, "")
os.environ.pop(ENV_SW_LOCAL_STORAGE, "")
6 changes: 5 additions & 1 deletion client/tests/sdk/test_data_store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import unittest
from typing import Dict, List
from unittest.mock import patch, MagicMock

import numpy as np
import pyarrow as pa # type: ignore
Expand Down Expand Up @@ -940,13 +941,16 @@ def test_data_store_scan(self) -> None:

class TestTableWriter(BaseTestCase):
def setUp(self) -> None:
self.mock_atexit = patch("starwhale.api._impl.data_store.atexit", MagicMock())
self.mock_atexit.start()

super().setUp()
os.environ["SW_ROOT_PATH"] = self.root
self.writer = data_store.TableWriter("p/test", "k")

def tearDown(self) -> None:
self.writer.close()
super().tearDown()
self.mock_atexit.stop()

def test_insert_and_delete(self) -> None:
with self.assertRaises(RuntimeError, msg="no key"):
Expand Down
2 changes: 1 addition & 1 deletion client/tests/sdk/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .test_base import BaseTestCase


class TestEvaluaiton(BaseTestCase):
class TestEvaluation(BaseTestCase):
def setUp(self) -> None:
super().setUp()
os.environ["SW_PROJECT"] = "test"
Expand Down

0 comments on commit 8553c14

Please sign in to comment.