Define a Node class to manage Ray processes. #3733

Merged (7 commits) on Jan 12, 2019

Conversation

Collaborator

@robertnishihara robertnishihara commented Jan 10, 2019

In this PR:

  • There is a Node class, which is responsible for starting and stopping Ray processes (e.g., the raylet, plasma store, monitor, and Redis shards).
  • ray.init() creates a global Node object. ray start also creates a Node object.
  • The started processes, which were previously tracked in global variables in ray.services, are now managed by the Node.
  • The Node keeps track of other information, such as whether a process was started under valgrind. This will make automated valgrind testing (as well as profiling) much easier.
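
To make the idea concrete, here is a minimal sketch of a class that tracks started processes by type along with a per-process valgrind flag. It is not the code from this PR: NodeSketch, ProcessInfo, and start_process are hypothetical names, and the flag is only recorded (nothing is actually run under valgrind).

```python
import subprocess
import sys
from collections import namedtuple

# Hypothetical record type; the field names are assumptions for illustration.
ProcessInfo = namedtuple("ProcessInfo", ["process", "use_valgrind"])


class NodeSketch:
    """Toy stand-in for the Node class described above (not Ray's code)."""

    def __init__(self):
        # Maps a process type (str) to the list of processes of that type.
        self.all_processes = {}

    def start_process(self, process_type, command, use_valgrind=False):
        # Start the child and remember how it was launched.
        process = subprocess.Popen(command)
        info = ProcessInfo(process=process, use_valgrind=use_valgrind)
        self.all_processes.setdefault(process_type, []).append(info)
        return info

    def kill_all_processes(self):
        # Terminate every tracked process and wait for it to exit.
        for infos in self.all_processes.values():
            for info in infos:
                if info.process.poll() is None:
                    info.process.terminate()
                info.process.wait()
        self.all_processes.clear()


node = NodeSketch()
sleeper = [sys.executable, "-c", "import time; time.sleep(60)"]
node.start_process("raylet", sleeper)
node.start_process("plasma_store", sleeper, use_valgrind=True)
tracked_types = sorted(node.all_processes)
node.kill_all_processes()
```

Keeping the bookkeeping on an object rather than in module-level globals means several such objects can coexist in one Python process, for example in tests.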

TODO:

  • Documentation
  • Make sure tests pass.
  • This PR may have made shutdown more verbose. Fix that.
  • Remove all or most of the arguments from the start_XXX methods of the Node class. They should just use the object's fields.

PROCESS_TYPE_RAYLET_MONITOR, check_alive=check_alive)

def kill_all_processes(self, check_alive=True):
# TODO(rkn): This is slower than necessary because it calls kill, wait,
Member

One thing we can do is create a "killing_process_list" to store the processes we have sent kill signals to, and use another function to wait on all of these processes when wait=True. This also lets us track the status of the processes being killed by checking the "killing_process_list".
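
A rough sketch of this suggestion, assuming plain subprocess.Popen handles: every process is signaled first, and only then does a second function wait on the ones that were signaled. The function names kill_processes and wait_for_killed are hypothetical and do not come from the PR.

```python
import subprocess
import sys


def kill_processes(processes, wait=True, timeout=10):
    """Signal every process first, then optionally wait on all of them."""
    killing_process_list = []
    for process in processes:
        if process.poll() is None:  # Still running.
            process.kill()
            killing_process_list.append(process)
    if wait:
        wait_for_killed(killing_process_list, timeout=timeout)
    return killing_process_list


def wait_for_killed(killing_process_list, timeout=10):
    """Block until each signaled process has exited."""
    for process in killing_process_list:
        process.wait(timeout=timeout)


children = [
    subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    for _ in range(3)
]
killed = kill_processes(children, wait=True)
still_running = [p for p in killed if p.poll() is None]
```

Because the signals go out before any waiting starts, the processes shut down concurrently instead of one after another.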

@suquark
Member

suquark commented Jan 10, 2019

We should also get rid of ray_params in services.py (e.g., in start_raylet) so that services.py provides only the lowest-level abstraction for starting processes.

@robertnishihara
Collaborator Author

Now I'm thinking of removing services.py altogether and only allowing processes to be started via the Node class.

@suquark
Member

suquark commented Jan 10, 2019

But I think that's too heavy for the Node class. In #3703, I separate process starting into two modules. The first provides only a basic abstraction: Python interfaces for starting processes. The second, RayNodeSession (almost your Node class), uses ray_params and the context of the Node (existing redis_address, etc.) to start processes, and it calls the functions in the first module.
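
The two-layer split described here could look roughly like the sketch below. Everything in it is hypothetical (start_process, ParamsSketch, NodeSessionSketch, and the echoing child process); it only illustrates a thin process-starting helper being called by a class that owns the parameters.

```python
import subprocess
import sys
from dataclasses import dataclass


# Layer 1: a thin helper that knows nothing about node-level configuration.
def start_process(command, stdout=None, stderr=None):
    """Start one process from an explicit command line."""
    return subprocess.Popen(command, stdout=stdout, stderr=stderr)


# Hypothetical parameter bundle; the field is an assumption.
@dataclass
class ParamsSketch:
    redis_address: str = "127.0.0.1:6379"


# Layer 2: holds the parameters and node context, and builds the commands.
class NodeSessionSketch:
    def __init__(self, params):
        self.params = params
        self.processes = []

    def start_example_service(self):
        # A real service would receive the address as a command-line flag;
        # here a short Python child simply echoes it.
        command = [
            sys.executable, "-c",
            "import sys; print(sys.argv[1])", self.params.redis_address,
        ]
        process = start_process(command, stdout=subprocess.PIPE)
        self.processes.append(process)
        return process


session = NodeSessionSketch(ParamsSketch(redis_address="10.0.0.1:6379"))
child = session.start_example_service()
output, _ = child.communicate()
echoed_address = output.decode().strip()
```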

@robertnishihara
Collaborator Author

Ok, I'll keep them separate in this PR.

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10717/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10721/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10720/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10732/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10731/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10733/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10734/

@robertnishihara changed the title from "[WIP] Define a Node class to manage Ray processes." to "Define a Node class to manage Ray processes." Jan 10, 2019
@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10751/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10753/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10754/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10758/

@robertnishihara
Collaborator Author

Jenkins, retest this please.

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10759/

@guoyuhong
Contributor

Later on, we want to enable cross-language ray.call, e.g., to enable Java to call Python. In this case, we need to pass a lot of parameters to the raylet and workers.
Imagine that we want to pass a parameter to the Python default worker. We need to add a parameter to scripts.py. Then the parameter goes through a chain of functions to reach the raylet: start_ray_head->start_ray_processes->start_raylet. If the worker needs this parameter, we need to format the worker command as a raylet argument, and in the default worker we need to parse this argument and pass it to worker.connect. That is a very long path. RayParams is used to pass this kind of parameter. Therefore, I prefer to keep RayParams in start_raylet. Otherwise, the argument list will become longer and longer.

@robertnishihara
Collaborator Author

robertnishihara commented Jan 11, 2019

@guoyuhong if the command for starting a worker gets more complex, then we could pass in the "start worker command" as a separate argument to start_raylet, right?

Note that after this PR, there is no start_ray_head anymore.

If the raylet config gets really complex, we could make raylet_params a field of ray_params.
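
As a sketch of both options mentioned above (a separate "start worker command" argument, and raylet_params as a field of ray_params), assuming hypothetical dataclasses: RayletParamsSketch, RayParamsSketch, build_raylet_command, the "--lang=python" flag, and the positional command layout are all made up for illustration and are not Ray's actual interfaces.

```python
import sys
from dataclasses import dataclass, field
from typing import List


@dataclass
class RayletParamsSketch:
    # Hypothetical raylet-specific settings, grouped in one place.
    worker_command: List[str] = field(default_factory=list)
    extra_args: List[str] = field(default_factory=list)


@dataclass
class RayParamsSketch:
    # Hypothetical top-level parameters with a nested raylet section.
    redis_address: str = ""
    raylet_params: RayletParamsSketch = field(
        default_factory=RayletParamsSketch)


def build_raylet_command(raylet_executable, redis_address, worker_command):
    """Build a raylet command line; the worker command is passed in whole."""
    return [
        raylet_executable,
        redis_address,
        " ".join(worker_command),  # Forwarded as a single argument.
    ]


params = RayParamsSketch(
    redis_address="127.0.0.1:6379",
    raylet_params=RayletParamsSketch(
        worker_command=[sys.executable, "default_worker.py", "--lang=python"]),
)
command = build_raylet_command(
    "raylet", params.redis_address, params.raylet_params.worker_command)
```

With this shape, adding a new worker option only touches the code that builds worker_command; the signature of the raylet-starting function stays the same.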

@richardliaw
Contributor

quick question before review - Is this going in 0.6.2?

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10764/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10766/

@robertnishihara
Collaborator Author

@richardliaw yes I hope so!

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10776/

@robertnishihara
Collaborator Author

@pcmoritz @richardliaw @suquark I think this should pass the tests soon and should be ready to merge. Let me know if you have any more comments.

Contributor

@richardliaw left a comment

Partial Review (will continue)


# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
# using logging.basicConfig in its entry/init points.
Contributor

This is undergoing some change; can you change this to say something like "see ray/worker.py:init()"?

Collaborator Author

This is what it says in all of the other places. Why don't we just update them all at once when we make the change?

This class is responsible for starting Ray processes and killing them.

Attributes:
all_processes: A mapping from process type (which is a string) to a
Contributor

Suggested change
-all_processes: A mapping from process type (which is a string) to a
+all_processes (dict): A mapping from process type (str) to a

head (bool): True if this is the head node, which means it will
start additional processes like the Redis servers, monitor
processes, and web UI.
shutdown_at_exit (bool): True if a handler should be registered to
Contributor

Suggested change
-shutdown_at_exit (bool): True if a handler should be registered to
+shutdown_at_exit (bool): If True, a handler will be registered to

return self._raylet_socket_name

def start_redis(self):
"""Start the Redis servers."""
Contributor

Returns?

Collaborator Author

good catch, it actually shouldn't return anything

def start_log_monitor(self):
"""Start the log monitor."""
stdout_file, stderr_file = new_log_monitor_log_file()
p = ray.services.start_log_monitor(
Contributor

does lint not complain about 1 char variables? Can you rename p to proc?

Collaborator Author

fixed


def start_worker(self):
"""Start a worker process."""
raise NotImplementedError("This has not been implemented.")
Contributor

nit: no need for message

Collaborator Author

fixed

@@ -72,8 +72,8 @@ def test_raylet_tempfiles():
assert log_files == {
"log_monitor.out", "log_monitor.err", "plasma_store.out",
"plasma_store.err", "webui.out", "webui.err", "monitor.out",
-"monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out",
-"redis.err"
+"monitor.err", "raylet_monitor.out", "raylet_monitor.err",
Contributor

nit: is it possible to avoid these strings? i.e., make these variables or something

Collaborator Author

Maybe in some other PR

@@ -191,3 +175,8 @@ def _check_usage(self):
assert "GPU" not in self.resources, (
"'GPU' should not be included in the resource dictionary. Use "
"num_gpus instead.")

if self.num_workers is not None:
raise Exception(
Contributor

DeprecationWarning?

Collaborator Author

This is not new, we've been raising an exception for a long time now since 0.5 or something like that.

Contributor

OK, I see - this doesn't belong in this PR, but I think Ray can do a better job of raising more specific error types than a bare Exception.
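
For illustration only, the difference between a specific exception type and a DeprecationWarning might be sketched as follows. RayDeprecatedArgumentError, check_usage, legacy_flag, and the message strings are hypothetical; Ray's actual error classes and messages are not shown in this thread.

```python
import warnings


class RayDeprecatedArgumentError(ValueError):
    """Hypothetical exception for arguments that are no longer accepted."""


def check_usage(num_workers=None, legacy_flag=None):
    # Hard failure: the argument was removed, so raise a specific type.
    if num_workers is not None:
        raise RayDeprecatedArgumentError(
            "The 'num_workers' argument is no longer supported.")
    # Soft failure: the argument still works but is scheduled for removal.
    if legacy_flag is not None:
        warnings.warn("'legacy_flag' is deprecated.", DeprecationWarning,
                      stacklevel=2)


try:
    check_usage(num_workers=4)
    raised = None
except RayDeprecatedArgumentError as error:
    raised = error

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    check_usage(legacy_flag=True)
```

A dedicated subclass lets callers catch exactly this failure, while code that already catches ValueError keeps working.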

import logging
import time

import redis

import ray
from ray.parameter import RayParams
import ray.services as services
import ray.node
Contributor

we don't need this right?

Collaborator Author

not sure, I removed it

@@ -1520,6 +1382,14 @@ def init(redis_address=None,
raise DeprecationWarning("The use_raylet argument is deprecated. "
"Please remove it.")

if driver_mode is not None:
raise Exception("The 'driver_mode' argument has been deprecated. "
Contributor

DeprecationWarning?

Collaborator Author

This is not new.

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10778/

@@ -64,7 +62,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
plasma_store_executable = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"../core/src/plasma/plasma_store_server")
-plasma_store_name = socket_name or get_object_store_socket_name()
+plasma_store_name = socket_name
Contributor

Does this become a required arg then? This endpoint can be used outside Node() right?

Collaborator Author

Yeah, it's required if you use this method now (which can be used outside of Node)

return False
return True
if ray.worker._global_node is None:
raise Exception("This process is not in a position to determine "
Contributor

Hm, if I got this error message, I wouldn't know what to do.

Collaborator Author

You would stop calling ray.services.remaining_processes_alive()

@@ -578,7 +489,7 @@ def start_redis(node_ip_address,
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, port)

-return redis_address, redis_shards
+return redis_address, redis_shards, processes
Contributor

update docs for returns?

Collaborator Author

fixed all of these

record_log_files_in_redis(
redis_address,
node_ip_address, [stdout_file, stderr_file],
password=redis_password)
return p
Contributor

update doc for returns?

logger.info("\n" + "=" * 70)
logger.info("View the web UI at {}".format(webui_url))
logger.info("=" * 70 + "\n")
-return webui_url
+return webui_url, ui_process
+return None, None
Contributor

update docs for returns?

record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])
return p
Contributor

document returns?

record_log_files_in_redis(
redis_address,
node_ip_address, [stdout_file, stderr_file],
password=redis_password)
return p
Contributor

document returns?

@@ -37,7 +38,7 @@
from ray import import_thread
from ray import profiling
from ray.function_manager import (FunctionActorManager, FunctionDescriptor)
-from ray.parameter import RayParams
+import ray.parameter
Contributor

why do we need this import?

Collaborator Author

without it I see

>       ray_params = ray.parameter.RayParams(**node_args)
E       AttributeError: module 'ray' has no attribute 'parameter'
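
This matches general Python behavior: a submodule becomes an attribute of its package only after something has imported it. The sketch below reproduces that with a throwaway package (demo_pkg is a made-up name written to a temporary directory), not with ray itself, so it presumably shows the same mechanism rather than the exact failure above.

```python
import importlib
import os
import sys
import tempfile

# Build a throwaway package "demo_pkg" with one submodule "parameter".
package_root = tempfile.mkdtemp()
package_dir = os.path.join(package_root, "demo_pkg")
os.makedirs(package_dir)
with open(os.path.join(package_dir, "__init__.py"), "w") as f:
    f.write("")  # The package does not import its submodule itself.
with open(os.path.join(package_dir, "parameter.py"), "w") as f:
    f.write("class Params:\n    pass\n")

sys.path.insert(0, package_root)
importlib.invalidate_caches()

import demo_pkg

# Before the submodule is imported, attribute access on the package fails.
available_before = hasattr(demo_pkg, "parameter")

import demo_pkg.parameter

# Importing the submodule binds it as an attribute of the package.
available_after = hasattr(demo_pkg, "parameter")
params = demo_pkg.parameter.Params()
```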

@@ -299,8 +319,10 @@ def g(*xs):
# execute. Do this in a loop while submitting tasks between each
# component failure.
time.sleep(0.1)
-components = ray.services.all_processes[component_type]
-for process in components[1:]:
+worker_nodes = cluster.list_all_nodes()[1:]
Contributor

This is not great because the ordering isn't an explicit invariant guaranteed by list_all_nodes. It would be good to expose a specific method, list_worker_nodes(), instead of assuming the first node is the head.

Collaborator Author

@robertnishihara Jan 12, 2019

I'd prefer to not really have a worker/head distinction, so I don't like the idea of list_worker_nodes(). The relevant assumption here is that the driver is connected to list_all_nodes()[0]
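
One way to reconcile the two positions is to document the ordering as an invariant of list_all_nodes itself. The sketch below assumes a toy cluster class; ClusterSketch, NodeStub, and the node names are hypothetical, and this is not the cluster utility touched by the PR.

```python
class NodeStub:
    """Hypothetical stand-in for a node; only carries a name."""

    def __init__(self, name):
        self.name = name


class ClusterSketch:
    def __init__(self):
        self.head_node = None
        self.worker_nodes = []

    def add_node(self, name):
        node = NodeStub(name)
        if self.head_node is None:
            # The first node added becomes the one the driver connects to.
            self.head_node = node
        else:
            self.worker_nodes.append(node)
        return node

    def list_all_nodes(self):
        # Documented invariant of this sketch: the driver's node comes first.
        nodes = list(self.worker_nodes)
        if self.head_node is not None:
            nodes = [self.head_node] + nodes
        return nodes


cluster = ClusterSketch()
for name in ["node-0", "node-1", "node-2"]:
    cluster.add_node(name)
other_nodes = [node.name for node in cluster.list_all_nodes()[1:]]
```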

output_info = ray.init(
ignore_reinit_error=True,
redis_address=self.redis_address,
redis_password=self.redis_password)
logger.info(output_info)
self.connected = True

-def add_node(self, **override_kwargs):
+def add_node(self, **node_args):
Contributor

Update docs?

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10780/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10783/

@pcmoritz pcmoritz merged commit 8723d6b into ray-project:master Jan 12, 2019
@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10781/

@robertnishihara robertnishihara deleted the clusterservices branch January 12, 2019 06:46
6 participants