Skip to content

[xray] Use pubsub instead of timeout for ObjectManager Pull. #2079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 19, 2018

Conversation

elibol
Copy link
Contributor

@elibol elibol commented May 16, 2018

This PR removes the ObjectManager timeout for Pull. The ObjectDirectory interface method Lookup has been removed and replaced with methods Subscribe and Unsubscribe, which implement the existing GCS Lookup call followed by RequestNotifications whenever GCS Lookup fails to obtain object locations. This implementation eliminates the need to poll the GCS for object locations, which improves the performance of Pull.

ray::Status status = ray::Status::OK();
if (listeners_.find(object_id) != listeners_.end()) {
RAY_LOG(ERROR) << "Duplicate calls to SubscribeObjectLocations for " << object_id;
return ray::Status::Invalid("Cannot do things.");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this message.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5439/
Test FAILed.

@elibol elibol changed the title Use pubsub instead of timeout for ObjectManager Pull. [xray] Use pubsub instead of timeout for ObjectManager Pull. May 17, 2018
@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5442/
Test PASSed.

void ObjectDirectory::RegisterBackend() {
auto object_notification_callback = [this](gcs::AsyncGcsClient *client,
const ObjectID &object_id,
const std::vector<ObjectTableDataT> data) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const std::vector& data?

}
// Obtain reported client ids.
std::vector<ClientID> client_ids;
for (auto item : data) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (auto& item :data)

const std::vector<ObjectTableDataT> &location_entries) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> locations;
for (auto entry : location_entries) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (auto& entry : location_entries) {

Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, let's try to think carefully about the places where we return ray::Status and what it means if we return a status that is not OK. If there are methods where we should always return OK, I think it's better to just return void and add fatal checks for non-OK situations. If there are places where we should return a status, we should document in the method what it means if the status is not OK.

if (!item.is_eviction) {
ClientID client_id = ClientID::from_binary(item.manager);
client_ids.push_back(client_id);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the object table is a log, you have to apply the eviction entries as well. Sorry there's no documentation for this, but for example, if an object was evicted on node A, then added again, then its log will look like: add A, evict A, add A. The easiest way to apply the entries is probably to build up an unordered_set of client IDs instead of a vector.

Actually, it would be great if you could add that documentation to gcs/tables.h!

const std::vector<ObjectTableDataT> &location_entries) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> locations;
for (auto &entry : location_entries) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the logic that you want for the handler for pubsub notifications. I would probably just separate this out as a helper function.

// Invoke the callback.
std::vector<ClientID> locations_vector(locations.begin(), locations.end());
if (locations_vector.empty()) {
fail_callback(object_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since GetLocations is now private, can we remove the success and fail callbacks from the signature and just call the callbacks that are registered in SubscribeObjectLocations directly?

struct LocationListenerState {
LocationListenerState(const OnLocationsFound &locations_found_callback)
: locations_found_callback(locations_found_callback), listening(false) {}
OnLocationsFound locations_found_callback;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the struct members.

/// Invoked when call to ExecuteGetLocations completes.
void GetLocationsComplete(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_entries);
ray::Status GetLocations(const ObjectID &object_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc.

@@ -299,8 +285,7 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id,
}

ray::Status ObjectManager::Cancel(const ObjectID &object_id) {
// TODO(hme): Account for pull timers.
ray::Status status = object_directory_->Cancel(object_id);
ray::Status status = object_directory_->UnsubscribeObjectLocations(object_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This status is unused. Return it?

} else {
locations.erase(client_id);
}
ray::Status ObjectDirectory::UnsubscribeObjectLocations(const ObjectID &object_id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be careful about having too many signatures return ray::Status. This one always seems to return OK. Can we either change the signature to void, or have it return the status from the CancelNotifications call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now returns status of CancelNotifications.

RAY_CHECK_OK(PullGetLocations(object_id));
});
}

ray::Status ObjectManager::PullGetLocations(const ObjectID &object_id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PullGetLocations is now only called by the Pull method. Can we squash this method into Pull?

ray::Status status = ray::Status::OK();
if (listeners_.find(object_id) != listeners_.end()) {
RAY_LOG(ERROR) << "Duplicate calls to SubscribeObjectLocations for " << object_id;
return ray::Status::Invalid("Unable to subscribe to the same object twice.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning a non-OK error here will break the node manager, which always checks that a Pull returns OK and currently does not check if it has already called Pull on the object before. We should be clear on the contracts with the callers of SubscribeObjectLocations and Pull. For instance, I'm not sure that it makes sense to make the caller of SubscribeObjectLocations keep track of the calls it's made so far.

const std::vector<ObjectTableDataT> &data) {
GetLocationsComplete(object_id, data);
listeners_.emplace(object_id, LocationListenerState(callback));
GetLocations(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetLocations returns a status. We should probably return that here.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5451/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5457/
Test PASSed.

std::vector<ClientID> client_id_vec(client_id_set.begin(), client_id_set.end());
entry->second.locations_found_callback(client_id_vec, object_id);
};
gcs_client_->object_table().Subscribe(UniqueID::nil(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an unused return status here. Either return it or RAY_CHECK_OK.

}
}
std::vector<ClientID> client_id_vec(client_id_set.begin(), client_id_set.end());
entry->second.locations_found_callback(client_id_vec, object_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to call the callback if client_id_vec is empty?

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5481/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5482/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5486/
Test PASSed.

@elibol
Copy link
Contributor Author

elibol commented May 19, 2018

@elibol elibol merged commit f1da721 into ray-project:master May 19, 2018
alok added a commit to alok/ray that referenced this pull request May 21, 2018
* master: (24 commits)
  Performance fix (ray-project#2110)
  Use flake8-comprehensions (ray-project#1976)
  Improve error message printing and suppression. (ray-project#2104)
  [rllib] [doc] Broken link in ddpg doc
  YAPF, take 3 (ray-project#2098)
  [rllib] rename async -> _async (ray-project#2097)
  fix unused lambda capture (ray-project#2102)
  [xray] Use pubsub instead of timeout for ObjectManager Pull. (ray-project#2079)
  [DataFrame] Update _inherit_docstrings (ray-project#2085)
  [JavaWorker] Changes to the build system for support java worker (ray-project#2092)
  [xray] Fix bug in updating actor execution dependencies (ray-project#2064)
  [DataFrame] Refactor __delitem__ (ray-project#2080)
  [xray] Better error messaging when pulling from self. (ray-project#2068)
  Use source code in hash where possible (fix ray-project#2089) (ray-project#2090)
  Functions for flushing done tasks and evicted objects. (ray-project#2033)
  Fix compilation error for RAY_USE_NEW_GCS with latest clang. (ray-project#2086)
  [xray] Corrects Error Handling During Push and Pull. (ray-project#2059)
  [xray] Sophisticated task dependency management (ray-project#2035)
  Support calling positional arguments by keyword (fix ray-project#998) (ray-project#2081)
  [DataFrame] Improve performance of iteration methods (ray-project#2026)
  ...
alok added a commit to alok/ray that referenced this pull request May 24, 2018
* fix-a3c-torch: (37 commits)
  Add missing channel major
  Use correct filter size
  Add TODO
  Fix shape errors
  fmt
  Performance fix (ray-project#2110)
  Use flake8-comprehensions (ray-project#1976)
  Improve error message printing and suppression. (ray-project#2104)
  [rllib] [doc] Broken link in ddpg doc
  YAPF, take 3 (ray-project#2098)
  [rllib] rename async -> _async (ray-project#2097)
  fix unused lambda capture (ray-project#2102)
  [xray] Use pubsub instead of timeout for ObjectManager Pull. (ray-project#2079)
  [DataFrame] Update _inherit_docstrings (ray-project#2085)
  [JavaWorker] Changes to the build system for support java worker (ray-project#2092)
  [xray] Fix bug in updating actor execution dependencies (ray-project#2064)
  [DataFrame] Refactor __delitem__ (ray-project#2080)
  [xray] Better error messaging when pulling from self. (ray-project#2068)
  Use source code in hash where possible (fix ray-project#2089) (ray-project#2090)
  Functions for flushing done tasks and evicted objects. (ray-project#2033)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants