Pin a return object if it already exists when the task finishes
stephanie-wang committed Nov 16, 2021
1 parent 00cfdef commit d73b014
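
This change handles the case where a task finishes and finds that one of its return objects already exists in the local plasma store, for example because object reconstruction re-executed the task on a node that still holds a copy of its output. Previously such an object was not sealed or pinned, so the existing copy could be evicted while references to it remained. With this change the worker pins the existing copy through the new CoreWorker::PinExistingReturnObject call; if no copy can be retrieved because it was already evicted, the task now returns an OBJECT_LOST error to the application. Below is a minimal sketch of the application-level scenario, adapted from the test_nondeterministic_output test added in this commit. The real test runs on a multi-node ray_start_cluster with enable_object_reconstruction=True, lowers max_direct_call_object_size to 100 bytes so the array return goes to plasma while the int is inlined, and kills the node holding the object; that setup is omitted here.

import numpy as np

import ray

# A sketch only: in the real test, ray_start_cluster provides the cluster and
# the node that stores the object is killed to force re-execution.
ray.init()


@ray.remote
def nondeterministic_object():
    # Either a plasma-backed array or an inlined small value, so a re-executed
    # attempt may find a copy of its return object already in the object store.
    if np.random.rand() < 0.5:
        return np.zeros(10**5, dtype=np.uint8)
    return 0


@ray.remote
def dependent_task(x):
    return


obj = nondeterministic_object.remote()
# If the task is re-executed and its return object already exists locally,
# the worker now pins that existing copy instead of failing to create it.
ray.get(dependent_task.remote(obj))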
Showing 7 changed files with 231 additions and 22 deletions.
32 changes: 17 additions & 15 deletions python/ray/_raylet.pyx
@@ -1768,7 +1768,6 @@ cdef class CoreWorker:
size_t data_size
shared_ptr[CBuffer] metadata
c_vector[CObjectID] contained_id
c_vector[CObjectID] return_ids_vector
int64_t task_output_inlined_bytes

if return_ids.size() == 0:
@@ -1801,24 +1800,27 @@ cdef class CoreWorker:
return_id, data_size, metadata, contained_id,
task_output_inlined_bytes, &returns[0][i]))

if returns[0][i].get() != NULL:
if self.is_local_mode:
check_status(
CCoreWorkerProcess.GetCoreWorker().Put(
CRayObject(returns[0][i].get().GetData(),
returns[0][i].get().GetMetadata(),
c_vector[CObjectReference]()),
c_vector[CObjectID](), return_ids[i]))

elif returns[0][i].get() != NULL:
if returns[0][i].get().HasData():
(<SerializedObject>serialized_object).write_to(
Buffer.make(returns[0][i].get().GetData()))
if self.is_local_mode:
return_ids_vector.push_back(return_ids[i])
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().Put(
CRayObject(returns[0][i].get().GetData(),
returns[0][i].get().GetMetadata(),
c_vector[CObjectReference]()),
c_vector[CObjectID](), return_ids[i]))
return_ids_vector.clear()

with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().SealReturnObject(
return_id, returns[0][i]))
CCoreWorkerProcess.GetCoreWorker().SealReturnObject(
return_id, returns[0][i]))
else:
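# The return object was not allocated because a copy already exists in the
# object store (e.g., from a previous execution of this task); pin that
# existing copy instead of creating a new one.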
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker()
.PinExistingReturnObject(return_id, &returns[0][i]))

def create_or_get_event_loop(self):
if self.async_event_loop is None:
4 changes: 4 additions & 0 deletions python/ray/includes/libcoreworker.pxd
@@ -145,6 +145,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CObjectID& return_id,
shared_ptr[CRayObject] return_object
)
CRayStatus PinExistingReturnObject(
const CObjectID& return_id,
shared_ptr[CRayObject] *return_object
)

CJobID GetCurrentJobId()
CTaskID GetCurrentTaskId()
155 changes: 155 additions & 0 deletions python/ray/tests/test_reconstruction.py
@@ -1,9 +1,11 @@
import os
import signal
import sys
import time

import numpy as np
import pytest
import psutil

import ray
from ray._private.test_utils import (
@@ -645,6 +647,159 @@ def dependent_task(x):
i += 1


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_nondeterministic_output(ray_start_cluster, reconstruction_enabled):
    config = {
        "num_heartbeats_timeout": 10,
        "raylet_heartbeat_period_milliseconds": 100,
        "max_direct_call_object_size": 100,
        "task_retry_delay_ms": 100,
        "object_timeout_milliseconds": 200,
    }
    cluster = ray_start_cluster
    # Head node with no resources.
    cluster.add_node(
        num_cpus=0, _system_config=config, enable_object_reconstruction=True)
    ray.init(address=cluster.address)
    # Node to place the initial object.
    node_to_kill = cluster.add_node(
        num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
    cluster.add_node(num_cpus=1, object_store_memory=10**8)
    cluster.wait_for_nodes()

    @ray.remote
    def nondeterministic_object():
        if np.random.rand() < 0.5:
            return np.zeros(10**5, dtype=np.uint8)
        else:
            return 0

    @ray.remote
    def dependent_task(x):
        return

    for _ in range(10):
        obj = nondeterministic_object.options(resources={"node1": 1}).remote()
        for _ in range(3):
            ray.get(dependent_task.remote(obj))
        x = dependent_task.remote(obj)
        cluster.remove_node(node_to_kill, allow_graceful=False)
        node_to_kill = cluster.add_node(
            num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
        ray.get(x)


def test_lineage_evicted(ray_start_cluster):
    config = {
        "num_heartbeats_timeout": 10,
        "raylet_heartbeat_period_milliseconds": 100,
        "object_timeout_milliseconds": 200,
        "max_lineage_bytes": 10_000,
    }

    cluster = ray_start_cluster
    # Head node with no resources.
    cluster.add_node(
        num_cpus=0,
        _system_config=config,
        object_store_memory=10**8,
        enable_object_reconstruction=True)
    ray.init(address=cluster.address)
    node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
    cluster.wait_for_nodes()

    @ray.remote
    def large_object():
        return np.zeros(10**7, dtype=np.uint8)

    @ray.remote
    def chain(x):
        return x

    @ray.remote
    def dependent_task(x):
        return x

    obj = large_object.remote()
    for _ in range(5):
        obj = chain.remote(obj)
    ray.get(dependent_task.remote(obj))

    cluster.remove_node(node_to_kill, allow_graceful=False)
    node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
    ray.get(dependent_task.remote(obj))

    # Lineage now exceeds the eviction factor.
    for _ in range(100):
        obj = chain.remote(obj)
    ray.get(dependent_task.remote(obj))

    cluster.remove_node(node_to_kill, allow_graceful=False)
    cluster.add_node(num_cpus=1, object_store_memory=10**8)
    try:
        ray.get(dependent_task.remote(obj))
        assert False
    except ray.exceptions.RayTaskError as e:
        assert "ObjectReconstructionFailedLineageEvictedError" in str(e)


def test_worker_crashes(ray_start_cluster):
    config = {
        "num_heartbeats_timeout": 10,
        "raylet_heartbeat_period_milliseconds": 100,
        "object_timeout_milliseconds": 200,
    }
    cluster = ray_start_cluster
    # Head node with no resources.
    cluster.add_node(num_cpus=0, _system_config=config)
    ray.init(address=cluster.address)
    # Node to place the initial object.
    cluster.add_node(num_cpus=1, object_store_memory=10**8)
    cluster.wait_for_nodes()

    @ray.remote(num_cpus=0)
    def kill_workers():
        pids = []
        for proc in psutil.process_iter():
            if "ray::" in proc.name() and proc.pid != os.getpid():
                pids.append(proc.pid)
        time.sleep(1)
        num_killed = 0
        for pid in pids:
            print("Killing", pid)
            try:
                os.kill(pid, SIGKILL)
                num_killed += 1
            except Exception:
                pass
        return num_killed

    @ray.remote
    def large_object():
        return np.zeros(10**7, dtype=np.uint8)

    @ray.remote
    def dependent_task(x):
        return

    def check_workers_killed():
        killed = kill_workers.remote()
        for i in range(100):
            obj = large_object.remote()
            # Check that we don't hang (can happen if the worker's raylet
            # crashes).
            try:
                ray.get(dependent_task.remote(obj))
            except Exception:
                pass
            print(i)

        return ray.get(killed) > 0

    wait_for_condition(check_workers_killed)


if __name__ == "__main__":
    import pytest
    sys.exit(pytest.main(["-v", __file__]))
52 changes: 46 additions & 6 deletions src/ray/core_worker/core_worker.cc
@@ -2323,13 +2323,10 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
Status CoreWorker::SealReturnObject(const ObjectID &return_id,
std::shared_ptr<RayObject> return_object) {
Status status = Status::OK();
if (!return_object) {
return status;
}
RAY_CHECK(return_object);
RAY_CHECK(!options_.is_local_mode);
std::unique_ptr<rpc::Address> caller_address =
options_.is_local_mode ? nullptr
: std::make_unique<rpc::Address>(
worker_context_.GetCurrentTask()->CallerAddress());
std::make_unique<rpc::Address>(worker_context_.GetCurrentTask()->CallerAddress());
if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) {
status = SealExisting(return_id, /*pin_object=*/true, std::move(caller_address));
if (!status.ok()) {
@@ -2340,6 +2337,49 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id,
return status;
}

Status CoreWorker::PinExistingReturnObject(const ObjectID &return_id,
                                           std::shared_ptr<RayObject> *return_object) {
  // TODO(swang): It would be better to evict the existing copy instead of
  // reusing it, to make sure it's consistent with the task re-execution.
  absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> result_map;
  bool got_exception;
  rpc::Address owner_address(worker_context_.GetCurrentTask()->CallerAddress());

  // Temporarily set the return object's owner's address. This is needed to retrieve the
  // value from plasma.
  reference_counter_->AddLocalReference(return_id, "<temporary (pin return object)>");
  reference_counter_->AddBorrowedObject(return_id, ObjectID::Nil(), owner_address);

  auto status = plasma_store_provider_->Get({return_id}, 0, worker_context_, &result_map,
                                            &got_exception);
  // Remove the temporary ref.
  RemoveLocalReference(return_id);

  if (result_map.count(return_id)) {
    *return_object = std::move(result_map[return_id]);
    RAY_LOG(DEBUG) << "Pinning existing return object " << return_id
                   << " owned by worker "
                   << WorkerID::FromBinary(owner_address.worker_id());
    local_raylet_client_->PinObjectIDs(
        owner_address, {return_id},
        [return_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
          if (!status.ok()) {
            RAY_LOG(INFO) << "Failed to pin existing copy of the task return object "
                          << return_id
                          << ". This object may get evicted while there are still "
                             "references to it.";
          }
        });
  } else {
    // Failed to get the existing copy of the return object. It must have been
    // evicted before we could pin it.
    // TODO(swang): We should allow the owner to retry this task instead of
    // immediately returning an error to the application.
  }

  return status;
}

std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
const TaskSpecification &task_spec, const ActorID &actor_id) {
auto resource_ids = std::make_shared<ResourceMappingType>();
3 changes: 3 additions & 0 deletions src/ray/core_worker/core_worker.h
@@ -862,6 +862,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
Status SealReturnObject(const ObjectID &return_id,
std::shared_ptr<RayObject> return_object);

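/// Pin the existing copy of a task return object that was already stored in
/// plasma, e.g., by a previous execution of the same task. On success,
/// return_object is set to the existing copy and the local raylet is asked
/// to pin it; if no copy can be retrieved (it may already have been evicted),
/// return_object is left unset and the status of the plasma Get is returned.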
Status PinExistingReturnObject(const ObjectID &return_id,
std::shared_ptr<RayObject> *return_object);

/// Get a handle to an actor.
///
/// NOTE: This function should be called ONLY WHEN we know actor handle exists.
6 changes: 6 additions & 0 deletions src/ray/core_worker/transport/direct_actor_transport.cc
@@ -506,6 +506,12 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
return_object->set_object_id(id.Binary());

// The object is nullptr if it already existed in the object store.
if (!return_objects[i]) {
RAY_LOG(INFO) << "Failed to create task return object " << id
<< " in the object store, returning an error to the application.";
return_objects[i] = std::make_shared<RayObject>(rpc::ErrorType::OBJECT_LOST);
}

const auto &result = return_objects[i];
return_object->set_size(result->GetSize());
if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) {
1 change: 0 additions & 1 deletion src/ray/core_worker/transport/direct_task_transport.cc
@@ -648,7 +648,6 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
// need to do anything here.
return;
} else if (!status.ok() || !is_actor_creation) {
RAY_LOG(DEBUG) << "Task failed with error: " << status;
// Successful actor creation leases the worker indefinitely from the raylet.
OnWorkerIdle(addr, scheduling_key,
/*error=*/!status.ok(), assigned_resources);
