-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Define a Node class to manage Ray processes. #3733
Conversation
python/ray/node.py
Outdated
PROCESS_TYPE_RAYLET_MONITOR, check_alive=check_alive) | ||
|
||
def kill_all_processes(self, check_alive=True): | ||
# TODO(rkn): This is slower than necessary because it calls kill, wait, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing we can do is to create a "killing_process_list" to store processes we have sent killing signals to, and we use another function to wait all these processes when wait=True
. This also allows us to track the status of these being killed processes by checking the "killing_process_list".
We should also get rid of |
Now I'm thinking of removing |
But I think that's too heavy for |
Ok, I'll keep them separate in this PR. |
Test PASSed. |
Test PASSed. |
Test PASSed. |
Test FAILed. |
Test PASSed. |
Test FAILed. |
Test PASSed. |
Test PASSed. |
Test PASSed. |
Test PASSed. |
Test FAILed. |
Jenkins, retest this please. |
Test PASSed. |
207ba3a
to
910c7da
Compare
Later on, we want to enable cross-language ray.call, e.g. enable Java to call Python. In this case, we need to pass a lot of parameters to raylet and workers. |
@guoyuhong if the command for starting a worker gets more complex, then we could pass in the "start worker command" as a separate argument to Note that after this PR, there is no If the raylet config gets really complex, we could make |
quick question before review - Is this going in 0.6.2? |
Test PASSed. |
Test PASSed. |
@richardliaw yes I hope so! |
Test FAILed. |
@pcmoritz @richardliaw @suquark I think this should pass the tests soon and should be ready to merge. Let me know if you have any more comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial Review (will continue)
|
||
# Logger for this module. It should be configured at the entry point | ||
# into the program using Ray. Ray configures it by default automatically | ||
# using logging.basicConfig in its entry/init points. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is undergoing some change; can you change this to mean something like, "see ray/worker.py:init()`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what it says in all of the other places. Why don't we just update them all at once when we make the change?
python/ray/node.py
Outdated
This class is responsible for starting Ray processes and killing them. | ||
|
||
Attributes: | ||
all_processes: A mapping from process type (which is a string) to a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all_processes: A mapping from process type (which is a string) to a | |
all_processes (dict): A mapping from process type (str) to a |
python/ray/node.py
Outdated
head (bool): True if this is the head node, which means it will | ||
start additional processes like the Redis servers, monitor | ||
processes, and web UI. | ||
shutdown_at_exit (bool): True if a handler should be registered to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shutdown_at_exit (bool): True if a handler should be registered to | |
shutdown_at_exit (bool): If True, a handler will be registered to |
return self._raylet_socket_name | ||
|
||
def start_redis(self): | ||
"""Start the Redis servers.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, it actually shouldn't return anything
python/ray/node.py
Outdated
def start_log_monitor(self): | ||
"""Start the log monitor.""" | ||
stdout_file, stderr_file = new_log_monitor_log_file() | ||
p = ray.services.start_log_monitor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does lint not complain about 1 char variables? Can you rename p
to proc
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
python/ray/node.py
Outdated
|
||
def start_worker(self): | ||
"""Start a worker process.""" | ||
raise NotImplementedError("This has not been implemented.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: no need for message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -72,8 +72,8 @@ def test_raylet_tempfiles(): | |||
assert log_files == { | |||
"log_monitor.out", "log_monitor.err", "plasma_store.out", | |||
"plasma_store.err", "webui.out", "webui.err", "monitor.out", | |||
"monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out", | |||
"redis.err" | |||
"monitor.err", "raylet_monitor.out", "raylet_monitor.err", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: is it possible to avoid these strings? i.e., make these variables or something
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe in some other PR
@@ -191,3 +175,8 @@ def _check_usage(self): | |||
assert "GPU" not in self.resources, ( | |||
"'GPU' should not be included in the resource dictionary. Use " | |||
"num_gpus instead.") | |||
|
|||
if self.num_workers is not None: | |||
raise Exception( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DeprecationWarning?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not new, we've been raising an exception for a long time now since 0.5 or something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I see - this doesn't belong in this PR, but I think Ray can do a better job of raising more specific error messages than Exception
python/ray/test/cluster_utils.py
Outdated
import logging | ||
import time | ||
|
||
import redis | ||
|
||
import ray | ||
from ray.parameter import RayParams | ||
import ray.services as services | ||
import ray.node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need this right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure, I removed it
@@ -1520,6 +1382,14 @@ def init(redis_address=None, | |||
raise DeprecationWarning("The use_raylet argument is deprecated. " | |||
"Please remove it.") | |||
|
|||
if driver_mode is not None: | |||
raise Exception("The 'driver_mode' argument has been deprecated. " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DeprecationWarning?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not new.
Test PASSed. |
@@ -64,7 +62,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, | |||
plasma_store_executable = os.path.join( | |||
os.path.abspath(os.path.dirname(__file__)), | |||
"../core/src/plasma/plasma_store_server") | |||
plasma_store_name = socket_name or get_object_store_socket_name() | |||
plasma_store_name = socket_name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this become a required arg then? This endpoint can be used outside Node() right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's required if you use this method now (which can be used outside of Node
)
return False | ||
return True | ||
if ray.worker._global_node is None: | ||
raise Exception("This process is not in a position to determine " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, if I got this error message, I wouldn't know what to do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You would stop calling ray.services.remaining_processes_alive()
@@ -578,7 +489,7 @@ def start_redis(node_ip_address, | |||
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER", | |||
node_ip_address, port) | |||
|
|||
return redis_address, redis_shards | |||
return redis_address, redis_shards, processes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update docs for returns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed all of these
record_log_files_in_redis( | ||
redis_address, | ||
node_ip_address, [stdout_file, stderr_file], | ||
password=redis_password) | ||
return p |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update doc for returns?
logger.info("\n" + "=" * 70) | ||
logger.info("View the web UI at {}".format(webui_url)) | ||
logger.info("=" * 70 + "\n") | ||
return webui_url | ||
return webui_url, ui_process | ||
return None, None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update docs for returns?
record_log_files_in_redis(redis_address, node_ip_address, | ||
[stdout_file, stderr_file]) | ||
return p |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
document returns?
record_log_files_in_redis( | ||
redis_address, | ||
node_ip_address, [stdout_file, stderr_file], | ||
password=redis_password) | ||
return p |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
document returns?
@@ -37,7 +38,7 @@ | |||
from ray import import_thread | |||
from ray import profiling | |||
from ray.function_manager import (FunctionActorManager, FunctionDescriptor) | |||
from ray.parameter import RayParams | |||
import ray.parameter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this import?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
without it I see
> ray_params = ray.parameter.RayParams(**node_args)
E AttributeError: module 'ray' has no attribute 'parameter'
@@ -299,8 +319,10 @@ def g(*xs): | |||
# execute. Do this in a loop while submitting tasks between each | |||
# component failure. | |||
time.sleep(0.1) | |||
components = ray.services.all_processes[component_type] | |||
for process in components[1:]: | |||
worker_nodes = cluster.list_all_nodes()[1:] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not great because it isn't an invariant provided/explicit behavior by list_all_nodes
. It would be good to expose a specific method list_worker_nodes()
instead of assuming the first one is the head.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to not really have a worker/head distinction, so I don't like the idea of list_worker_nodes()
. The relevant assumption here is that the driver is connected to list_all_nodes()[0]
output_info = ray.init( | ||
ignore_reinit_error=True, | ||
redis_address=self.redis_address, | ||
redis_password=self.redis_password) | ||
logger.info(output_info) | ||
self.connected = True | ||
|
||
def add_node(self, **override_kwargs): | ||
def add_node(self, **node_args): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update docs?
Test PASSed. |
Test PASSed. |
Test FAILed. |
In this PR:
Node
class, which is responsible for starting and stopping Ray processes (e.g., the raylet, plasma store, monitor, redis shards, etc).ray.init()
creates a globalNode
object.ray start
also creates aNode
object.ray.services
are now managed by theNode
.Node
keeps track of other information, like whether a process was started in valgrind or not. This will make automated valgrind testing much easier (as well as profiling).TODO:
start_XXX
methods of theNode
class. They should just use the object's fields.