Skip to content

Commit

Permalink
[Object Spilling] Fix a bug where object url is empty. (#18193)
Browse files Browse the repository at this point in the history
* Fix a bug

* Addressed code review.

* Fix a test
  • Loading branch information
rkooo567 authored Aug 31, 2021
1 parent 2c0dcec commit d240d26
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 9 deletions.
20 changes: 11 additions & 9 deletions src/ray/object_manager/pull_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,16 +460,18 @@ void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) {
return;
}

// If we can restore directly from this raylet, then try to do so.
std::string spilled_url = get_locally_spilled_object_url_(object_id);
bool can_restore_directly =
!spilled_url.empty() || // If the object is spilled locally
(!request.spilled_url.empty() &&
request.spilled_node_id
.IsNil()); // Or if the object is spilled on external storages.
if (can_restore_directly) {
// check if we can restore the object directly in the current raylet.
// first check local spilled objects
std::string direct_restore_url = get_locally_spilled_object_url_(object_id);
if (direct_restore_url.empty()) {
if (!request.spilled_url.empty() && request.spilled_node_id.IsNil()) {
direct_restore_url = request.spilled_url;
}
}
if (!direct_restore_url.empty()) {
// Select an url from the object directory update
UpdateRetryTimer(request, object_id);
restore_spilled_object_(object_id, request.spilled_url,
restore_spilled_object_(object_id, direct_restore_url,
[object_id](const ray::Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Object restore for " << object_id
Expand Down
105 changes: 105 additions & 0 deletions src/ray/object_manager/test/pull_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,111 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectLocal) {
AssertNoLeaks();
}

TEST_P(PullManagerTest, TestRestoreSpilledObjectOnLocalStorage) {
  /// Test the scenario where the object is spilled to local storage, such as a
  /// filesystem. The restore must use the locally known spilled URL even while the
  /// URL reported through the location update is still empty.
  // The test parameter selects which bundle priority the pull is issued with.
  const auto prio =
      GetParam() ? BundlePriority::GET_REQUEST : BundlePriority::TASK_ARGS;
  auto refs = CreateObjectRefs(1);
  auto obj1 = ObjectRefsToIds(refs)[0];
  AssertNumActiveRequestsEquals(0);
  std::vector<rpc::ObjectReference> objects_to_locate;
  auto req_id = pull_manager_.Pull(refs, prio, &objects_to_locate);
  ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs));

  std::unordered_set<NodeID> client_ids;
  pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0);

  // client_ids is empty and no spilled URL is known, so there is nowhere to pull or
  // restore from.
  ASSERT_EQ(num_send_pull_request_calls_, 0);
  ASSERT_EQ(num_restore_spilled_object_calls_, 0);

  fake_time_ += 10.;
  // The object is spilled locally, but the location update still carries an empty
  // spilled URL. The object should be restored anyway.
  ObjectSpilled(obj1, "remote_url/foo/bar");
  pull_manager_.OnLocationChange(obj1, client_ids, "", self_node_id_, 0);

  // A local restore is requested; no pull request is sent.
  ASSERT_EQ(num_send_pull_request_calls_, 0);
  ASSERT_EQ(num_restore_spilled_object_calls_, 1);

  // The restore can be retried after a delay. This time the location update also
  // carries the spilled URL.
  fake_time_ += 10.;
  pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_,
                                 0);
  ASSERT_EQ(num_send_pull_request_calls_, 0);
  ASSERT_EQ(num_restore_spilled_object_calls_, 2);

  // Nothing has been aborted until the pull request is cancelled.
  ASSERT_TRUE(num_abort_calls_.empty());
  ASSERT_TRUE(pull_manager_.PullRequestActiveOrWaitingForMetadata(req_id));
  auto objects_to_cancel = pull_manager_.CancelPull(req_id);
  ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs));
  ASSERT_EQ(num_abort_calls_[obj1], 1);

  AssertNoLeaks();
}

TEST_P(PullManagerTest, TestRestoreSpilledObjectOnExternalStorage) {
  /// Test the scenario where the object is spilled to external storage, such as S3.
  /// A restore is expected only once the location update reports a spilled URL
  /// together with a Nil spilled node id.
  // The test parameter selects which bundle priority the pull is issued with.
  const auto prio =
      GetParam() ? BundlePriority::GET_REQUEST : BundlePriority::TASK_ARGS;
  auto refs = CreateObjectRefs(1);
  auto obj1 = ObjectRefsToIds(refs)[0];
  AssertNumActiveRequestsEquals(0);
  std::vector<rpc::ObjectReference> objects_to_locate;
  auto req_id = pull_manager_.Pull(refs, prio, &objects_to_locate);
  ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs));

  std::unordered_set<NodeID> client_ids;
  pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0);

  // client_ids is empty and no spilled URL is known, so there is nowhere to pull or
  // restore from.
  ASSERT_EQ(num_send_pull_request_calls_, 0);
  ASSERT_EQ(num_restore_spilled_object_calls_, 0);

  fake_time_ += 10.;
  // For an object spilled to external storage, the locally recorded spilled URL is
  // empty.
  ObjectSpilled(obj1, "");
  // For external storage the spilled node id is expected to be Nil(). This update
  // carries a non-Nil node id instead, so it must not trigger a restoration.
  pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_,
                                 0);

  // Neither a pull nor a restore has been requested yet.
  ASSERT_EQ(num_send_pull_request_calls_, 0);
  ASSERT_EQ(num_restore_spilled_object_calls_, 0);

  // Now the update carries the Nil node id along with the spilled URL.
  pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", NodeID::Nil(),
                                 0);

  // A restore is requested; still no pull request is sent.
  ASSERT_EQ(num_send_pull_request_calls_, 0);
  ASSERT_EQ(num_restore_spilled_object_calls_, 1);

  // The restore can be retried after a delay.
  fake_time_ += 10.;
  pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", NodeID::Nil(),
                                 0);
  ASSERT_EQ(num_send_pull_request_calls_, 0);
  ASSERT_EQ(num_restore_spilled_object_calls_, 2);

  // Nothing has been aborted until the pull request is cancelled.
  ASSERT_TRUE(num_abort_calls_.empty());
  ASSERT_TRUE(pull_manager_.PullRequestActiveOrWaitingForMetadata(req_id));
  auto objects_to_cancel = pull_manager_.CancelPull(req_id);
  ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs));
  ASSERT_EQ(num_abort_calls_[obj1], 1);

  AssertNoLeaks();
}

TEST_P(PullManagerTest, TestLoadBalancingRestorationRequest) {
/* Make sure when the object copy is in other raylet, we pull object from there instead
* of requesting the owner node to restore the object. */
Expand Down

0 comments on commit d240d26

Please sign in to comment.