Skip to content

Commit

Permalink
[xray] Object manager retries Pull requests (ray-project#2630)
Browse files Browse the repository at this point in the history
* Move all ObjectManager members to bottom of class def

* Better Pull requests
- suppress duplicate Pulls
- retry the Pull at the next client after a timeout
- cancel a Pull if the object no longer appears on any clients

* increase object manager Pull timeout

* Make the component failure test harder.

* note

* Notify SubscribeObjectLocations caller of empty list

* Address melih's comments

* Fix wait...

* Make component failure test easier for legacy ray

* lint
  • Loading branch information
stephanie-wang authored and pcmoritz committed Aug 14, 2018
1 parent baba624 commit 806fdf2
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 189 deletions.
5 changes: 1 addition & 4 deletions src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,7 @@ class RayConfig {
max_tasks_to_spillback_(10),
actor_creation_num_spillbacks_warning_(100),
node_manager_forward_task_retry_timeout_milliseconds_(1000),
// TODO: Setting this to large values results in latency, which needs to
// be addressed. This timeout is often on the critical path for object
// transfers.
object_manager_pull_timeout_ms_(20),
object_manager_pull_timeout_ms_(100),
object_manager_push_timeout_ms_(10000),
object_manager_default_chunk_size_(1000000),
num_workers_per_process_(1) {}
Expand Down
29 changes: 15 additions & 14 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ void ObjectDirectory::RegisterBackend() {
std::vector<ClientID> client_id_vec =
UpdateObjectLocations(object_id_listener_pair->second.current_object_locations,
location_history, gcs_client_->client_table());
if (!client_id_vec.empty()) {
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
auto callbacks = object_id_listener_pair->second.callbacks;
// Call all callbacks associated with the object id locations we have received.
for (const auto &callback_pair : callbacks) {
callback_pair.second(client_id_vec, object_id);
}
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
auto callbacks = object_id_listener_pair->second.callbacks;
// Call all callbacks associated with the object id locations we have
// received. This notifies the client even if the list of locations is
// empty, since this may indicate that the objects have been evicted from
// all nodes.
for (const auto &callback_pair : callbacks) {
callback_pair.second(client_id_vec, object_id);
}
};
RAY_CHECK_OK(gcs_client_->object_table().Subscribe(
Expand Down Expand Up @@ -131,12 +132,12 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
return ray::Status::OK();
}
listener_state.callbacks.emplace(callback_id, callback);
// Immediately notify of found object locations.
if (!listener_state.current_object_locations.empty()) {
std::vector<ClientID> client_id_vec(listener_state.current_object_locations.begin(),
listener_state.current_object_locations.end());
callback(client_id_vec, object_id);
}
// Immediately notify of object locations. This notifies the client even if
// the list of locations is empty, since this may indicate that the objects
// have been evicted from all nodes.
std::vector<ClientID> client_id_vec(listener_state.current_object_locations.begin(),
listener_state.current_object_locations.end());
callback(client_id_vec, object_id);
return status;
}

Expand Down
12 changes: 7 additions & 5 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ class ObjectDirectoryInterface {
const OnLocationsFound &callback) = 0;

/// Subscribe to be notified of locations (ClientID) of the given object.
/// The callback will be invoked whenever locations are obtained for the
/// specified object. The callback provided to this method may fire immediately,
/// within the call to this method, if any other listener is subscribed to the same
/// object: This occurs when location data for the object has already been obtained.
///
/// The callback will be invoked with the complete list of known locations
/// whenever the set of locations changes. The callback will also be fired if
/// the list of known locations is empty. The callback provided to this
/// method may fire immediately, within the call to this method, if any other
/// listener is subscribed to the same object: This occurs when location data
/// for the object has already been obtained.
///
/// \param callback_id The id associated with the specified callback. This is
/// needed when UnsubscribeObjectLocations is called.
/// \param object_id The required object's ObjectID.
Expand Down
174 changes: 125 additions & 49 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
main_service_ = &main_service;
store_notification_.SubscribeObjAdded([this](const ObjectInfoT &object_info) {
NotifyDirectoryObjectAdd(object_info);
HandleUnfulfilledPushRequests(object_info);
});
store_notification_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) { HandleObjectAdded(object_info); });
store_notification_.SubscribeObjDeleted(
[this](const ObjectID &oid) { NotifyDirectoryObjectDeleted(oid); });
StartIOService();
Expand All @@ -60,10 +58,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
RAY_CHECK(config_.max_receives > 0);
// TODO(hme) Client ID is never set with this constructor.
main_service_ = &main_service;
store_notification_.SubscribeObjAdded([this](const ObjectInfoT &object_info) {
NotifyDirectoryObjectAdd(object_info);
HandleUnfulfilledPushRequests(object_info);
});
store_notification_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) { HandleObjectAdded(object_info); });
store_notification_.SubscribeObjDeleted(
[this](const ObjectID &oid) { NotifyDirectoryObjectDeleted(oid); });
StartIOService();
Expand Down Expand Up @@ -97,15 +93,13 @@ void ObjectManager::StopIOService() {
}
}

void ObjectManager::NotifyDirectoryObjectAdd(const ObjectInfoT &object_info) {
void ObjectManager::HandleObjectAdded(const ObjectInfoT &object_info) {
// Notify the object directory that the object has been added to this node.
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
local_objects_[object_id] = object_info;
ray::Status status =
object_directory_->ReportObjectAdded(object_id, client_id_, object_info);
}

void ObjectManager::HandleUnfulfilledPushRequests(const ObjectInfoT &object_info) {
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
// Handle unfulfilled_push_requests_, which contains the push requests that were not
// completed due to unsatisfied local objects.
auto iter = unfulfilled_push_requests_.find(object_id);
Expand All @@ -120,6 +114,10 @@ void ObjectManager::HandleUnfulfilledPushRequests(const ObjectInfoT &object_info
}
unfulfilled_push_requests_.erase(iter);
}

// The object is local, so we no longer need to Pull it from a remote
// manager. Cancel any outstanding Pull requests for this object.
CancelPull(object_id);
}

void ObjectManager::NotifyDirectoryObjectDeleted(const ObjectID &object_id) {
Expand All @@ -145,38 +143,107 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
RAY_LOG(ERROR) << object_id << " attempted to pull an object that's already local.";
return ray::Status::OK();
}
ray::Status status_code = object_directory_->SubscribeObjectLocations(
if (pull_requests_.find(object_id) != pull_requests_.end()) {
return ray::Status::OK();
}

pull_requests_.emplace(object_id, PullRequest());
// Subscribe to object notifications. A notification will be received every
// time the set of client IDs for the object changes. Notifications will also
// be received if the list of locations is empty. The set of client IDs has
// no ordering guarantee between notifications.
return object_directory_->SubscribeObjectLocations(
object_directory_pull_callback_id_, object_id,
[this](const std::vector<ClientID> &client_ids, const ObjectID &object_id) {
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(
object_directory_pull_callback_id_, object_id));
GetLocationsSuccess(client_ids, object_id);
// Exit if the Pull request has already been fulfilled or canceled.
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
return;
}
// Reset the list of clients that are now expected to have the object.
// NOTE(swang): Since we are overwriting the previous list of clients,
// we may end up sending a duplicate request to the same client as
// before.
it->second.client_locations = client_ids;
if (it->second.client_locations.empty()) {
// The object locations are now empty, so we should wait for the next
// notification about a new object location. Cancel the timer until
// the next Pull attempt since there are no more clients to try.
if (it->second.retry_timer != nullptr) {
it->second.retry_timer->cancel();
it->second.timer_set = false;
}
} else {
// New object locations were found.
if (!it->second.timer_set) {
// The timer was not set, which means that we weren't trying any
// clients. We now have some clients to try, so begin trying to
// Pull from one. If we fail to receive an object within the pull
// timeout, then this will try the rest of the clients in the list
// in succession.
TryPull(object_id);
}
}
});
return status_code;
}

void ObjectManager::GetLocationsSuccess(const std::vector<ray::ClientID> &client_ids,
const ray::ObjectID &object_id) {
if (local_objects_.count(object_id) == 0) {
// Only pull objects that aren't local.
RAY_CHECK(!client_ids.empty());
ClientID client_id = client_ids.front();
Pull(object_id, client_id);
}
}

void ObjectManager::Pull(const ObjectID &object_id, const ClientID &client_id) {
// Check if object is already local.
if (local_objects_.count(object_id) != 0) {
RAY_LOG(ERROR) << object_id << " attempted to pull an object that's already local.";
void ObjectManager::TryPull(const ObjectID &object_id) {
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
return;
}
// Check if we're pulling from self.

// The timer should never fire if there are no expected client locations.
RAY_CHECK(!it->second.client_locations.empty());
RAY_CHECK(local_objects_.count(object_id) == 0);

// Get the next client to try.
const ClientID client_id = std::move(it->second.client_locations.back());
it->second.client_locations.pop_back();
if (client_id == client_id_) {
// If we're trying to pull from ourselves, skip this client and try the
// next one.
RAY_LOG(ERROR) << client_id_ << " attempted to pull an object from itself.";
return;
const ClientID client_id = std::move(it->second.client_locations.back());
it->second.client_locations.pop_back();
RAY_CHECK(client_id != client_id_);
}

// Try pulling from the client.
PullEstablishConnection(object_id, client_id);

// If there are more clients to try, try them in succession, with a timeout
// in between each try.
if (!it->second.client_locations.empty()) {
if (it->second.retry_timer == nullptr) {
// Set the timer if we haven't already.
it->second.retry_timer = std::unique_ptr<boost::asio::deadline_timer>(
new boost::asio::deadline_timer(*main_service_));
}

// Wait for a timeout. If we receive the object or a caller Cancels the
// Pull within the timeout, then nothing will happen. Otherwise, the timer
// will fire and the next client in the list will be tried.
boost::posix_time::milliseconds retry_timeout(config_.pull_timeout_ms);
it->second.retry_timer->expires_from_now(retry_timeout);
it->second.retry_timer->async_wait(
[this, object_id](const boost::system::error_code &error) {
if (!error) {
// Try the Pull from the next client.
TryPull(object_id);
} else {
// Check that the error was due to the timer being canceled.
RAY_CHECK(error == boost::asio::error::operation_aborted);
}
});
// Record that we set the timer until the next attempt.
it->second.timer_set = true;
} else {
// The timer is not reset since there are no more clients to try. Go back
// to waiting for more notifications. Once we receive a new object location
// from the object directory, then the Pull will be retried.
it->second.timer_set = false;
}
};

void ObjectManager::PullEstablishConnection(const ObjectID &object_id,
Expand Down Expand Up @@ -370,10 +437,15 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id,
return status;
}

ray::Status ObjectManager::Cancel(const ObjectID &object_id) {
ray::Status status = object_directory_->UnsubscribeObjectLocations(
object_directory_pull_callback_id_, object_id);
return status;
void ObjectManager::CancelPull(const ObjectID &object_id) {
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
return;
}

RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(
object_directory_pull_callback_id_, object_id));
pull_requests_.erase(it);
}

ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
Expand Down Expand Up @@ -481,22 +553,26 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
wait_id, object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &subscribe_object_id) {
auto object_id_wait_state = active_wait_requests_.find(wait_id);
// We never expect to handle a subscription notification for a wait that has
// already completed.
RAY_CHECK(object_id_wait_state != active_wait_requests_.end());
auto &wait_state = object_id_wait_state->second;
RAY_CHECK(wait_state.remaining.erase(subscribe_object_id));
wait_state.found.insert(subscribe_object_id);
wait_state.requested_objects.erase(subscribe_object_id);
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(
wait_id, subscribe_object_id));
if (wait_state.found.size() >= wait_state.num_required_objects) {
WaitComplete(wait_id);
if (!client_ids.empty()) {
auto object_id_wait_state = active_wait_requests_.find(wait_id);
// We never expect to handle a subscription notification for a wait that has
// already completed.
RAY_CHECK(object_id_wait_state != active_wait_requests_.end());
auto &wait_state = object_id_wait_state->second;
RAY_CHECK(wait_state.remaining.erase(subscribe_object_id));
wait_state.found.insert(subscribe_object_id);
wait_state.requested_objects.erase(subscribe_object_id);
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(
wait_id, subscribe_object_id));
if (wait_state.found.size() >= wait_state.num_required_objects) {
WaitComplete(wait_id);
}
}
}));
}
if (wait_state.timeout_ms != -1) {
auto timeout = boost::posix_time::milliseconds(wait_state.timeout_ms);
wait_state.timeout_timer->expires_from_now(timeout);
wait_state.timeout_timer->async_wait(
[this, wait_id](const boost::system::error_code &error_code) {
if (error_code.value() != 0) {
Expand Down
Loading

0 comments on commit 806fdf2

Please sign in to comment.