Skip to content

Commit

Permalink
chore(datastore): shorten first item response time (#2940)
Browse files Browse the repository at this point in the history
  • Loading branch information
jialeicui authored Nov 6, 2023
1 parent f8b0237 commit 1ccbbf1
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions client/starwhale/api/_impl/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,29 +1477,44 @@ def __gt__(self, other: object) -> Any:
raise NotImplementedError
return self.min_key > other.max_key

def load(self) -> MemoryTable:
@contextlib.contextmanager
def _open_jsonlines(self) -> Iterator[jsonlines.Reader]:
if self.file is None:
raise RuntimeError("can not load cache for virtual block")
if not self.file.exists():
raise RuntimeError(f"can not find file {self.file}")

compressor = get_compressor(self.file)
with compressor.decompress(self.file) as file:
with jsonlines.open(file) as reader:
# meta = reader.read()
# TODO validate the meta
table = MemoryTable(self.key_column)
for record in reader:
table.insert_record(InnerRecord.loads(record))
return table
yield reader

def load(self) -> MemoryTable:
with self._open_jsonlines() as reader:
table = MemoryTable(self.key_column)
for record in reader:
table.insert_record(InnerRecord.loads(record))
return table

def scan(
self,
start: Optional[Any] = None,
end: Optional[Any] = None,
end_inclusive: bool = False,
) -> Iterator[InnerRecord]:
cache = self.load()
yield from cache.scan(start, end, end_inclusive)
with self._open_jsonlines() as reader:
for record in reader:
# the record is sorted by key
row = InnerRecord.loads(record)
if start is not None and row.key < start:
continue
if end_inclusive:
if end is not None and row.key > end:
break
else:
if end is not None and row.key >= end:
break
yield row

def dump(
self,
Expand Down

0 comments on commit 1ccbbf1

Please sign in to comment.