Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug(client): fix dataset copy block forever when update table meets exception #3078

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions client/starwhale/api/_impl/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2857,7 +2857,6 @@ def close(self) -> None:
def _raise_run_exceptions(self, limits: int) -> None:
if len(self._queue_run_exceptions) > limits:
_es = self._queue_run_exceptions
self._queue_run_exceptions = []
raise TableWriterException(f"{self} run raise {len(_es)} exceptions: {_es}")

def insert(self, record: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -2903,7 +2902,9 @@ def flush(self) -> str:
"""
while True:
with self._cond:
if len(self._records) == 0 and len(self._updating_records) == 0:
if len(self._queue_run_exceptions) > self._run_exceptions_limits or (
len(self._records) == 0 and len(self._updating_records) == 0
):
break
time.sleep(0.1)
return self.latest_revision
Expand Down Expand Up @@ -2953,7 +2954,7 @@ def run(self) -> None:
if len(to_submit) > 0 and last_schema is not None:
self._batch_update_table(last_schema, to_submit)
except Exception as e:
console.print_exception()
# TODO: we should refine run exceptions limits, it's not equal to the wrong insertions
self._queue_run_exceptions.append(e)
if len(self._queue_run_exceptions) > self._run_exceptions_limits:
break
Expand Down
1 change: 0 additions & 1 deletion client/starwhale/core/dataset/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def _do_row_update(
dest.put(row)
progress.update(task_id, advance=1, refresh=True)
except Exception as e:
console.print_exception()
exceptions.append(e)
raise

Expand Down
7 changes: 5 additions & 2 deletions client/tests/sdk/test_data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2345,8 +2345,11 @@ def test_run_thread_exception_limit(self, request_mock: Mocker) -> None:
while True:
if not remote_writer.is_alive():
break

if time.time() - start > 30:
is_timeout = True
break
time.sleep(0.1)

assert not is_timeout
assert not remote_writer.is_alive()
Expand All @@ -2359,8 +2362,8 @@ def test_run_thread_exception_limit(self, request_mock: Mocker) -> None:
with self.assertRaises(data_store.TableWriterException):
remote_writer.insert({"k": 2, "a": "2"})

assert len(remote_writer._queue_run_exceptions) == 0
remote_writer.close()
with self.assertRaises(data_store.TableWriterException):
remote_writer.close()

@Mocker()
def test_run_thread_exception(self, request_mock: Mocker) -> None:
Expand Down
1 change: 0 additions & 1 deletion client/tests/sdk/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ def test_exception_close(self, request_mock: Mocker, m_conf: MagicMock) -> None:
assert _writer is not None
assert not _writer.is_alive()
assert _writer._stopped
assert len(_writer._queue_run_exceptions) == 0

def test_get_tables_from_standalone(self) -> None:
e = wrapper.Evaluation("tt", Project("test"))
Expand Down
Loading