-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Raylet task dispatch and throttling worker startup #1912
Changes from all commits
d27ccdf
c503917
d1af21a
84638b9
21c559f
fce161f
188a60b
0329905
4236a8f
6bc95af
ae0d653
2562a11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,10 +16,15 @@ WorkerPool::WorkerPool(int num_workers, const std::vector<std::string> &worker_c | |
// become zombies instead of dying gracefully. | ||
signal(SIGCHLD, SIG_IGN); | ||
for (int i = 0; i < num_workers; i++) { | ||
StartWorker(); | ||
// Force-start num_workers workers. | ||
StartWorker(true); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I learned this tip from Zongheng to write this as [code snippet lost in capture; judging by the reply, presumably an inline argument-name comment such as `StartWorker(/*force_start=*/true)`]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Doesn't this introduce C-style comments? I'd rather not; if there's a cleaner, Pythonic way, that'd be great. |
||
} | ||
} | ||
|
||
/// Construct a worker pool that starts out empty: this constructor launches
/// no worker processes and only records the worker command.
///
/// \param worker_command The command copied into worker_command_.
WorkerPool::WorkerPool(const std::vector<std::string> &worker_command)
    : worker_command_(worker_command) {
  // Intentionally empty: no workers are started here.
}
|
||
WorkerPool::~WorkerPool() { | ||
// Kill all registered workers. NOTE(swang): This assumes that the registered | ||
// workers were started by the pool. | ||
|
@@ -28,15 +33,39 @@ WorkerPool::~WorkerPool() { | |
kill(worker->Pid(), SIGKILL); | ||
waitpid(worker->Pid(), NULL, 0); | ||
} | ||
// Kill all the workers that have been started but not registered. | ||
for (const auto &pid : started_worker_pids_) { | ||
RAY_CHECK(pid > 0); | ||
kill(pid, SIGKILL); | ||
waitpid(pid, NULL, 0); | ||
} | ||
|
||
pool_.clear(); | ||
actor_pool_.clear(); | ||
registered_workers_.clear(); | ||
started_worker_pids_.clear(); | ||
} | ||
|
||
uint32_t WorkerPool::Size() const { | ||
return static_cast<uint32_t>(actor_pool_.size() + pool_.size()); | ||
} | ||
|
||
void WorkerPool::StartWorker() { | ||
void WorkerPool::StartWorker(bool force_start) { | ||
RAY_CHECK(!worker_command_.empty()) << "No worker command provided"; | ||
if (!started_worker_pids_.empty() && !force_start) { | ||
// Workers have been started, but not registered. Force start disabled -- returning. | ||
RAY_LOG(DEBUG) << started_worker_pids_.size() << " workers pending registration"; | ||
return; | ||
} | ||
// Either there are no workers pending registration or the worker start is being forced. | ||
RAY_LOG(DEBUG) << "starting worker, actor pool " << actor_pool_.size() << " task pool " | ||
<< pool_.size(); | ||
|
||
// Launch the process to create the worker. | ||
pid_t pid = fork(); | ||
if (pid != 0) { | ||
RAY_LOG(DEBUG) << "Started worker with pid " << pid; | ||
started_worker_pids_.insert(pid); | ||
return; | ||
} | ||
|
||
|
@@ -60,6 +89,8 @@ void WorkerPool::StartWorker() { | |
/// Record a worker as registered and drop its pid from the set of workers
/// that were started but had not yet registered.
///
/// The worker's pid must be present in started_worker_pids_; the RAY_CHECK
/// below fails otherwise.
///
/// \param worker The worker to add to registered_workers_.
void WorkerPool::RegisterWorker(std::shared_ptr<Worker> worker) {
  RAY_LOG(DEBUG) << "Registering worker with pid " << worker->Pid();
  // Only a worker that is still pending registration may register.
  RAY_CHECK(started_worker_pids_.count(worker->Pid()) > 0);
  started_worker_pids_.erase(worker->Pid());
  registered_workers_.push_back(worker);
}
|
||
std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker( | ||
|
@@ -119,6 +150,11 @@ bool WorkerPool::DisconnectWorker(std::shared_ptr<Worker> worker) { | |
return removeWorker(pool_, worker); | ||
} | ||
|
||
// Protected WorkerPool methods. | ||
/// Add a pid to the set of workers that have been started but have not yet
/// registered.
///
/// \param pid The process id to insert into started_worker_pids_.
void WorkerPool::AddStartedWorker(pid_t pid) {
  started_worker_pids_.insert(pid);
}
|
||
/// Return the number of pids in started_worker_pids_, i.e. workers that have
/// been started but have not yet registered.
uint32_t WorkerPool::NumStartedWorkers() const {
  // size() yields the container's size_type; narrow it explicitly, as Size()
  // does, rather than relying on an implicit conversion.
  return static_cast<uint32_t>(started_worker_pids_.size());
}
|
||
} // namespace raylet | ||
|
||
} // namespace ray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, thanks :)