diff --git a/client/starwhale/api/_impl/data_store.py b/client/starwhale/api/_impl/data_store.py
index cb45626fe4..ee73edfb16 100644
--- a/client/starwhale/api/_impl/data_store.py
+++ b/client/starwhale/api/_impl/data_store.py
@@ -1477,20 +1477,30 @@ def __gt__(self, other: object) -> Any:
             raise NotImplementedError
         return self.min_key > other.max_key
 
-    def load(self) -> MemoryTable:
+    @contextlib.contextmanager
+    def _open_jsonlines(self) -> Iterator[jsonlines.Reader]:
+        """Yield a jsonlines reader over the decompressed block file.
+
+        Raises:
+            RuntimeError: if the block has no file or the file is missing.
+        """
         if self.file is None:
             raise RuntimeError("can not load cache for virtual block")
         if not self.file.exists():
             raise RuntimeError(f"can not find file {self.file}")
+
         compressor = get_compressor(self.file)
         with compressor.decompress(self.file) as file:
             with jsonlines.open(file) as reader:
-                # meta = reader.read()
-                # TODO validate the meta
-                table = MemoryTable(self.key_column)
-                for record in reader:
-                    table.insert_record(InnerRecord.loads(record))
-                return table
+                yield reader
+
+    def load(self) -> MemoryTable:
+        """Read every record of the block file into a new MemoryTable."""
+        with self._open_jsonlines() as reader:
+            table = MemoryTable(self.key_column)
+            for raw in reader:
+                table.insert_record(InnerRecord.loads(raw))
+        return table
 
     def scan(
         self,
@@ -1498,8 +1508,24 @@ def scan(
         end: Optional[Any] = None,
         end_inclusive: bool = False,
     ) -> Iterator[InnerRecord]:
-        cache = self.load()
-        yield from cache.scan(start, end, end_inclusive)
+        """Stream the records of the block whose key falls in a range.
+
+        `start` is inclusive; `end` is exclusive unless `end_inclusive` is
+        set. A bound of None leaves that side of the range open.
+        """
+        with self._open_jsonlines() as reader:
+            for raw in reader:
+                row = InnerRecord.loads(raw)
+                # the records are sorted by key, so the leading ones below
+                # `start` are skipped and reading stops at the first one
+                # beyond `end`
+                if start is not None and row.key < start:
+                    continue
+                if end is not None:
+                    beyond = row.key > end if end_inclusive else row.key >= end
+                    if beyond:
+                        break
+                yield row
 
     def dump(
         self,