Skip to content

Commit

Permalink
[xray] Use pubsub instead of timeout for ObjectManager Pull. (#2079)
Browse files Browse the repository at this point in the history
Use pubsub instead of timeout for Pull.
  • Loading branch information
elibol authored May 19, 2018
1 parent 9e46de9 commit f1da721
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 137 deletions.
109 changes: 53 additions & 56 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,41 @@

namespace ray {

ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> gcs_client) {
ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client) {
gcs_client_ = gcs_client;
};
}

/// Subscribe to object table notifications on the GCS client and forward them
/// to the listeners registered through SubscribeObjectLocations.
///
/// For every notification, the location set stored for the object is updated
/// (non-eviction entries add a client, eviction entries remove it). If the
/// resulting set is non-empty, the listener's callback is invoked with the
/// current locations. Notifications for objects without a registered listener
/// are ignored.
void ObjectDirectory::RegisterBackend() {
  auto object_notification_callback = [this](gcs::AsyncGcsClient *client,
                                             const ObjectID &object_id,
                                             const std::vector<ObjectTableDataT> &data) {
    // Objects are added to this map in SubscribeObjectLocations.
    auto entry = listeners_.find(object_id);
    // Do nothing for objects we are not listening for.
    if (entry == listeners_.end()) {
      return;
    }
    // Update the stored entries for this object. This must be a reference:
    // binding by value would apply the inserts/erases to a temporary copy and
    // the listener's known locations would never be recorded.
    auto &client_id_set = entry->second.client_ids;
    for (const auto &object_table_data : data) {
      ClientID client_id = ClientID::from_binary(object_table_data.manager);
      if (!object_table_data.is_eviction) {
        client_id_set.insert(client_id);
      } else {
        client_id_set.erase(client_id);
      }
    }
    if (!client_id_set.empty()) {
      // Only call the callback if we have object locations.
      std::vector<ClientID> client_id_vec(client_id_set.begin(), client_id_set.end());
      // Copy the callback before invoking it: the callback may call
      // UnsubscribeObjectLocations, which erases this listener entry. Neither
      // `entry` nor `client_id_set` may be touched after this call.
      auto callback = entry->second.locations_found_callback;
      callback(client_id_vec, object_id);
    }
  };
  RAY_CHECK_OK(gcs_client_->object_table().Subscribe(
      UniqueID::nil(), gcs_client_->client_table().GetLocalClientId(),
      object_notification_callback, nullptr));
}

ray::Status ObjectDirectory::ReportObjectAdded(const ObjectID &object_id,
const ClientID &client_id,
Expand All @@ -19,7 +51,7 @@ ray::Status ObjectDirectory::ReportObjectAdded(const ObjectID &object_id,
ray::Status status =
gcs_client_->object_table().Append(job_id, object_id, data, nullptr);
return status;
};
}

ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) {
Expand Down Expand Up @@ -52,63 +84,28 @@ ray::Status ObjectDirectory::GetInformation(const ClientID &client_id,
success_callback(info);
}
return ray::Status::OK();
};
}

ray::Status ObjectDirectory::GetLocations(const ObjectID &object_id,
const OnLocationsSuccess &success_callback,
const OnLocationsFailure &fail_callback) {
ray::Status status_code = ray::Status::OK();
if (existing_requests_.count(object_id) == 0) {
existing_requests_[object_id] = ODCallbacks({success_callback, fail_callback});
status_code = ExecuteGetLocations(object_id);
} else {
// Do nothing. A request is in progress.
ray::Status ObjectDirectory::SubscribeObjectLocations(const ObjectID &object_id,
const OnLocationsFound &callback) {
if (listeners_.find(object_id) != listeners_.end()) {
RAY_LOG(ERROR) << "Duplicate calls to SubscribeObjectLocations for " << object_id;
return ray::Status::OK();
}
return status_code;
};

ray::Status ObjectDirectory::ExecuteGetLocations(const ObjectID &object_id) {
JobID job_id = JobID::nil();
// Note: Lookup must be synchronous for thread-safe access.
// For now, this is only accessed by the main thread.
ray::Status status = gcs_client_->object_table().Lookup(
job_id, object_id, [this](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &data) {
GetLocationsComplete(object_id, data);
});
return status;
};
listeners_.emplace(object_id, LocationListenerState(callback));
return gcs_client_->object_table().RequestNotifications(
JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
}

void ObjectDirectory::GetLocationsComplete(
const ObjectID &object_id, const std::vector<ObjectTableDataT> &location_entries) {
auto request = existing_requests_.find(object_id);
// Do not invoke a callback if the request was cancelled.
if (request == existing_requests_.end()) {
return;
}
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> locations;
for (auto entry : location_entries) {
ClientID client_id = ClientID::from_binary(entry.manager);
if (!entry.is_eviction) {
locations.insert(client_id);
} else {
locations.erase(client_id);
}
ray::Status ObjectDirectory::UnsubscribeObjectLocations(const ObjectID &object_id) {
auto entry = listeners_.find(object_id);
if (entry == listeners_.end()) {
return ray::Status::OK();
}
// Invoke the callback.
std::vector<ClientID> locations_vector(locations.begin(), locations.end());
if (locations_vector.empty()) {
request->second.fail_cb(object_id);
} else {
request->second.success_cb(locations_vector, object_id);
}
existing_requests_.erase(request);
ray::Status status = gcs_client_->object_table().CancelNotifications(
JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
listeners_.erase(entry);
return status;
}

ray::Status ObjectDirectory::Cancel(const ObjectID &object_id) {
existing_requests_.erase(object_id);
return ray::Status::OK();
};

} // namespace ray
67 changes: 33 additions & 34 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class ObjectDirectoryInterface {
using InfoSuccessCallback = std::function<void(const ray::RemoteConnectionInfo &info)>;
using InfoFailureCallback = std::function<void(ray::Status status)>;

virtual void RegisterBackend() = 0;

/// This is used to establish object manager client connections.
///
/// \param client_id The client for which information is required.
Expand All @@ -43,27 +45,25 @@ class ObjectDirectoryInterface {
const InfoSuccessCallback &success_cb,
const InfoFailureCallback &fail_cb) = 0;

// Callbacks for GetLocations.
using OnLocationsSuccess = std::function<void(const std::vector<ray::ClientID> &v,
const ray::ObjectID &object_id)>;
using OnLocationsFailure = std::function<void(const ray::ObjectID &object_id)>;
/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const std::vector<ray::ClientID> &v,
const ray::ObjectID &object_id)>;

/// Asynchronously obtain the locations of an object by ObjectID.
/// This is used to handle object pulls.
/// Subscribe to be notified of locations (ClientID) of the given object.
/// The callback will be invoked whenever locations are obtained for the
/// specified object.
///
/// \param object_id The required object's ObjectID.
/// \param success_cb Invoked upon success with list of remote connection info.
/// \param fail_cb Invoked upon failure with ray status and object id.
/// \return Status of whether this asynchronous request succeeded.
virtual ray::Status GetLocations(const ObjectID &object_id,
const OnLocationsSuccess &success_cb,
const OnLocationsFailure &fail_cb) = 0;
/// \param callback Invoked with non-empty list of client ids and object_id.
/// \return Status of whether subscription succeeded.
virtual ray::Status SubscribeObjectLocations(const ObjectID &object_id,
const OnLocationsFound &callback) = 0;

/// Cancels the invocation of the callback associated with callback_id.
/// Unsubscribe from object location notifications.
///
/// \param object_id The object id invoked with GetLocations.
/// \return Status of whether this method succeeded.
virtual ray::Status Cancel(const ObjectID &object_id) = 0;
/// \param object_id The object id passed to SubscribeObjectLocations.
/// \return Status of whether this method succeeded.
virtual ray::Status UnsubscribeObjectLocations(const ObjectID &object_id) = 0;

/// Report objects added to this node's store to the object directory.
///
Expand All @@ -90,40 +90,39 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ObjectDirectory() = default;
~ObjectDirectory() override = default;

void RegisterBackend() override;

ray::Status GetInformation(const ClientID &client_id,
const InfoSuccessCallback &success_callback,
const InfoFailureCallback &fail_callback) override;
ray::Status GetLocations(const ObjectID &object_id,
const OnLocationsSuccess &success_callback,
const OnLocationsFailure &fail_callback) override;
ray::Status Cancel(const ObjectID &object_id) override;

ray::Status SubscribeObjectLocations(const ObjectID &object_id,
const OnLocationsFound &callback) override;
ray::Status UnsubscribeObjectLocations(const ObjectID &object_id) override;

ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const ObjectInfoT &object_info) override;
ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;
/// Ray only (not part of the OD interface).
ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> gcs_client);
ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client);

/// ObjectDirectory should not be copied.
RAY_DISALLOW_COPY_AND_ASSIGN(ObjectDirectory);

private:
/// Callbacks associated with a call to GetLocations.
// TODO(hme): I think these can be removed.
struct ODCallbacks {
OnLocationsSuccess success_cb;
OnLocationsFailure fail_cb;
/// Per-object state kept for a location subscription.
struct LocationListenerState {
  /// Create listener state that reports locations through the given callback.
  /// The set of known locations starts out empty.
  LocationListenerState(const OnLocationsFound &callback)
      : locations_found_callback{callback} {}

  /// Invoked when locations for the object are found.
  OnLocationsFound locations_found_callback;
  /// Clients currently known to hold the object.
  std::unordered_set<ClientID> client_ids;
};

/// GetLocations registers a request for locations.
/// This function actually carries out that request.
ray::Status ExecuteGetLocations(const ObjectID &object_id);
/// Invoked when call to ExecuteGetLocations completes.
void GetLocationsComplete(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_entries);

/// Maintain map of in-flight GetLocation requests.
std::unordered_map<ObjectID, ODCallbacks> existing_requests_;
/// Info about subscribers to object locations.
std::unordered_map<ObjectID, LocationListenerState> listeners_;
/// Reference to the gcs client.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// Map from object ID to the number of times it's been evicted on this
Expand Down
36 changes: 10 additions & 26 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service,

/// Stop the IO service when the object manager is destroyed.
ObjectManager::~ObjectManager() {
  StopIOService();
}

/// Register GCS-related functionality by registering the object directory's
/// backend.
void ObjectManager::RegisterGcs() {
  object_directory_->RegisterBackend();
}

void ObjectManager::StartIOService() {
for (int i = 0; i < config_.max_sends; ++i) {
send_threads_.emplace_back(std::thread(&ObjectManager::RunSendService, this));
Expand Down Expand Up @@ -111,26 +113,12 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
RAY_LOG(ERROR) << object_id << " attempted to pull an object that's already local.";
return ray::Status::OK();
}
return PullGetLocations(object_id);
}

void ObjectManager::SchedulePull(const ObjectID &object_id, int wait_ms) {
pull_requests_[object_id] = std::make_shared<boost::asio::deadline_timer>(
*main_service_, boost::posix_time::milliseconds(wait_ms));
pull_requests_[object_id]->async_wait(
[this, object_id](const boost::system::error_code &error_code) {
pull_requests_.erase(object_id);
RAY_CHECK_OK(PullGetLocations(object_id));
});
}

ray::Status ObjectManager::PullGetLocations(const ObjectID &object_id) {
ray::Status status_code = object_directory_->GetLocations(
ray::Status status_code = object_directory_->SubscribeObjectLocations(
object_id,
[this](const std::vector<ClientID> &client_ids, const ObjectID &object_id) {
return GetLocationsSuccess(client_ids, object_id);
},
[this](const ObjectID &object_id) { return GetLocationsFailed(object_id); });
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(object_id));
GetLocationsSuccess(client_ids, object_id);
});
return status_code;
}

Expand All @@ -145,10 +133,6 @@ void ObjectManager::GetLocationsSuccess(const std::vector<ray::ClientID> &client
}
}

void ObjectManager::GetLocationsFailed(const ObjectID &object_id) {
SchedulePull(object_id, config_.pull_timeout_ms);
}

ray::Status ObjectManager::Pull(const ObjectID &object_id, const ClientID &client_id) {
// Check if object is already local.
if (local_objects_.count(object_id) != 0) {
Expand Down Expand Up @@ -188,7 +172,8 @@ ray::Status ObjectManager::PullEstablishConnection(const ObjectID &object_id,
RAY_CHECK_OK(pull_send_status);
},
[this, object_id](const Status &status) {
SchedulePull(object_id, config_.pull_timeout_ms);
RAY_LOG(ERROR) << "Failed to establish connection with remote object manager.";
RAY_CHECK_OK(status);
});
} else {
status = PullSendRequest(object_id, conn);
Expand Down Expand Up @@ -311,9 +296,8 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id,
}

ray::Status ObjectManager::Cancel(const ObjectID &object_id) {
// TODO(hme): Account for pull timers.
ray::Status status = object_directory_->Cancel(object_id);
return ray::Status::OK();
ray::Status status = object_directory_->UnsubscribeObjectLocations(object_id);
return status;
}

ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
Expand Down
22 changes: 3 additions & 19 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class ObjectManager : public ObjectManagerInterface {

~ObjectManager();

/// Register GCS-related functionality.
void RegisterGcs();

/// Subscribe to notifications of objects added to local store.
/// Upon subscribing, the callback will be invoked for all objects that
///
Expand Down Expand Up @@ -185,10 +188,6 @@ class ObjectManager : public ObjectManagerInterface {
/// Connection pool for reusing outgoing connections to remote object managers.
ConnectionPool connection_pool_;

/// Timeout for failed pull requests.
std::unordered_map<ObjectID, std::shared_ptr<boost::asio::deadline_timer>>
pull_requests_;

/// Cache of locally available objects.
std::unordered_map<ObjectID, ObjectInfoT> local_objects_;

Expand All @@ -204,17 +203,6 @@ class ObjectManager : public ObjectManagerInterface {
/// Register object remove with directory.
void NotifyDirectoryObjectDeleted(const ObjectID &object_id);

/// Wait wait_ms milliseconds before triggering a pull request for object_id.
/// This is invoked when a pull fails. Only point of failure currently considered
/// is GetLocationsFailed.
void SchedulePull(const ObjectID &object_id, int wait_ms);

/// Part of an asynchronous sequence of Pull methods.
/// Gets the location of an object before invoking PullEstablishConnection.
/// Guaranteed to execute on main_service_ thread.
/// Executes on main_service_ thread.
ray::Status PullGetLocations(const ObjectID &object_id);

/// Part of an asynchronous sequence of Pull methods.
/// Uses an existing connection or creates a connection to ClientID.
/// Executes on main_service_ thread.
Expand All @@ -226,10 +214,6 @@ class ObjectManager : public ObjectManagerInterface {
void GetLocationsSuccess(const std::vector<ray::ClientID> &client_ids,
const ray::ObjectID &object_id);

/// Private callback implementation for failure on get location. Called from
/// ObjectDirectory.
void GetLocationsFailed(const ObjectID &object_id);

/// Synchronously send a pull request via remote object manager connection.
/// Executes on main_service_ thread.
ray::Status PullSendRequest(const ObjectID &object_id,
Expand Down
4 changes: 3 additions & 1 deletion src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class MockServer {
client_info.node_manager_address = ip;
client_info.node_manager_port = object_manager_port;
client_info.object_manager_port = object_manager_port;
return gcs_client_->client_table().Connect(client_info);
ray::Status status = gcs_client_->client_table().Connect(client_info);
object_manager_.RegisterGcs();
return status;
}

void DoAcceptObjectManager() {
Expand Down
4 changes: 3 additions & 1 deletion src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class MockServer {
client_info.node_manager_address = ip;
client_info.node_manager_port = object_manager_port;
client_info.object_manager_port = object_manager_port;
return gcs_client_->client_table().Connect(client_info);
ray::Status status = gcs_client_->client_table().Connect(client_info);
object_manager_.RegisterGcs();
return status;
}

void DoAcceptObjectManager() {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
}

ray::Status NodeManager::RegisterGcs() {
object_manager_.RegisterGcs();

// Subscribe to task entry commits in the GCS. These notifications are
// forwarded to the lineage cache, which requests notifications about tasks
// that were executed remotely.
Expand Down

0 comments on commit f1da721

Please sign in to comment.