[core] Pin arguments during task execution #13737
LGTM! Is the reason why we don't pin an object as soon as it is pulled because that's more complex or less robust?
}
for (size_t i = 0; i < deps.size(); i++) {
  if (args[i] == nullptr) {
    RAY_LOG(INFO) << "Task " << spec.TaskId() << " argument " << deps[i] << " was evicted before task could be dispatched";
Can you add a comment explaining when this can happen? Does it happen only when some of the objects are evicted before the whole bundle is pulled?
bool success = true;
const auto &deps = spec.GetDependencyIds();
if (!deps.empty()) {
  success = pin_task_arguments_(deps, &args);
Can you add a comment noting that args will hold a reference to the plasma store?
@@ -252,8 +271,9 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
  };

  /* Blocked on dependencies */
  dependency_manager_.task_ready_ = false;
- auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1);
+ auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2);
Unrelated to this line of code, but maybe we can add a simple test that all args are pinned correctly?
LGTM; simpler than I thought.
/// Arguments needed by currently granted lease requests. These should be
/// pinned before the lease is granted to ensure that the arguments are not
/// evicted before the task(s) start running.
std::unordered_map<TaskID, std::vector<std::unique_ptr<RayObject>>> pinned_task_arguments_;
Could we add this to DebugString()?
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.ScheduleAndDispatchTasks();
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 0);
ASSERT_EQ(leased_workers_.size(), 0);

/* Worker available and arguments available */
dependency_manager_.task_ready_ = true;
missing_objects_.erase(missing_arg);
Can we add a test that the pinned object is unpinned when the lease is returned?
@@ -2346,33 +2375,14 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
    object_ids.push_back(ObjectID::FromBinary(object_id_binary));
  }
  if (object_pinning_enabled_) {
Should we remove this feature flag (separately)?
Yeah, I think it's just used for LRU right now (and I guess if you wanted to turn off ref counting manually, but I don't think that should be recommended or exposed anymore...).
* tmp
* Pin task args
* unit tests
* update
* test
* Fix
This reverts commit da4f463.
Why are these changes needed?
Currently, there is a race condition where a task's arguments can get evicted after the task is dispatched but before the worker has gotten the arguments from the object store. This can lead to a deadlock if there are too many requests to pull other objects in the node's queue.
This fixes the race condition by having the raylet pin the dependencies while a task lease is granted.
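The pin-while-leased idea can be sketched in isolation. The following is a minimal toy model, not Ray's implementation: `ToyStore`, `PinnedObject`, `ToyLeaseManager`, and the string-based IDs are all hypothetical stand-ins, and only the shape of the per-task map mirrors the `pinned_task_arguments_` member shown in the diff. It assumes a store that refuses to evict an object while its pin count is non-zero.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-ins for Ray's TaskID and ObjectID types.
using TaskId = std::string;
using ObjectId = std::string;

// Toy object store: an object can be evicted only while its pin count is zero.
class ToyStore {
 public:
  void Put(const ObjectId &id) { pins_[id] = 0; }
  // Returns false if the object is not (or no longer) in the store.
  bool Pin(const ObjectId &id) {
    auto it = pins_.find(id);
    if (it == pins_.end()) return false;
    ++it->second;
    return true;
  }
  void Unpin(const ObjectId &id) {
    auto it = pins_.find(id);
    if (it != pins_.end() && it->second > 0) --it->second;
  }
  // Returns false if the object is missing or currently pinned.
  bool Evict(const ObjectId &id) {
    auto it = pins_.find(id);
    if (it == pins_.end() || it->second > 0) return false;
    pins_.erase(it);
    return true;
  }

 private:
  std::unordered_map<ObjectId, int> pins_;
};

// RAII handle for a pin that has already been taken; destroying it unpins.
class PinnedObject {
 public:
  PinnedObject(ToyStore &store, ObjectId id) : store_(store), id_(std::move(id)) {}
  ~PinnedObject() { store_.Unpin(id_); }
  PinnedObject(const PinnedObject &) = delete;
  PinnedObject &operator=(const PinnedObject &) = delete;

 private:
  ToyStore &store_;
  ObjectId id_;
};

// Holds the pins for every task whose lease is currently granted.
class ToyLeaseManager {
 public:
  explicit ToyLeaseManager(ToyStore &store) : store_(store) {}

  // Pins all dependencies, then records the lease. If any dependency was
  // already evicted, the pins taken so far are released and false is returned.
  bool GrantLease(const TaskId &task, const std::vector<ObjectId> &deps) {
    std::vector<std::unique_ptr<PinnedObject>> pinned;
    for (const auto &dep : deps) {
      if (!store_.Pin(dep)) return false;  // `pinned` goes out of scope and unpins
      pinned.push_back(std::make_unique<PinnedObject>(store_, dep));
    }
    pinned_args_[task] = std::move(pinned);
    return true;
  }

  // Dropping the map entry destroys the handles, which unpins the arguments.
  void ReturnLease(const TaskId &task) { pinned_args_.erase(task); }

  std::size_t NumLeases() const { return pinned_args_.size(); }

 private:
  ToyStore &store_;
  std::unordered_map<TaskId, std::vector<std::unique_ptr<PinnedObject>>> pinned_args_;
};

// Walks through grant, attempted eviction, return, and a failed grant.
void DemoPinning() {
  ToyStore store;
  store.Put("arg1");
  store.Put("arg2");
  ToyLeaseManager manager(store);

  assert(manager.GrantLease("task", {"arg1", "arg2"}));
  assert(!store.Evict("arg1"));  // pinned while the lease is held
  manager.ReturnLease("task");
  assert(store.Evict("arg1"));   // evictable once the lease is returned

  // arg1 is gone, so the grant fails and the pin on arg2 is rolled back.
  assert(!manager.GrantLease("task2", {"arg2", "arg1"}));
  assert(store.Evict("arg2"));
}
```

Tying the pins to the lifetime of the map entry means a returned lease cannot leave a stale pin behind, which is the property the review comments ask to have covered by tests.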
Related issue number
Closes #12663.
Checks
I've run scripts/format.sh to lint the changes in this PR.