-
Notifications
You must be signed in to change notification settings - Fork 6.5k
[xray] Use pubsub instead of timeout for ObjectManager Pull. #2079
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
ray::Status status = ray::Status::OK(); | ||
if (listeners_.find(object_id) != listeners_.end()) { | ||
RAY_LOG(ERROR) << "Duplicate calls to SubscribeObjectLocations for " << object_id; | ||
return ray::Status::Invalid("Cannot do things."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this message.
Test FAILed. |
Test PASSed. |
void ObjectDirectory::RegisterBackend() { | ||
auto object_notification_callback = [this](gcs::AsyncGcsClient *client, | ||
const ObjectID &object_id, | ||
const std::vector<ObjectTableDataT> data) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const std::vector& data?
} | ||
// Obtain reported client ids. | ||
std::vector<ClientID> client_ids; | ||
for (auto item : data) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (auto& item :data)
const std::vector<ObjectTableDataT> &location_entries) { | ||
// Build the set of current locations based on the entries in the log. | ||
std::unordered_set<ClientID> locations; | ||
for (auto entry : location_entries) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (auto& entry : location_entries) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, let's try to think carefully about the places where we return ray::Status
and what it means if we return a status that is not OK. If there are methods where we should always return OK, I think it's better to just return void and add fatal checks for non-OK situations. If there are places where we should return a status, we should document in the method what it means if the status is not OK.
if (!item.is_eviction) { | ||
ClientID client_id = ClientID::from_binary(item.manager); | ||
client_ids.push_back(client_id); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the object table is a log, you have to apply the eviction entries as well. Sorry there's no documentation for this, but for example, if an object was evicted on node A, then added again, then its log will look like: add A, evict A, add A. The easiest way to apply the entries is probably to build up an unordered_set
of client IDs instead of a vector.
Actually, it would be great if you could add that documentation to gcs/tables.h
!
const std::vector<ObjectTableDataT> &location_entries) { | ||
// Build the set of current locations based on the entries in the log. | ||
std::unordered_set<ClientID> locations; | ||
for (auto &entry : location_entries) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the logic that you want for the handler for pubsub notifications. I would probably just separate this out as a helper function.
// Invoke the callback. | ||
std::vector<ClientID> locations_vector(locations.begin(), locations.end()); | ||
if (locations_vector.empty()) { | ||
fail_callback(object_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since GetLocations
is now private, can we remove the success and fail callbacks from the signature and just call the callbacks that are registered in SubscribeObjectLocations
directly?
struct LocationListenerState { | ||
LocationListenerState(const OnLocationsFound &locations_found_callback) | ||
: locations_found_callback(locations_found_callback), listening(false) {} | ||
OnLocationsFound locations_found_callback; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document the struct members.
/// Invoked when call to ExecuteGetLocations completes. | ||
void GetLocationsComplete(const ObjectID &object_id, | ||
const std::vector<ObjectTableDataT> &location_entries); | ||
ray::Status GetLocations(const ObjectID &object_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doc.
@@ -299,8 +285,7 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id, | |||
} | |||
|
|||
ray::Status ObjectManager::Cancel(const ObjectID &object_id) { | |||
// TODO(hme): Account for pull timers. | |||
ray::Status status = object_directory_->Cancel(object_id); | |||
ray::Status status = object_directory_->UnsubscribeObjectLocations(object_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This status is unused. Return it?
} else { | ||
locations.erase(client_id); | ||
} | ||
ray::Status ObjectDirectory::UnsubscribeObjectLocations(const ObjectID &object_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be careful about having too many signatures return ray::Status
. This one always seems to return OK
. Can we either change the signature to void
, or have it return the status from the CancelNotifications
call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This now returns status of CancelNotifications
.
RAY_CHECK_OK(PullGetLocations(object_id)); | ||
}); | ||
} | ||
|
||
ray::Status ObjectManager::PullGetLocations(const ObjectID &object_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PullGetLocations
is now only called by the Pull
method. Can we squash this method into Pull
?
ray::Status status = ray::Status::OK(); | ||
if (listeners_.find(object_id) != listeners_.end()) { | ||
RAY_LOG(ERROR) << "Duplicate calls to SubscribeObjectLocations for " << object_id; | ||
return ray::Status::Invalid("Unable to subscribe to the same object twice."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning a non-OK error here will break the node manager, which always checks that a Pull
returns OK and currently does not check if it has already called Pull
on the object before. We should be clear on the contracts with the callers of SubscribeObjectLocations
and Pull
. For instance, I'm not sure that it makes sense to make the caller of SubscribeObjectLocations
keep track of the calls it's made so far.
const std::vector<ObjectTableDataT> &data) { | ||
GetLocationsComplete(object_id, data); | ||
listeners_.emplace(object_id, LocationListenerState(callback)); | ||
GetLocations( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetLocations
returns a status. We should probably return that here.
Test PASSed. |
Test PASSed. |
std::vector<ClientID> client_id_vec(client_id_set.begin(), client_id_set.end()); | ||
entry->second.locations_found_callback(client_id_vec, object_id); | ||
}; | ||
gcs_client_->object_table().Subscribe(UniqueID::nil(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an unused return status here. Either return it or RAY_CHECK_OK
.
} | ||
} | ||
std::vector<ClientID> client_id_vec(client_id_set.begin(), client_id_set.end()); | ||
entry->second.locations_found_callback(client_id_vec, object_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to call the callback if client_id_vec
is empty?
Test PASSed. |
Test PASSed. |
Test PASSed. |
* master: (24 commits) Performance fix (ray-project#2110) Use flake8-comprehensions (ray-project#1976) Improve error message printing and suppression. (ray-project#2104) [rllib] [doc] Broken link in ddpg doc YAPF, take 3 (ray-project#2098) [rllib] rename async -> _async (ray-project#2097) fix unused lambda capture (ray-project#2102) [xray] Use pubsub instead of timeout for ObjectManager Pull. (ray-project#2079) [DataFrame] Update _inherit_docstrings (ray-project#2085) [JavaWorker] Changes to the build system for support java worker (ray-project#2092) [xray] Fix bug in updating actor execution dependencies (ray-project#2064) [DataFrame] Refactor __delitem__ (ray-project#2080) [xray] Better error messaging when pulling from self. (ray-project#2068) Use source code in hash where possible (fix ray-project#2089) (ray-project#2090) Functions for flushing done tasks and evicted objects. (ray-project#2033) Fix compilation error for RAY_USE_NEW_GCS with latest clang. (ray-project#2086) [xray] Corrects Error Handling During Push and Pull. (ray-project#2059) [xray] Sophisticated task dependency management (ray-project#2035) Support calling positional arguments by keyword (fix ray-project#998) (ray-project#2081) [DataFrame] Improve performance of iteration methods (ray-project#2026) ...
* fix-a3c-torch: (37 commits) Add missing channel major Use correct filter size Add TODO Fix shape errors fmt Performance fix (ray-project#2110) Use flake8-comprehensions (ray-project#1976) Improve error message printing and suppression. (ray-project#2104) [rllib] [doc] Broken link in ddpg doc YAPF, take 3 (ray-project#2098) [rllib] rename async -> _async (ray-project#2097) fix unused lambda capture (ray-project#2102) [xray] Use pubsub instead of timeout for ObjectManager Pull. (ray-project#2079) [DataFrame] Update _inherit_docstrings (ray-project#2085) [JavaWorker] Changes to the build system for support java worker (ray-project#2092) [xray] Fix bug in updating actor execution dependencies (ray-project#2064) [DataFrame] Refactor __delitem__ (ray-project#2080) [xray] Better error messaging when pulling from self. (ray-project#2068) Use source code in hash where possible (fix ray-project#2089) (ray-project#2090) Functions for flushing done tasks and evicted objects. (ray-project#2033) ...
This PR removes the ObjectManager timeout for
Pull
. The ObjectDirectory interface methodLookup
has been removed and replaced with methodsSubscribe
andUnsubscribe
, which implement the existing GCSLookup
call followed byRequestNotifications
whenever GCSLookup
fails to obtain object locations. This implementation eliminates the need to poll the GCS for object locations, which improves the performance ofPull
.