[core] Post _on_completed callbacks to io_service_ instead of running them synchronously (ray-project#39112)
Fixes the deadlock described in the linked issue by posting callbacks to the `io_service_` rather than running them inline. This avoids invoking them while holding locks (such as those taken by the task manager).
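To make the failure mode concrete, here is a minimal sketch of the pattern being fixed. It is not Ray's actual code: Ray's `io_service_` is an instrumented boost::asio event loop and the real locks belong to components like the task manager, but the shape of the deadlock, and of the fix, is the same.

```cpp
// Minimal sketch of the deadlock this commit fixes (simplified, assumed names;
// the toy store_mutex stands in for locks held by callers of Put()).
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <functional>
#include <mutex>

std::mutex store_mutex;
boost::asio::io_context io_service;

using Callback = std::function<void()>;

// BEFORE: the completion callback runs while store_mutex is held. If the
// callback re-enters the store (e.g. makes a Ray API call that takes the
// same lock), the thread deadlocks on itself.
void PutAndRunSynchronously(Callback on_completed) {
  std::lock_guard<std::mutex> guard(store_mutex);
  // ... insert the object into the in-memory store ...
  on_completed();  // May try to re-acquire store_mutex -> deadlock.
}

// AFTER: the callback is posted to the event loop, so it runs only after
// this function returns and the lock is released.
void PutAndPostCallback(Callback on_completed) {
  std::lock_guard<std::mutex> guard(store_mutex);
  // ... insert the object into the in-memory store ...
  boost::asio::post(io_service, std::move(on_completed));
}
```

In the posted version, a callback that re-enters the store simply acquires the lock after the original holder has released it.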
edoakes authored Aug 31, 2023
1 parent e307825 commit 9350b16
Showing 4 changed files with 38 additions and 20 deletions.
python/ray/_raylet.pyx (14 changes: 7 additions & 7 deletions)
```diff
@@ -4393,16 +4393,16 @@ cdef class CoreWorker:
             postincrement(it)
         return tasks_count

-    def set_get_async_callback(self, ObjectRef object_ref, callback):
+    def set_get_async_callback(self, ObjectRef object_ref, user_callback: Callable):
         # NOTE: we need to manually increment the Python reference count to avoid the
         # callback object being garbage collected before it's called by the core worker.
         # This means we *must* guarantee that the ref is manually decremented to avoid
         # a leak.
-        cpython.Py_INCREF(callback)
+        cpython.Py_INCREF(user_callback)
         CCoreWorkerProcess.GetCoreWorker().GetAsync(
             object_ref.native(),
             async_callback,
-            <void*>callback
+            <void*>user_callback
         )

     def push_error(self, JobID job_id, error_type, error_message,
```
```diff
@@ -4550,7 +4550,7 @@ cdef class CoreWorker:

 cdef void async_callback(shared_ptr[CRayObject] obj,
                          CObjectID object_ref,
-                         void *user_callback) with gil:
+                         void *user_callback_ptr) with gil:
     cdef:
         c_vector[shared_ptr[CRayObject]] objects_to_deserialize

@@ -4564,12 +4564,12 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
         result = ray._private.worker.global_worker.deserialize_objects(
             data_metadata_pairs, ids_to_deserialize)[0]

-        py_callback = <object>user_callback
-        py_callback(result)
+        user_callback = <object>user_callback_ptr
+        user_callback(result)
     finally:
         # NOTE: we manually increment the Python reference count of the callback when
         # registering it in the core worker, so we must decrement here to avoid a leak.
-        cpython.Py_DECREF(py_callback)
+        cpython.Py_DECREF(user_callback)


 def del_key_from_storage(host, port, password, use_ssl, key):
```
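The `Py_INCREF`/`Py_DECREF` pairing above is a refcounting contract across a `void*` boundary: the registration side takes a strong reference before the callback disappears into C++, and the invocation side must release it exactly once. Below is a rough sketch of the same contract in plain CPython C API terms; the real code is Cython, and `RegisterCallback`/`InvokeCallback` are hypothetical stand-ins for `set_get_async_callback`/`async_callback`.

```cpp
// Hedged illustration of the refcount contract, not Ray's code.
#include <Python.h>

// Registration side: take a strong reference before handing the callback to
// C++ as an opaque void*, so the GC cannot collect it in the meantime.
void RegisterCallback(PyObject *user_callback) {
  Py_INCREF(user_callback);  // Must be balanced by exactly one Py_DECREF.
  // ... pass static_cast<void *>(user_callback) through to GetAsync(...) ...
}

// Invocation side: reacquire the GIL ('with gil' in the Cython version),
// cast the void* back, call the callback, and always drop the reference,
// even if the call fails, to avoid leaking the callback object.
void InvokeCallback(void *user_callback_ptr, PyObject *result) {
  PyGILState_STATE gil = PyGILState_Ensure();
  PyObject *user_callback = static_cast<PyObject *>(user_callback_ptr);
  PyObject *ret = PyObject_CallFunctionObjArgs(user_callback, result, NULL);
  Py_XDECREF(ret);           // Discard the call's return value, if any.
  Py_DECREF(user_callback);  // The 'finally' branch in the .pyx code.
  PyGILState_Release(gil);
}
```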
python/ray/includes/libcoreworker.pxd (2 changes: 1 addition & 1 deletion)
```diff
@@ -269,7 +269,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:

         void GetAsync(const CObjectID &object_id,
                       ray_callback_function success_callback,
-                      void* python_future)
+                      void* python_user_callback)

         CRayStatus PushError(const CJobID &job_id, const c_string &type,
                              const c_string &error_message, double timestamp)
```
src/ray/core_worker/core_worker.cc (37 changes: 27 additions & 10 deletions)
```diff
@@ -3993,23 +3993,40 @@ void CoreWorker::YieldCurrentFiber(FiberEvent &event) {

 void CoreWorker::GetAsync(const ObjectID &object_id,
                           SetResultCallback success_callback,
-                          void *python_future) {
+                          void *python_user_callback) {
   auto fallback_callback = std::bind(&CoreWorker::PlasmaCallback,
                                      this,
                                      success_callback,
                                      std::placeholders::_1,
                                      std::placeholders::_2,
                                      std::placeholders::_3);

-  memory_store_->GetAsync(object_id,
-                          [python_future, success_callback, fallback_callback, object_id](
-                              std::shared_ptr<RayObject> ray_object) {
-                            if (ray_object->IsInPlasmaError()) {
-                              fallback_callback(ray_object, object_id, python_future);
-                            } else {
-                              success_callback(ray_object, object_id, python_future);
-                            }
-                          });
+  memory_store_->GetAsync(
+      object_id,
+      [this,
+       object_id,
+       python_user_callback,
+       success_callback = std::move(success_callback),
+       fallback_callback =
+           std::move(fallback_callback)](std::shared_ptr<RayObject> ray_object) {
+        // Post the callback to the io_service_ to avoid deadlocks.
+        // The user callback can make arbitrary Ray API calls and will be called
+        // immediately when the object is `Put` into the in-memory store. This can
+        // cause deadlocks if the caller of `Put` is holding a lock.
+        io_service_.post(
+            [object_id,
+             python_user_callback,
+             success_callback = std::move(success_callback),
+             fallback_callback = std::move(fallback_callback),
+             ray_object = std::move(ray_object)]() {
+              if (ray_object->IsInPlasmaError()) {
+                fallback_callback(ray_object, object_id, python_user_callback);
+              } else {
+                success_callback(ray_object, object_id, python_user_callback);
+              }
+            },
+            "CoreWorker.GetAsync.Callback");
+      });
 }

 void CoreWorker::PlasmaCallback(SetResultCallback success,
```
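Two details of the new `GetAsync` body are worth calling out. The string tag passed as the second argument to `io_service_.post` ("CoreWorker.GetAsync.Callback") appears to label the event for Ray's instrumented event loop; plain boost::asio's `post` takes no such argument. And the inner lambda move-captures everything it uses because the outer handler returns as soon as it posts: the posted task must own the callbacks and the `RayObject` `shared_ptr` itself, or they would be destroyed before it runs. A standalone sketch of that lifetime pattern, using plain boost::asio and made-up stand-in types:

```cpp
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <functional>
#include <iostream>
#include <memory>

int main() {
  boost::asio::io_context io_service;

  auto ray_object = std::make_shared<int>(42);  // Stand-in for RayObject.
  std::function<void(int)> success_callback = [](int value) {
    std::cout << "result: " << value << "\n";
  };

  // The enclosing scope ends long before the posted task runs; moving the
  // shared_ptr and the callback into the closure keeps both alive until then.
  boost::asio::post(io_service,
                    [ray_object = std::move(ray_object),
                     success_callback = std::move(success_callback)]() {
                      success_callback(*ray_object);
                    });

  io_service.run();  // Runs the deferred callback, outside any caller's lock.
  return 0;
}
```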
src/ray/core_worker/core_worker.h (5 changes: 3 additions & 2 deletions)
```diff
@@ -1240,11 +1240,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   ///
   /// \param[in] object_id The id to call get on.
   /// \param[in] success_callback The callback to use the result object.
-  /// \param[in] python_future the void* object to be passed to SetResultCallback
+  /// \param[in] python_user_callback The user-provided Python callback object that
+  /// will be called inside of `success_callback`.
   /// \return void
   void GetAsync(const ObjectID &object_id,
                 SetResultCallback success_callback,
-                void *python_future);
+                void *python_user_callback);

   // Get serialized job configuration.
   rpc::JobConfig GetJobConfig() const;
```
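For reference, the signature flow implied by the documentation and call sites above: `SetResultCallback` receives the resolved object, its id, and the opaque pointer untouched. The sketch below infers the type shapes from this diff alone; the stubs are assumptions, not Ray's headers.

```cpp
#include <functional>
#include <memory>

// Stub types standing in for Ray's real ones.
struct RayObject {};
struct ObjectID {};

// Inferred shape: the core worker never dereferences python_user_callback;
// it only threads it through to whichever callback eventually fires, where
// the Cython layer casts it back to a PyObject*.
using SetResultCallback =
    std::function<void(std::shared_ptr<RayObject>, ObjectID, void *)>;
```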
