Skip to content

During eviction, set is_replaying and raise special exception #524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,3 @@ jobs:
python-repo-path: ${{github.event.pull_request.head.repo.full_name}}
version: ${{github.event.pull_request.head.ref}}
version-is-repo-ref: true
features-repo-ref: http-connect-proxy-python
11 changes: 10 additions & 1 deletion temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,9 @@ def _apply_remove_from_cache(
) -> None:
self._deleting = True
self._cancel_requested = True
# We consider eviction to be under replay so that certain code like
# logging that avoids replaying doesn't run during eviction either
self._is_replaying = True
# Cancel everything
for task in self._tasks:
task.cancel()
Expand Down Expand Up @@ -1514,7 +1517,9 @@ def _assert_not_read_only(
self, action_attempted: str, *, allow_during_delete: bool = False
) -> None:
if self._deleting and not allow_during_delete:
raise RuntimeError(f"Ignoring {action_attempted} while deleting")
raise _WorkflowBeingEvictedError(
f"Ignoring {action_attempted} while evicting workflow. This is not an error."
)
if self._read_only:
raise temporalio.workflow.ReadOnlyContextError(
f"While in read-only function, action attempted: {action_attempted}"
Expand Down Expand Up @@ -2614,3 +2619,7 @@ def set(
) -> None:
if not temporalio.workflow.unsafe.is_replaying():
self._underlying.set(value, additional_attributes)


class _WorkflowBeingEvictedError(BaseException):
pass
46 changes: 46 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3429,6 +3429,52 @@ async def signal_count() -> int:
assert not hook_calls


@dataclass
class CapturedEvictionException:
is_replaying: bool
exception: BaseException


captured_eviction_exceptions: List[CapturedEvictionException] = []


@workflow.defn(sandboxed=False)
class EvictionCaptureExceptionWorkflow:
@workflow.run
async def run(self) -> None:
# Going to sleep so we can force eviction
try:
await asyncio.sleep(0.01)
except BaseException as err:
captured_eviction_exceptions.append(
CapturedEvictionException(
is_replaying=workflow.unsafe.is_replaying(), exception=err
)
)


async def test_workflow_eviction_exception(client: Client):
assert not captured_eviction_exceptions

# Run workflow with no cache (forces eviction every step)
async with new_worker(
client, EvictionCaptureExceptionWorkflow, max_cached_workflows=0
) as worker:
await client.execute_workflow(
EvictionCaptureExceptionWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

# Confirm expected eviction replaying state and exception type
assert len(captured_eviction_exceptions) == 1
assert captured_eviction_exceptions[0].is_replaying
assert (
type(captured_eviction_exceptions[0].exception).__name__
== "_WorkflowBeingEvictedError"
)


@dataclass
class DynamicWorkflowValue:
some_string: str
Expand Down