Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][usability] Disambiguate ObjectLostErrors for better understandability #18292

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cpp/src/ray/runtime/object/native_object_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ void NativeObjectStore::CheckException(const std::string &meta_str,
throw RayWorkerException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::ACTOR_DIED)) {
throw RayActorException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE)) {
} else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE) ||
meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_LOST) ||
meta_str == std::to_string(ray::rpc::ErrorType::OWNER_DIED) ||
meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_DELETED)) {
// TODO: Differentiate object errors.
throw UnreconstructableException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::TASK_EXECUTION_EXCEPTION)) {
throw RayTaskException(std::move(data_str));
Expand Down
55 changes: 55 additions & 0 deletions doc/source/troubleshooting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,61 @@ as well as some known problems. If you encounter other problems, please

.. _`let us know`: https://github.com/ray-project/ray/issues

Understanding `ObjectLostErrors`
--------------------------------
Ray throws an ``ObjectLostError`` to the application when an object cannot be
retrieved due to application or system error. This can occur during a
``ray.get()`` call or when fetching a task's arguments, and can happen for a
number of reasons. Here is a guide to understanding the root cause for
different error types:

- ``ObjectLostError``: The object was successfully created, but then all copies
were lost due to node failure.
- ``OwnerDiedError``: The owner of an object, i.e., the Python worker that
first created the ``ObjectRef`` via ``.remote()`` or ``ray.put()``, has died.
The owner stores critical object metadata and an object cannot be retrieved
if this process is lost.
- ``ObjectReconstructionFailedError``: Should only be thrown when `lineage
reconstruction`_ is enabled. This error is thrown if an object, or another
object that this object depends on, cannot be reconstructed because the
maximum number of task retries has been exceeded. By default, a non-actor
task can be retried up to 3 times and an actor task cannot be retried.
This can be overridden with the ``max_retries`` parameter for remote
functions and the ``max_task_retries`` parameter for actors.
- ``ReferenceCountingAssertionFailure``: The object has already been deleted,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we link to a github issue here? I don't think we should document known bugs in the docs.

so it cannot be retrieved. Ray implements automatic memory management through
distributed reference counting, so this error should not happen in general.
However, there is a known edge case that can occur when a worker passes an
``ObjectRef``, then exits before the ref count at the ``ObjectRef``'s owner
can be updated. Here is an example:

.. code-block:: python
@ray.remote
class RefPasser:
def pass_ref(self, ref_list, receiver):
ray.get(receiver.receive_ref.remote(ref_list))
# Simulate an exit.
sys.exit(-1)

@ray.remote
class Receiver:
def receive_ref(self, ref_list):
self.ref = ref_list[0]

def resolve_ref(self):
# Throws a ReferenceCountingAssertionFailure.
ray.get(self.ref)

passer = RefPasser.remote()
receiver = Receiver.remote()
ref = foo.remote()
passer.pass_ref.remote([ref], receiver)
# Receiver cannot retrieve the object because the RefPasser did not update
# the driver's ref count in time before exiting.
ray.get(receiver.resolve_ref.remote())

.. _`lineage reconstruction`: https://docs.ray.io/en/master/fault-tolerance.html

Crashes
-------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class ObjectSerializer {
String.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes();
private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META =
String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes();
private static final byte[] OBJECT_LOST_META =
String.valueOf(ErrorType.OBJECT_LOST.getNumber()).getBytes();
private static final byte[] OWNER_DIED_META =
String.valueOf(ErrorType.OWNER_DIED.getNumber()).getBytes();
private static final byte[] OBJECT_DELETED_META =
String.valueOf(ErrorType.OBJECT_DELETED.getNumber()).getBytes();
private static final byte[] TASK_EXECUTION_EXCEPTION_META =
String.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes();

Expand Down Expand Up @@ -86,7 +92,11 @@ public static Object deserialize(
}
}
return new RayActorException(actorId);
} else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0) {
} else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0
|| Bytes.indexOf(meta, OBJECT_LOST_META) == 0
|| Bytes.indexOf(meta, OWNER_DIED_META) == 0
|| Bytes.indexOf(meta, OBJECT_DELETED_META) == 0) {
// TODO: Differentiate object errors.
return new UnreconstructableException(objectId);
} else if (Bytes.indexOf(meta, TASK_EXECUTION_EXCEPTION_META) == 0) {
return deserializeRayException(data, objectId);
Expand Down
1 change: 1 addition & 0 deletions python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ cdef class BaseID:
cdef class ObjectRef(BaseID):
cdef:
CObjectID data
c_string owner_addr
# Flag indicating whether or not this object ref was added to the set
# of active IDs in the core worker so we know whether we should clean
# it up.
Expand Down
6 changes: 4 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,10 @@ cdef RayObjectsToDataMetadataPairs(
cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs):
result = []
for i in range(object_refs.size()):
result.append(ObjectRef(object_refs[i].object_id(),
object_refs[i].call_site()))
result.append(ObjectRef(
object_refs[i].object_id(),
object_refs[i].owner_address().SerializeAsString(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this add extra overheads?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call takes about 1-2us, total .remote() time is ~200us. I think this is okay, but I can run microbenchmarks if necessary.

object_refs[i].call_site()))
return result


Expand Down
99 changes: 91 additions & 8 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import ray.cloudpickle as pickle
from ray.core.generated.common_pb2 import RayException, Language, PYTHON
from ray.core.generated.common_pb2 import Address
import ray.ray_constants as ray_constants
from ray._raylet import WorkerID
import colorama
import setproctitle

Expand Down Expand Up @@ -171,7 +173,7 @@ def __str__(self):
# due to the dependency failure.
# Print out an user-friendly
# message to explain that..
out.append(" Some of the input arguments for "
out.append(" At least one of the input arguments for "
"this task could not be computed:")
if i + 1 < len(lines) and lines[i + 1].startswith(" "):
# If the next line is indented with 2 space,
Expand Down Expand Up @@ -279,32 +281,110 @@ def __str__(self):
"to list active objects in the cluster.")


class ObjectLostError(RayError):
"""Indicates that an object has been lost due to node failure.
# TODO(XXX): Replace with ObjectLostError for backwards compatibility once all
# Python tests pass.
class ObjectUnreachableError(RayError):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class ObjectUnreachableError(RayError):
class ObjectLostError(RayError):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll switch this back later, just putting this here for now so that I can see which Python tests would fail right now.

"""Generic error for objects that are unreachable due to node failure or
system error.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __init__(self, object_ref_hex, call_site):
def __init__(self, object_ref_hex, owner_address, call_site):
self.object_ref_hex = object_ref_hex
self.owner_address = owner_address
self.call_site = call_site.replace(
ray_constants.CALL_STACK_LINE_DELIMITER, "\n ")

def __str__(self):
msg = (f"Object {self.object_ref_hex} cannot be retrieved due to node "
"failure or system error.")
msg = f"Failed to retrieve object {self.object_ref_hex}. "
if self.call_site:
msg += (f" The ObjectRef was created at: {self.call_site}")
msg += (f"The ObjectRef was created at: {self.call_site}")
else:
msg += (
" To see information about where this ObjectRef was created "
"To see information about where this ObjectRef was created "
"in Python, set the environment variable "
"RAY_record_ref_creation_sites=1 during `ray start` and "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to set this for driver. Why don't we write a doc and make a link instead? (I think it is confusing for users anyway when they just read it)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a doc, but left it out of the error message for now. I think it's best if we try to make the error messages standalone.

"`ray.init()`.")
return msg


class ObjectLostError(ObjectUnreachableError):
"""Indicates that the object is lost from distributed memory, due to
node failure or system error.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __str__(self):
return super().__str__() + "\n\n" + (
f"All copies of {self.object_ref_hex} have been lost due to node "
"failure. Check cluster logs (`/tmp/ray/session_latest/logs`) for "
"more information about the failure.")


class ObjectDeletedError(ObjectUnreachableError):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to assertion failure class?

"""Indicates that an object has been deleted while there was still a
reference to it.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __str__(self):
return super().__str__() + "\n\n" + (
"The object has already been deleted by the reference counting "
"protocol. This should not happen.")


class OwnerDiedError(ObjectUnreachableError):
"""Indicates that the owner of the object has died while there is still a
reference to the object.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __str__(self):
log_loc = "`/tmp/ray/session_latest/logs`"
if self.owner_address:
try:
addr = Address()
addr.ParseFromString(self.owner_address)
ip_addr = addr.ip_address
worker_id = WorkerID(addr.worker_id)
log_loc = (
f"`/tmp/ray/session_latest/logs/*{worker_id.hex()}*`"
f" at IP address {ip_addr}")
except Exception:
# Catch all to make sure we always at least print the default
# message.
pass

return super().__str__() + "\n\n" + (
"The object's owner has exited. This is the Python "
"worker that first created the ObjectRef via `.remote()` or "
"`ray.put()`. "
f"Check cluster logs ({log_loc}) for more "
"information about the Python worker failure.")


class ObjectReconstructionFailedError(ObjectUnreachableError):
"""Indicates that the owner of the object has died while there is still a
reference to the object.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __str__(self):
return super().__str__() + "\n\n" + (
f"The object cannot be reconstructed "
"because the maximum number of task retries has been exceeded.")


class GetTimeoutError(RayError):
"""Indicates that a call to the worker timed out."""
pass
Expand Down Expand Up @@ -338,6 +418,9 @@ def __str__(self):
RayActorError,
ObjectStoreFullError,
ObjectLostError,
ObjectDeletedError,
ObjectReconstructionFailedError,
OwnerDiedError,
GetTimeoutError,
AsyncioActorExit,
RuntimeEnvSetupError,
Expand Down
6 changes: 5 additions & 1 deletion python/ray/includes/object_ref.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ def _set_future_helper(

cdef class ObjectRef(BaseID):

def __init__(self, id, call_site_data=""):
def __init__(self, id, owner_addr="", call_site_data=""):
check_id(id)
self.data = CObjectID.FromBinary(<c_string>id)
self.owner_addr = owner_addr
self.in_core_worker = False
self.call_site_data = call_site_data

Expand Down Expand Up @@ -85,6 +86,9 @@ cdef class ObjectRef(BaseID):
def job_id(self):
return self.task_id().job_id()

def owner_address(self):
return self.owner_addr

def call_site(self):
return decode(self.call_site_data)

Expand Down
31 changes: 22 additions & 9 deletions python/ray/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from ray import ray_constants
import ray._private.utils
from ray._private.gcs_utils import ErrorType
from ray.exceptions import (RayError, PlasmaObjectNotAvailable, RayTaskError,
RayActorError, TaskCancelledError,
WorkerCrashedError, ObjectLostError,
RaySystemError, RuntimeEnvSetupError)
from ray.exceptions import (
RayError, PlasmaObjectNotAvailable, RayTaskError, RayActorError,
TaskCancelledError, WorkerCrashedError, ObjectLostError,
ObjectDeletedError, OwnerDiedError, ObjectReconstructionFailedError,
RaySystemError, RuntimeEnvSetupError)
from ray._raylet import (
split_buffer,
unpack_pickle5_buffers,
Expand Down Expand Up @@ -37,7 +38,7 @@ def _object_ref_deserializer(binary, call_site, owner_address, object_status):
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectRef is greater than 0 by the
# time the core worker resolves the value of the object.
obj_ref = ray.ObjectRef(binary, call_site)
obj_ref = ray.ObjectRef(binary, owner_address, call_site)

# TODO(edoakes): we should be able to just capture a reference
# to 'self' here instead, but this function is itself pickled
Expand Down Expand Up @@ -222,15 +223,27 @@ def _deserialize_object(self, data, metadata, object_ref):
return RayActorError()
elif error_type == ErrorType.Value("TASK_CANCELLED"):
return TaskCancelledError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
elif error_type == ErrorType.Value("OBJECT_LOST"):
return ObjectLostError(object_ref.hex(),
object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OBJECT_DELETED"):
return ObjectDeletedError(object_ref.hex(),
object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OWNER_DIED"):
return OwnerDiedError(object_ref.hex(),
object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return ObjectReconstructionFailedError(
object_ref.hex(), object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"):
return RuntimeEnvSetupError()
else:
assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \
"Tried to get object that has been promoted to plasma."
assert False, "Unrecognized error type " + str(error_type)
return RaySystemError("Unrecognized error type " +
str(error_type))
elif data:
raise ValueError("non-null object should always have metadata")
else:
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import ray
from ray.exceptions import TaskCancelledError, RayTaskError, \
GetTimeoutError, WorkerCrashedError, \
ObjectLostError
ObjectUnreachableError
from ray._private.test_utils import SignalActor


def valid_exceptions(use_force):
if use_force:
return (RayTaskError, TaskCancelledError, WorkerCrashedError,
ObjectLostError)
ObjectUnreachableError)
else:
return (RayTaskError, TaskCancelledError)

Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_client_terminate.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from ray.exceptions import TaskCancelledError
from ray.exceptions import RayTaskError
from ray.exceptions import WorkerCrashedError
from ray.exceptions import ObjectLostError
from ray.exceptions import ObjectUnreachableError
from ray.exceptions import GetTimeoutError


def valid_exceptions(use_force):
if use_force:
return (RayTaskError, TaskCancelledError, WorkerCrashedError,
ObjectLostError)
ObjectUnreachableError)
else:
return (RayTaskError, TaskCancelledError)

Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_failure_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def sleep_to_kill_raylet():

thread = threading.Thread(target=sleep_to_kill_raylet)
thread.start()
with pytest.raises(ray.exceptions.ObjectLostError):
with pytest.raises(ray.exceptions.ObjectDeletedError):
ray.get(object_ref)
thread.join()

Expand All @@ -307,7 +307,7 @@ def large_object():
# Evict the object.
ray.internal.free([obj])
# ray.get throws an exception.
with pytest.raises(ray.exceptions.ObjectLostError):
with pytest.raises(ray.exceptions.ObjectDeletedError):
ray.get(obj)

@ray.remote
Expand Down
Loading