-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[xray] Start actor workers in parallel #2168
[xray] Start actor workers in parallel #2168
Conversation
src/ray/raylet/node_manager.cc
Outdated
@@ -622,6 +622,10 @@ void NodeManager::AssignTask(Task &task) { | |||
// There are no more non-actor workers available to execute this task. | |||
// Start a new worker. | |||
worker_pool_.StartWorker(); | |||
} else if (spec.IsActorCreationTask()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't actor creation tasks already go through the if
statement, so they won't hit the else if
?
src/ray/raylet/node_manager.cc
Outdated
} else if (spec.IsActorCreationTask()) { | ||
// We will need a new worker for this actor, so force start one. | ||
// This parallelizes starting lots of actor workers. | ||
worker_pool_.StartWorker(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this parallelize starting actor workers? It seems to be serial.
Test PASSed. |
Test FAILed. |
7c87761
to
75aa7c0
Compare
Test PASSed. |
With the following program: import ray
import time
ray.init(use_raylet=True)
@ray.remote
class Actor(object):
def __init__(self):
pass
def f(self):
pass
%time As = [Actor.remote() for i in range(64)]; ray.get([A.f.remote() for A in As]) On a m4.4xlarge (8 physical cores) we get these timings:
|
Test PASSed. |
Test PASSed. |
src/ray/raylet/worker_pool.cc
Outdated
if (!started_worker_pids_.empty() && !force_start) { | ||
// The first condition makes sure that we are always starting up to | ||
// std::thread::hardware_concurrency() number of processes in parallel. | ||
if (NumStartedWorkers() > std::thread::hardware_concurrency() && !force_start) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of std::thread::hardware_concurrency()
, let's use num_cpus
Test FAILed. |
Test FAILed. |
Test PASSed. |
retest this please |
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Left a few small comments.
src/ray/raylet/worker_pool.cc
Outdated
@@ -155,7 +158,7 @@ bool WorkerPool::DisconnectWorker(std::shared_ptr<Worker> worker) { | |||
// Protected WorkerPool methods. | |||
void WorkerPool::AddStartedWorker(pid_t pid) { started_worker_pids_.insert(pid); } | |||
|
|||
uint32_t WorkerPool::NumStartedWorkers() const { return started_worker_pids_.size(); } | |||
int WorkerPool::NumStartedWorkers() const { return started_worker_pids_.size(); } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this to an int
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See https://google.github.io/styleguide/cppguide.html, "On Unsigned Integers"
In this case it was necessary to avoid a signed/unsigned mismatch.
In general it's better to use signed ints, unless there is a good reason not to (i.e. for doing bit manipulations or modular arithmetic).
src/ray/raylet/worker_pool.cc
Outdated
if (!started_worker_pids_.empty() && !force_start) { | ||
// The first condition makes sure that we are always starting up to | ||
// num_cpus_ number of processes in parallel. | ||
if (NumStartedWorkers() > num_cpus_ && !force_start) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update the documentation for StartWorker
to account for the new logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the documentation should also make clear that StartWorker
may not actually start a worker.
@@ -116,6 +116,12 @@ bool ResourceSet::GetResource(const std::string &resource_name, double *value) c | |||
return true; | |||
} | |||
|
|||
double ResourceSet::GetNumCPUs() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a couple other places in the code where we have to do this logic (I think in node_manager.cc
, maybe elsewhere too). Can you find and replace that code with the GetNumCPUs
method?
/// Return the number of CPUs. | ||
/// | ||
/// \return Number of CPUs. | ||
double GetNumCPUs() const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the convention we've been using is to lower-case acronyms, so it should be GetNumCpus
.
src/ray/raylet/node_manager.cc
Outdated
@@ -70,7 +70,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, | |||
heartbeat_timer_(io_service), | |||
heartbeat_period_ms_(config.heartbeat_period_ms), | |||
local_resources_(config.resource_config), | |||
worker_pool_(config.num_initial_workers, config.worker_command), | |||
worker_pool_(config.num_initial_workers, | |||
static_cast<int>(config.resource_config.GetNumCPUs()) + 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why +1
?
Test FAILed. |
Test FAILed. |
Jenkins retest this please |
Test FAILed. |
Jenkins retest this please |
Test PASSed. |
What do these changes do?
Related issue number