Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workflow] Minor refactoring of workflow exceptions #26398

Merged
merged 3 commits into from
Jul 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions python/ray/workflow/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
from ray.workflow.common import (
WorkflowStatus,
Event,
WorkflowRunningError,
WorkflowNotFoundError,
asyncio_run,
validate_user_metadata,
)
from ray.workflow.exceptions import WorkflowRunningError, WorkflowNotFoundError
from ray.workflow import serialization
from ray.workflow.event_listener import EventListener, EventListenerType, TimerListener
from ray.workflow import workflow_access
Expand Down Expand Up @@ -459,7 +458,11 @@ def get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any]
ValueError: if given workflow or workflow step does not exist.
"""
_ensure_workflow_initialized()
return execution.get_metadata(workflow_id, name)
store = get_workflow_storage(workflow_id)
if name is None:
return store.load_workflow_metadata()
else:
return store.load_step_metadata(name)


@PublicAPI(stability="beta")
Expand Down
17 changes: 0 additions & 17 deletions python/ray/workflow/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,3 @@ class WorkflowExecutionMetadata:
class WorkflowMetaData:
# The current status of the workflow
status: WorkflowStatus


@PublicAPI(stability="beta")
class WorkflowNotFoundError(Exception):
def __init__(self, workflow_id: str):
self.message = f"Workflow[id={workflow_id}] was referenced but doesn't exist."
super().__init__(self.message)


@PublicAPI(stability="beta")
class WorkflowRunningError(Exception):
def __init__(self, operation: str, workflow_id: str):
self.message = (
f"{operation} couldn't be completed becasue "
f"Workflow[id={workflow_id}] is still running."
)
super().__init__(self.message)
17 changes: 17 additions & 0 deletions python/ray/workflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,20 @@ class WorkflowNotResumableError(WorkflowError):
def __init__(self, workflow_id: str):
self.message = f"Workflow[id={workflow_id}] is not resumable."
super().__init__(self.message)


@PublicAPI(stability="beta")
class WorkflowNotFoundError(WorkflowError):
def __init__(self, workflow_id: str):
self.message = f"Workflow[id={workflow_id}] was referenced but doesn't exist."
super().__init__(self.message)


@PublicAPI(stability="beta")
class WorkflowRunningError(WorkflowError):
def __init__(self, operation: str, workflow_id: str):
self.message = (
f"{operation} couldn't be completed becasue "
f"Workflow[id={workflow_id}] is still running."
)
super().__init__(self.message)
13 changes: 1 addition & 12 deletions python/ray/workflow/execution.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import time
from typing import Set, List, Tuple, Optional, Dict, Any
from typing import Set, List, Tuple, Optional, Dict
import uuid

import ray
Expand Down Expand Up @@ -135,17 +135,6 @@ def get_status(workflow_id: str) -> Optional[WorkflowStatus]:
return ray.get(workflow_manager.get_workflow_status.remote(workflow_id))


def get_metadata(workflow_id: str, name: Optional[str]) -> Dict[str, Any]:
"""Get the metadata of the workflow.
See "api.get_metadata()" for details.
"""
store = workflow_storage.get_workflow_storage(workflow_id)
if name is None:
return store.load_workflow_metadata()
else:
return store.load_step_metadata(name)


def list_all(status_filter: Set[WorkflowStatus]) -> List[Tuple[str, WorkflowStatus]]:
try:
workflow_manager = get_management_actor()
Expand Down
2 changes: 1 addition & 1 deletion python/ray/workflow/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from ray.workflow import workflow_storage
from ray.workflow.common import (
StepType,
WorkflowNotFoundError,
WorkflowStepRuntimeOptions,
)
from ray.workflow.exceptions import WorkflowNotFoundError
from ray.workflow import serialization_context
from ray.workflow.tests import utils

Expand Down
4 changes: 3 additions & 1 deletion python/ray/workflow/tests/test_workflow_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@


def test_workflow_manager_simple(workflow_start_regular):
from ray.workflow.exceptions import WorkflowNotFoundError

assert [] == workflow.list_all()
with pytest.raises(workflow.common.WorkflowNotFoundError):
with pytest.raises(WorkflowNotFoundError):
workflow.get_status("X")


Expand Down
3 changes: 2 additions & 1 deletion python/ray/workflow/workflow_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import ray

from ray.workflow import common
from ray.workflow.common import WorkflowStatus, TaskID, WorkflowNotFoundError
from ray.workflow.common import WorkflowStatus, TaskID
from ray.workflow import workflow_state_from_storage
from ray.workflow import workflow_context
from ray.workflow import workflow_storage
from ray.workflow.exceptions import (
WorkflowCancellationError,
WorkflowNotFoundError,
WorkflowNotResumableError,
)
from ray.workflow.workflow_executor import WorkflowExecutor
Expand Down
2 changes: 1 addition & 1 deletion python/ray/workflow/workflow_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
from ray.workflow.common import (
TaskID,
WorkflowStatus,
WorkflowNotFoundError,
WorkflowStepRuntimeOptions,
)
from ray.workflow.exceptions import WorkflowNotFoundError
from ray.workflow import workflow_context
from ray.workflow import serialization
from ray.workflow import serialization_context
Expand Down