-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[xray] Object manager retries Pull requests #2630
[xray] Object manager retries Pull requests #2630
Conversation
- suppress duplicate Pulls - retry the Pull at the next client after a timeout - cancel a Pull if the object no longer appears on any clients
Test PASSed. |
cc @heyucongtom |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks good (contingent on answers to some of my questions). The existing functionality is preserved while providing robustness to component failure, and suppression of multiple pull requests to the same object. Note that this does not imply suppression of multiple transfers of the same object, as this is still possible under certain circumstances.
Consider reverting the reorganization of the header files to make it easier for others to review. I had done this quite a while ago (just needs to be approved): #2251
My understanding of the changes follow.
A Pull
will obtain multiple locations from which to request an object. Once Pull
has been invoked for an object id, subsequent calls with the same object id are suppressed. The Pull
opens a subscription to object locations, which is maintained until one of two things occurs. (1) A notification from the object store is received indicating the object is local. The handler now invokes Cancel
on all objects that appear locally, which includes those for which a Pull
request is open. (2) The Pull
is cancelled by another entity.
Other points worth noting:
- The
SubscribeObjectLocations
handler forPull
appears to be correct. TryPull
will fail if it's invoked when no client ids are left in the client locations vector. This is currently difficult to reason about and could arguably be simplified by returning if client locations is empty. The correctness of this depends on whetherCancel
implicitly cancels active asio timers associated with the cancelled object.- Whenever
Cancel
is invoked, it does nothing if aPull
request is not active for the given object id, and properly terminates an active request when one exists. This appears to be the case, although the behavior in its current state is not easy to reason about since timers are not explicitly cancelled: It's unclear whether asio timers are cancelled when their unique pointers are no longer referenced.
} | ||
|
||
pull_requests_.emplace(object_id, PullRequest()); | ||
return object_directory_->SubscribeObjectLocations( | ||
object_directory_pull_callback_id_, object_id, | ||
[this](const std::vector<ClientID> &client_ids, const ObjectID &object_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment here indicating that, so long as the subscription is not cancelled, this method will be invoked whenever the vector of client ids on which the object is available changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup.
// The timer was set if there were more clients to try. We record this | ||
// now so that we can decide whether we should reset the timer when | ||
// trying the new client locations. | ||
bool timer_was_set = !it->second.client_locations.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using an explicit variable to indicate whether the timer has been set. This will make the code easier to read, and will also avoid conflating the semantics of the client locations size with whether the timer has been set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
|
||
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations( | ||
object_directory_pull_callback_id_, object_id)); | ||
pull_requests_.erase(it); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, we don't cancel running timers here because TryPull
properly handles the case of a timer firing for a cancelled pull request. If this is true, it might be worth noting here to avoid confusion about what is expected when pull requests are cancelled.
Some questions:
- The timer may have been instantiated but not running; in this case does calling cancel throw an exception?
- When the timer is running and no longer referenced from its unique pointer, does it cancel itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider this scenario:
Pull
is invoked on objectA
. Multiple object locations are discovered, so a timer is set after a request from the first object location is made.Cancel
is invoked onA
.Pull
is again invoked onA
.
If the timer does not cancel itself when its unique pointer is no longer referenced, then the timer will trigger TryPull
. This is a timer other than the timer created by the second pull request for A
. If it's invoked before a response from SubscribeObjectLocations
is received, then TryPull
will fail (client locations will be empty).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timer destructor will cancel any remaining handlers properly (see my comment on the PR thread).
// NOTE(swang): Since we are overwriting the previous list of clients, | ||
// we may end up sending a duplicate request to the same client as | ||
// before. | ||
it->second.client_locations = client_ids; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A quick fix to reduce the likelihood of consecutively requesting an object from the same client is to shuffle this vector when its length is greater than 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is unnecessary since the client IDs are built from an unordered_set
, so they may already be slightly shuffled. Also, we pop from the back of the list instead of the front in TryPull
.
ray::Status status = object_directory_->UnsubscribeObjectLocations( | ||
object_directory_pull_callback_id_, object_id); | ||
return status; | ||
void ObjectManager::Cancel(const ObjectID &object_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider changing the name of this method to CancelPull
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
it->second.retry_timer->cancel(); | ||
} | ||
} else { | ||
// New object locations were found. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a bool to indicate whether this is the first invocation of this handler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate where the bool is set and what it should be used for?
if (timer_was_set) { | ||
// Cancel the timer if we were waiting to try the next client. | ||
RAY_CHECK(it->second.retry_timer != nullptr); | ||
it->second.retry_timer->cancel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If invoking cancel
when a timer is not running does nothing, then consider invoking cancel whenever the timer has been instantiated and the client locations list is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
Thanks @melih. Your understanding of the code seems right to me.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Test PASSed. |
Test PASSed. |
Test PASSed. |
Test PASSed. |
What do these changes do?
This modifies the object manager to keep track of
Pull
requests and retry ones that have not succeeded within a certain timeout.For every
Pull
request, the object manager will subscribe to the object's locations with the GCS. On each notification from the GCS, the object manager will reset its list of known locations for the object.Once the object manager has a list of known locations, it will try sending a
Pull
request to each client successively. Once a client has been tried, a timer is set. If the object is received, or the request isCancel
ed, within the timeout, then the timer is canceled and thePull
request is cleaned up. Else, the next client in the list is tried, and so on.