Cluster Utilities for Fault Tolerance Tests #3008
Conversation
Test PASSed.

Related to #609.
python/ray/test/cluster_utils.py
Outdated
    # Log monitor doesn't die for some reason
    worker.kill_log_monitor()
    worker.kill_plasma_store()
    # TODO(rliaw): how to wait for raylet timeout?
@robertnishihara any idea for this?
Test FAILed.

Test PASSed.

Test FAILed.

Test PASSed.

Test PASSed.

Test PASSed.
python/ray/test/cluster_utils.py
Outdated
    logger = logging.getLogger(__name__)


    class Cluster():
Cluster(object) to support Python 2
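For example, a minimal sketch of the suggested change:

    class Cluster(object):
        # Inheriting from `object` makes this a new-style class on
        # Python 2 as well; on Python 3 the two forms are equivalent.
        ...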
python/ray/test/cluster_utils.py
Outdated
    def __init__(self, initialize_head=False, connect=False, **head_node_args):
        """Initializes the cluster.

        Arguments:
Args:
python/ray/test/cluster_utils.py
Outdated
| """ | ||
| self.head_node = None | ||
| self.worker_nodes = {} | ||
| self.redis_address = "" |
I think None would be better than ""
python/ray/test/cluster_utils.py
Outdated
            self.add_node(**head_node_args)
            if connect:
                ray.init(redis_address=self.redis_address)
        elif connect:
Maybe just raise an exception earlier on if not initialize_head and connect
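A minimal sketch of that early check (the placement and the error message are illustrative, not taken from the PR):

    def __init__(self, initialize_head=False, connect=False, **head_node_args):
        """Initializes the cluster."""
        # Fail fast: connecting the driver only makes sense when a head
        # node is started as part of initialization.
        if not initialize_head and connect:
            raise RuntimeError(
                "Cannot connect to an uninitialized cluster; "
                "pass initialize_head=True.")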
python/ray/test/cluster_utils.py
Outdated
        self.process_dict["log_monitor"][0].kill()
        self.process_dict["log_monitor"][0].wait()

    def kill_allprocesses(self):
kill_all_processes
python/ray/test/cluster_utils.py
Outdated
        dies before worker nodes die?

        Returns:
            List of all nodes, including the head node."""
""" on the next line
python/ray/test/cluster_utils.py
Outdated
    def wait_for_nodes(self):
        """Waits for all nodes to be registered with global state."""
        try:
What's the point of this try/except?
If you want to avoid a bad error message when this is called before ray.init, you can check ray.is_connected
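Roughly like this, assuming the connected-check mentioned above is available (the helper name comes from the comment and may differ between Ray versions):

    def wait_for_nodes(self):
        """Waits for all nodes to be registered with global state."""
        # Raise a clear error instead of an obscure traceback when the
        # driver has not called ray.init() yet. `ray.is_connected` is the
        # helper suggested in the comment above; the exact name may vary.
        if not ray.is_connected():
            raise RuntimeError(
                "wait_for_nodes() requires ray.init() to have been called.")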
python/ray/test/cluster_utils.py
Outdated
            self.remove_node(self.head_node)


    class Node():
Node(object)
test/multi_node_test_2.py
Outdated
    worker.kill_plasma_store()
    worker.process_dict[services.PROCESS_TYPE_RAYLET][0].wait()
    assert not worker.any_processes_alive(), worker.live_processes()
    print("Success")
remove this line
test/multi_node_test_2.py
Outdated
    worker2 = g.add_node()
    g.wait_for_nodes()
    g.remove_node(worker2)
    g.wait_for_nodes()
I see, so this is actually waiting for the node to be removed from the client table, right? I guess that's a good thing to test, but unfortunately it takes 10s for that to happen, per call, so I'm guessing these tests are really slow? This might be too much of a burden on our CI...
So this test takes about 15 seconds to run... is there some way we can test this more quickly (maybe by reducing the 10s wait)?
| """Abstraction for a Ray node.""" | ||
|
|
||
| def __init__(self, process_dict): | ||
| # TODO(rliaw): Is there a unique identifier for a node? |
Each node has a client_id in the client table. What kind of identifier are you looking for?
Yeah; something like that (this is a non-blocking issue though). What would be a way to get the client_id without first calling ray.init? Ideally, it would be part of the output of services.start_ray_node.
test/multi_node_test_2.py
Outdated
    g.remove_node(worker2)
    g.wait_for_nodes()
    ray.shutdown()
    g.shutdown()
ray.shutdown and g.shutdown are typically the kind of thing you would put in a pytest fixture (after yield) so that they always get called in the event of a test failure. Not sure if it makes sense here, but something to consider..
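A sketch of that pattern, with illustrative fixture and test names that are not taken from the PR:

    import pytest
    import ray
    from ray.test.cluster_utils import Cluster

    @pytest.fixture
    def start_connected_cluster():
        # Setup: start a head node and connect the driver to it.
        g = Cluster(initialize_head=True, connect=True)
        yield g
        # Teardown after `yield` runs even if the test body fails.
        ray.shutdown()
        g.shutdown()

    def test_add_and_remove_node(start_connected_cluster):
        g = start_connected_cluster
        worker = g.add_node()
        g.wait_for_nodes()
        g.remove_node(worker)
        g.wait_for_nodes()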
test/multi_node_test_2.py
Outdated
    worker.process_dict[services.PROCESS_TYPE_RAYLET][0].wait()
    assert not worker.any_processes_alive(), worker.live_processes()
    print("Success")
    g.shutdown()
You need a ray.shutdown in this test also, right?
Test FAILed.

@richardliaw it looks like you're running the test for legacy Ray as well as xray. You probably want to remove it from the legacy Ray test suite.

Done.

Test PASSed.

Test PASSed.

@robertnishihara tests passing

Thanks @richardliaw!
Adds some utilities for creating and killing nodes in a cluster. Useful for fault tolerance tests.
TODOs:
component_failure_test.py