From d73b014d57b89b2b124feffbdaf3dadd46c8e935 Mon Sep 17 00:00:00 2001
From: Stephanie Wang
Date: Tue, 16 Nov 2021 10:07:03 -0800
Subject: [PATCH] Pin a return object if it already exists when the task
 finishes

---
 python/ray/_raylet.pyx                        |  32 ++--
 python/ray/includes/libcoreworker.pxd         |   4 +
 python/ray/tests/test_reconstruction.py       | 155 ++++++++++++++++++
 src/ray/core_worker/core_worker.cc            |  52 +++++-
 src/ray/core_worker/core_worker.h             |   3 +
 .../transport/direct_actor_transport.cc       |   6 +
 .../transport/direct_task_transport.cc        |   1 -
 7 files changed, 231 insertions(+), 22 deletions(-)

diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index 31e362649b5c..c38a38696f4e 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -1768,7 +1768,6 @@ cdef class CoreWorker:
             size_t data_size
             shared_ptr[CBuffer] metadata
             c_vector[CObjectID] contained_id
-            c_vector[CObjectID] return_ids_vector
             int64_t task_output_inlined_bytes

         if return_ids.size() == 0:
@@ -1801,24 +1800,27 @@ cdef class CoreWorker:
                         return_id, data_size, metadata, contained_id,
                         task_output_inlined_bytes, &returns[0][i]))

-            if returns[0][i].get() != NULL:
+            if self.is_local_mode:
+                check_status(
+                    CCoreWorkerProcess.GetCoreWorker().Put(
+                        CRayObject(returns[0][i].get().GetData(),
+                                   returns[0][i].get().GetMetadata(),
+                                   c_vector[CObjectReference]()),
+                        c_vector[CObjectID](), return_ids[i]))
+
+            elif returns[0][i].get() != NULL:
                 if returns[0][i].get().HasData():
                     (serialized_object).write_to(
                         Buffer.make(returns[0][i].get().GetData()))
-                if self.is_local_mode:
-                    return_ids_vector.push_back(return_ids[i])
+                with nogil:
                     check_status(
-                        CCoreWorkerProcess.GetCoreWorker().Put(
-                            CRayObject(returns[0][i].get().GetData(),
-                                       returns[0][i].get().GetMetadata(),
-                                       c_vector[CObjectReference]()),
-                            c_vector[CObjectID](), return_ids[i]))
-                    return_ids_vector.clear()
-
-                with nogil:
-                    check_status(
-                        CCoreWorkerProcess.GetCoreWorker().SealReturnObject(
-                            return_id, returns[0][i]))
+                        CCoreWorkerProcess.GetCoreWorker().SealReturnObject(
+                            return_id, returns[0][i]))
+            else:
+                with nogil:
+                    check_status(
+                        CCoreWorkerProcess.GetCoreWorker()
+                        .PinExistingReturnObject(return_id, &returns[0][i]))

     def create_or_get_event_loop(self):
         if self.async_event_loop is None:
diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd
index d52c1b942c91..1bc3260eef09 100644
--- a/python/ray/includes/libcoreworker.pxd
+++ b/python/ray/includes/libcoreworker.pxd
@@ -145,6 +145,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
             const CObjectID& return_id,
            shared_ptr[CRayObject] return_object
         )
+        CRayStatus PinExistingReturnObject(
+            const CObjectID& return_id,
+            shared_ptr[CRayObject] *return_object
+        )

         CJobID GetCurrentJobId()
         CTaskID GetCurrentTaskId()
diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py
index a26d2b288982..c9c7a2d3c99f 100644
--- a/python/ray/tests/test_reconstruction.py
+++ b/python/ray/tests/test_reconstruction.py
@@ -1,9 +1,11 @@
 import os
 import signal
 import sys
+import time

 import numpy as np
 import pytest
+import psutil

 import ray
 from ray._private.test_utils import (
@@ -645,6 +647,159 @@ def dependent_task(x):
         i += 1


+@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
+@pytest.mark.parametrize("reconstruction_enabled", [False, True])
+def test_nondeterministic_output(ray_start_cluster, reconstruction_enabled):
+    config = {
+        "num_heartbeats_timeout": 10,
+        "raylet_heartbeat_period_milliseconds": 100,
+        "max_direct_call_object_size": 100,
+        "task_retry_delay_ms": 100,
+        "object_timeout_milliseconds": 200,
+    }
+    cluster = ray_start_cluster
+    # Head node with no resources.
+    cluster.add_node(
+        num_cpus=0, _system_config=config, enable_object_reconstruction=True)
+    ray.init(address=cluster.address)
+    # Node to place the initial object.
+    node_to_kill = cluster.add_node(
+        num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
+    cluster.add_node(num_cpus=1, object_store_memory=10**8)
+    cluster.wait_for_nodes()
+
+    @ray.remote
+    def nondeterministic_object():
+        if np.random.rand() < 0.5:
+            return np.zeros(10**5, dtype=np.uint8)
+        else:
+            return 0
+
+    @ray.remote
+    def dependent_task(x):
+        return
+
+    for _ in range(10):
+        obj = nondeterministic_object.options(resources={"node1": 1}).remote()
+        for _ in range(3):
+            ray.get(dependent_task.remote(obj))
+        x = dependent_task.remote(obj)
+        cluster.remove_node(node_to_kill, allow_graceful=False)
+        node_to_kill = cluster.add_node(
+            num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
+        ray.get(x)
+
+
+def test_lineage_evicted(ray_start_cluster):
+    config = {
+        "num_heartbeats_timeout": 10,
+        "raylet_heartbeat_period_milliseconds": 100,
+        "object_timeout_milliseconds": 200,
+        "max_lineage_bytes": 10_000,
+    }
+
+    cluster = ray_start_cluster
+    # Head node with no resources.
+    cluster.add_node(
+        num_cpus=0,
+        _system_config=config,
+        object_store_memory=10**8,
+        enable_object_reconstruction=True)
+    ray.init(address=cluster.address)
+    node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
+    cluster.wait_for_nodes()
+
+    @ray.remote
+    def large_object():
+        return np.zeros(10**7, dtype=np.uint8)
+
+    @ray.remote
+    def chain(x):
+        return x
+
+    @ray.remote
+    def dependent_task(x):
+        return x
+
+    obj = large_object.remote()
+    for _ in range(5):
+        obj = chain.remote(obj)
+    ray.get(dependent_task.remote(obj))
+
+    cluster.remove_node(node_to_kill, allow_graceful=False)
+    node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
+    ray.get(dependent_task.remote(obj))
+
+    # Lineage now exceeds the eviction factor.
+    for _ in range(100):
+        obj = chain.remote(obj)
+    ray.get(dependent_task.remote(obj))
+
+    cluster.remove_node(node_to_kill, allow_graceful=False)
+    cluster.add_node(num_cpus=1, object_store_memory=10**8)
+    try:
+        ray.get(dependent_task.remote(obj))
+        assert False
+    except ray.exceptions.RayTaskError as e:
+        assert "ObjectReconstructionFailedLineageEvictedError" in str(e)
+
+
+def test_worker_crashes(ray_start_cluster):
+    config = {
+        "num_heartbeats_timeout": 10,
+        "raylet_heartbeat_period_milliseconds": 100,
+        "object_timeout_milliseconds": 200,
+    }
+    cluster = ray_start_cluster
+    # Head node with no resources.
+    cluster.add_node(num_cpus=0, _system_config=config)
+    ray.init(address=cluster.address)
+    # Node to place the initial object.
+    cluster.add_node(num_cpus=1, object_store_memory=10**8)
+    cluster.wait_for_nodes()
+
+    @ray.remote(num_cpus=0)
+    def kill_workers():
+        pids = []
+        for proc in psutil.process_iter():
+            if "ray::" in proc.name() and proc.pid != os.getpid():
+                pids.append(proc.pid)
+        time.sleep(1)
+        num_killed = 0
+        for pid in pids:
+            print("Killing", pid)
+            try:
+                os.kill(pid, signal.SIGKILL)
+                num_killed += 1
+            except Exception:
+                pass
+        return num_killed
+
+    @ray.remote
+    def large_object():
+        return np.zeros(10**7, dtype=np.uint8)
+
+    @ray.remote
+    def dependent_task(x):
+        return
+
+    def check_workers_killed():
+        killed = kill_workers.remote()
+        for i in range(100):
+            obj = large_object.remote()
+            # Check that we don't hang (can happen if the worker raylet
+            # crashes).
+            try:
+                ray.get(dependent_task.remote(obj))
+            except Exception:
+                pass
+            print(i)
+
+        return ray.get(killed) > 0
+
+    wait_for_condition(check_workers_killed)
+
+
 if __name__ == "__main__":
     import pytest
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 4250dcc6dae5..8a6155e90d93 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -2323,13 +2323,10 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
 Status CoreWorker::SealReturnObject(const ObjectID &return_id,
                                     std::shared_ptr<RayObject> return_object) {
   Status status = Status::OK();
-  if (!return_object) {
-    return status;
-  }
+  RAY_CHECK(return_object);
+  RAY_CHECK(!options_.is_local_mode);
   std::unique_ptr<rpc::Address> caller_address =
-      options_.is_local_mode ? nullptr
-                             : std::make_unique<rpc::Address>(
-                                   worker_context_.GetCurrentTask()->CallerAddress());
+      std::make_unique<rpc::Address>(worker_context_.GetCurrentTask()->CallerAddress());
   if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) {
     status = SealExisting(return_id, /*pin_object=*/true, std::move(caller_address));
     if (!status.ok()) {
@@ -2340,6 +2337,49 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id,
   return status;
 }

+Status CoreWorker::PinExistingReturnObject(const ObjectID &return_id,
+                                           std::shared_ptr<RayObject> *return_object) {
+  // TODO(swang): It would be better to evict the existing copy instead of
+  // reusing it, to make sure it's consistent with the task re-execution.
+  absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> result_map;
+  bool got_exception;
+  rpc::Address owner_address(worker_context_.GetCurrentTask()->CallerAddress());
+
+  // Temporarily set the return object's owner's address. This is needed to retrieve the
+  // value from plasma.
+  reference_counter_->AddLocalReference(return_id, "");
+  reference_counter_->AddBorrowedObject(return_id, ObjectID::Nil(), owner_address);
+
+  auto status = plasma_store_provider_->Get({return_id}, 0, worker_context_, &result_map,
+                                            &got_exception);
+  // Remove the temporary ref.
+  RemoveLocalReference(return_id);
+
+  if (result_map.count(return_id)) {
+    *return_object = std::move(result_map[return_id]);
+    RAY_LOG(DEBUG) << "Pinning existing return object " << return_id
+                   << " owned by worker "
+                   << WorkerID::FromBinary(owner_address.worker_id());
+    local_raylet_client_->PinObjectIDs(
+        owner_address, {return_id},
+        [return_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
+          if (!status.ok()) {
+            RAY_LOG(INFO) << "Failed to pin existing copy of the task return object "
+                          << return_id
+                          << ". This object may get evicted while there are still "
+                             "references to it.";
+          }
+        });
+  } else {
+    // Failed to get the existing copy of the return object. It must have been
+    // evicted before we could pin it.
+    // TODO(swang): We should allow the owner to retry this task instead of
+    // immediately returning an error to the application.
+  }
+
+  return status;
+}
+
 std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
     const TaskSpecification &task_spec, const ActorID &actor_id) {
   auto resource_ids = std::make_shared<ResourceMappingType>();
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 755e18aeb16b..18585aaefd33 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -862,6 +862,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   Status SealReturnObject(const ObjectID &return_id,
                           std::shared_ptr<RayObject> return_object);

+  Status PinExistingReturnObject(const ObjectID &return_id,
+                                 std::shared_ptr<RayObject> *return_object);
+
   /// Get a handle to an actor.
   ///
   /// NOTE: This function should be called ONLY WHEN we know actor handle exists.
diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc
index d50936286a5e..c3e78a30d8d7 100644
--- a/src/ray/core_worker/transport/direct_actor_transport.cc
+++ b/src/ray/core_worker/transport/direct_actor_transport.cc
@@ -506,6 +506,12 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
         return_object->set_object_id(id.Binary());

         // The object is nullptr if it already existed in the object store.
+        if (!return_objects[i]) {
+          RAY_LOG(INFO) << "Failed to create task return object " << id
+                        << " in the object store, returning an error to the application.";
+          return_objects[i] = std::make_shared<RayObject>(rpc::ErrorType::OBJECT_LOST);
+        }
+
         const auto &result = return_objects[i];
         return_object->set_size(result->GetSize());
         if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) {
diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc
index 6f3def98883d..69bf7236578b 100644
--- a/src/ray/core_worker/transport/direct_task_transport.cc
+++ b/src/ray/core_worker/transport/direct_task_transport.cc
@@ -648,7 +648,6 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
           // need to do anything here.
           return;
         } else if (!status.ok() || !is_actor_creation) {
-          RAY_LOG(DEBUG) << "Task failed with error: " << status;
           // Successful actor creation leases the worker indefinitely from the raylet.
           OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources);
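
For context, a condensed sketch of the scenario that the new test_nondeterministic_output
test above exercises. It assumes a multi-node Ray cluster with object reconstruction
enabled; the address, task names, and array sizes are illustrative only and are not part
of this patch.

    import numpy as np
    import ray

    # Assumes an existing multi-node cluster with object reconstruction
    # enabled; "auto" is a placeholder address.
    ray.init(address="auto")

    @ray.remote
    def nondeterministic_object():
        # A re-executed task may return a different object than the original:
        # sometimes a plasma-backed array, sometimes a small inlined value.
        if np.random.rand() < 0.5:
            return np.zeros(10**5, dtype=np.uint8)
        return 0

    @ray.remote
    def dependent_task(x):
        return

    obj = nondeterministic_object.remote()
    x = dependent_task.remote(obj)
    # If the node holding obj dies before dependent_task runs, the owner
    # re-executes nondeterministic_object(). The re-run can find that a copy
    # of its return object already exists in the object store; with this
    # patch the executing worker pins that existing copy via
    # PinExistingReturnObject instead of failing to create the return object.
    ray.get(x)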