Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][usability] Disambiguate ObjectLostErrors for better understandability #18292

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cpp/src/ray/runtime/object/native_object_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ void NativeObjectStore::CheckException(const std::string &meta_str,
throw RayWorkerException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::ACTOR_DIED)) {
throw RayActorException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE)) {
} else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE) ||
meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_LOST) ||
meta_str == std::to_string(ray::rpc::ErrorType::OWNER_DIED) ||
meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_DELETED)) {
// TODO: Differentiate object errors.
throw UnreconstructableException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::TASK_EXECUTION_EXCEPTION)) {
throw RayTaskException(std::move(data_str));
Expand Down
29 changes: 29 additions & 0 deletions doc/source/troubleshooting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,35 @@ as well as some known problems. If you encounter other problems, please

.. _`let us know`: https://github.com/ray-project/ray/issues

Understanding `ObjectLostErrors`
--------------------------------
Ray throws an ``ObjectLostError`` to the application when an object cannot be
retrieved due to application or system error. This can occur during a
``ray.get()`` call or when fetching a task's arguments, and can happen for a
number of reasons. Here is a guide to understanding the root cause for
different error types:

- ``ObjectLostError``: The object was successfully created, but then all copies
were lost due to node failure.
- ``OwnerDiedError``: The owner of an object, i.e., the Python worker that
first created the ``ObjectRef`` via ``.remote()`` or ``ray.put()``, has died.
The owner stores critical object metadata and an object cannot be retrieved
if this process is lost.
- ``ObjectReconstructionFailedError``: Should only be thrown when `lineage
reconstruction`_ is enabled. This error is thrown if an object, or another
object that this object depends on, cannot be reconstructed because the
maximum number of task retries has been exceeded. By default, a non-actor
task can be retried up to 3 times and an actor task cannot be retried.
This can be overridden with the ``max_retries`` parameter for remote
functions and the ``max_task_retries`` parameter for actors.
- ``ReferenceCountingAssertionError``: The object has already been deleted,
so it cannot be retrieved. Ray implements automatic memory management through
distributed reference counting, so this error should not happen in general.
However, there is a `known edge case`_ that can produce this error.

.. _`lineage reconstruction`: https://docs.ray.io/en/master/fault-tolerance.html
.. _`known edge case`: https://github.com/ray-project/ray/issues/18456

Crashes
-------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class ObjectSerializer {
String.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes();
private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META =
String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes();
private static final byte[] OBJECT_LOST_META =
String.valueOf(ErrorType.OBJECT_LOST.getNumber()).getBytes();
private static final byte[] OWNER_DIED_META =
String.valueOf(ErrorType.OWNER_DIED.getNumber()).getBytes();
private static final byte[] OBJECT_DELETED_META =
String.valueOf(ErrorType.OBJECT_DELETED.getNumber()).getBytes();
private static final byte[] TASK_EXECUTION_EXCEPTION_META =
String.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes();

Expand Down Expand Up @@ -77,6 +83,12 @@ public static Object deserialize(
return Serializer.decode(data, objectType);
} else if (Bytes.indexOf(meta, WORKER_EXCEPTION_META) == 0) {
return new RayWorkerException();
} else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0
|| Bytes.indexOf(meta, OBJECT_LOST_META) == 0
|| Bytes.indexOf(meta, OWNER_DIED_META) == 0
|| Bytes.indexOf(meta, OBJECT_DELETED_META) == 0) {
// TODO: Differentiate object errors.
return new UnreconstructableException(objectId);
} else if (Bytes.indexOf(meta, ACTOR_EXCEPTION_META) == 0) {
ActorId actorId = IdUtil.getActorIdFromObjectId(objectId);
if (data != null && data.length > 0) {
Expand All @@ -86,8 +98,6 @@ public static Object deserialize(
}
}
return new RayActorException(actorId);
} else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0) {
return new UnreconstructableException(objectId);
} else if (Bytes.indexOf(meta, TASK_EXECUTION_EXCEPTION_META) == 0) {
return deserializeRayException(data, objectId);
} else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_ACTOR_HANDLE) == 0) {
Expand Down
1 change: 1 addition & 0 deletions python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ cdef class BaseID:
cdef class ObjectRef(BaseID):
cdef:
CObjectID data
c_string owner_addr
# Flag indicating whether or not this object ref was added to the set
# of active IDs in the core worker so we know whether we should clean
# it up.
Expand Down
6 changes: 4 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,10 @@ cdef RayObjectsToDataMetadataPairs(
cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs):
result = []
for i in range(object_refs.size()):
result.append(ObjectRef(object_refs[i].object_id(),
object_refs[i].call_site()))
result.append(ObjectRef(
object_refs[i].object_id(),
object_refs[i].owner_address().SerializeAsString(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this add extra overheads?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call takes about 1-2us, total .remote() time is ~200us. I think this is okay, but I can run microbenchmarks if necessary.

object_refs[i].call_site()))
return result


Expand Down
88 changes: 80 additions & 8 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import ray.cloudpickle as pickle
from ray.core.generated.common_pb2 import RayException, Language, PYTHON
from ray.core.generated.common_pb2 import Address
import ray.ray_constants as ray_constants
from ray._raylet import WorkerID
import colorama
import setproctitle

Expand Down Expand Up @@ -171,7 +173,7 @@ def __str__(self):
# due to the dependency failure.
# Print out an user-friendly
# message to explain that..
out.append(" Some of the input arguments for "
out.append(" At least one of the input arguments for "
"this task could not be computed:")
if i + 1 < len(lines) and lines[i + 1].startswith(" "):
# If the next line is indented with 2 space,
Expand Down Expand Up @@ -280,30 +282,97 @@ def __str__(self):


class ObjectLostError(RayError):
"""Indicates that an object has been lost due to node failure.
"""Indicates that the object is lost from distributed memory, due to
node failure or system error.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __init__(self, object_ref_hex, call_site):
def __init__(self, object_ref_hex, owner_address, call_site):
self.object_ref_hex = object_ref_hex
self.owner_address = owner_address
self.call_site = call_site.replace(
ray_constants.CALL_STACK_LINE_DELIMITER, "\n ")

def __str__(self):
msg = (f"Object {self.object_ref_hex} cannot be retrieved due to node "
"failure or system error.")
def _base_str(self):
msg = f"Failed to retrieve object {self.object_ref_hex}. "
if self.call_site:
msg += (f" The ObjectRef was created at: {self.call_site}")
msg += (f"The ObjectRef was created at: {self.call_site}")
else:
msg += (
" To see information about where this ObjectRef was created "
"To see information about where this ObjectRef was created "
"in Python, set the environment variable "
"RAY_record_ref_creation_sites=1 during `ray start` and "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to set this for driver. Why don't we write a doc and make a link instead? (I think it is confusing for users anyway when they just read it)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a doc, but left it out of the error message for now. I think it's best if we try to make the error messages standalone.

"`ray.init()`.")
return msg

def __str__(self):
return self._base_str() + "\n\n" + (
f"All copies of {self.object_ref_hex} have been lost due to node "
"failure. Check cluster logs (`/tmp/ray/session_latest/logs`) for "
"more information about the failure.")


class ReferenceCountingAssertionError(ObjectLostError, AssertionError):
"""Indicates that an object has been deleted while there was still a
reference to it.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __str__(self):
return self._base_str() + "\n\n" + (
"The object has already been deleted by the reference counting "
"protocol. This should not happen.")


class OwnerDiedError(ObjectLostError):
"""Indicates that the owner of the object has died while there is still a
reference to the object.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __str__(self):
log_loc = "`/tmp/ray/session_latest/logs`"
if self.owner_address:
try:
addr = Address()
addr.ParseFromString(self.owner_address)
ip_addr = addr.ip_address
worker_id = WorkerID(addr.worker_id)
log_loc = (
f"`/tmp/ray/session_latest/logs/*{worker_id.hex()}*`"
f" at IP address {ip_addr}")
except Exception:
# Catch all to make sure we always at least print the default
# message.
pass

return self._base_str() + "\n\n" + (
"The object's owner has exited. This is the Python "
"worker that first created the ObjectRef via `.remote()` or "
"`ray.put()`. "
f"Check cluster logs ({log_loc}) for more "
"information about the Python worker failure.")


class ObjectReconstructionFailedError(ObjectLostError):
"""Indicates that the owner of the object has died while there is still a
reference to the object.

Attributes:
object_ref_hex: Hex ID of the object.
"""

def __str__(self):
return self._base_str() + "\n\n" + (
"The object cannot be reconstructed "
"because the maximum number of task retries has been exceeded.")


class GetTimeoutError(RayError):
"""Indicates that a call to the worker timed out."""
Expand Down Expand Up @@ -338,6 +407,9 @@ def __str__(self):
RayActorError,
ObjectStoreFullError,
ObjectLostError,
ReferenceCountingAssertionError,
ObjectReconstructionFailedError,
OwnerDiedError,
GetTimeoutError,
AsyncioActorExit,
RuntimeEnvSetupError,
Expand Down
6 changes: 5 additions & 1 deletion python/ray/includes/object_ref.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ def _set_future_helper(

cdef class ObjectRef(BaseID):

def __init__(self, id, call_site_data=""):
def __init__(self, id, owner_addr="", call_site_data=""):
check_id(id)
self.data = CObjectID.FromBinary(<c_string>id)
self.owner_addr = owner_addr
self.in_core_worker = False
self.call_site_data = call_site_data

Expand Down Expand Up @@ -85,6 +86,9 @@ cdef class ObjectRef(BaseID):
def job_id(self):
return self.task_id().job_id()

def owner_address(self):
return self.owner_addr

def call_site(self):
return decode(self.call_site_data)

Expand Down
31 changes: 22 additions & 9 deletions python/ray/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from ray import ray_constants
import ray._private.utils
from ray._private.gcs_utils import ErrorType
from ray.exceptions import (RayError, PlasmaObjectNotAvailable, RayTaskError,
RayActorError, TaskCancelledError,
WorkerCrashedError, ObjectLostError,
RaySystemError, RuntimeEnvSetupError)
from ray.exceptions import (
RayError, PlasmaObjectNotAvailable, RayTaskError, RayActorError,
TaskCancelledError, WorkerCrashedError, ObjectLostError,
ReferenceCountingAssertionError, OwnerDiedError,
ObjectReconstructionFailedError, RaySystemError, RuntimeEnvSetupError)
from ray._raylet import (
split_buffer,
unpack_pickle5_buffers,
Expand Down Expand Up @@ -37,7 +38,7 @@ def _object_ref_deserializer(binary, call_site, owner_address, object_status):
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectRef is greater than 0 by the
# time the core worker resolves the value of the object.
obj_ref = ray.ObjectRef(binary, call_site)
obj_ref = ray.ObjectRef(binary, owner_address, call_site)

# TODO(edoakes): we should be able to just capture a reference
# to 'self' here instead, but this function is itself pickled
Expand Down Expand Up @@ -222,15 +223,27 @@ def _deserialize_object(self, data, metadata, object_ref):
return RayActorError()
elif error_type == ErrorType.Value("TASK_CANCELLED"):
return TaskCancelledError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
elif error_type == ErrorType.Value("OBJECT_LOST"):
return ObjectLostError(object_ref.hex(),
object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OBJECT_DELETED"):
return ReferenceCountingAssertionError(
object_ref.hex(), object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OWNER_DIED"):
return OwnerDiedError(object_ref.hex(),
object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return ObjectReconstructionFailedError(
object_ref.hex(), object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"):
return RuntimeEnvSetupError()
else:
assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \
"Tried to get object that has been promoted to plasma."
assert False, "Unrecognized error type " + str(error_type)
return RaySystemError("Unrecognized error type " +
str(error_type))
elif data:
raise ValueError("non-null object should always have metadata")
else:
Expand Down
5 changes: 3 additions & 2 deletions python/ray/tests/test_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import ray._private.profiling as profiling
from ray._private.test_utils import (client_test_enabled,
RayTestTimeoutException)
from ray.exceptions import ReferenceCountingAssertionError

if client_test_enabled():
from ray.util.client import ray
Expand Down Expand Up @@ -44,15 +45,15 @@ def sample_big(self):
obj_ref = sampler.sample.remote()
ray.get(obj_ref)
ray.internal.free(obj_ref)
with pytest.raises(Exception):
with pytest.raises(ReferenceCountingAssertionError):
ray.get(obj_ref)

# Free deletes big objects from plasma store.
big_id = sampler.sample_big.remote()
ray.get(big_id)
ray.internal.free(big_id)
time.sleep(1) # wait for delete RPC to propagate
with pytest.raises(Exception):
with pytest.raises(ReferenceCountingAssertionError):
ray.get(big_id)


Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_failure_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def sleep_to_kill_raylet():

thread = threading.Thread(target=sleep_to_kill_raylet)
thread.start()
with pytest.raises(ray.exceptions.ObjectLostError):
with pytest.raises(ray.exceptions.ReferenceCountingAssertionError):
ray.get(object_ref)
thread.join()

Expand All @@ -307,7 +307,7 @@ def large_object():
# Evict the object.
ray.internal.free([obj])
# ray.get throws an exception.
with pytest.raises(ray.exceptions.ObjectLostError):
with pytest.raises(ray.exceptions.ReferenceCountingAssertionError):
ray.get(obj)

@ray.remote
Expand Down
Loading