Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dashboard] Don't set node actors when the actor's node_id is Nil #13573

Merged
merged 2 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dashboard/modules/stats_collector/stats_collector_consts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import ray

# Polling / retry periods, in seconds (per the names; the code that consumes
# them is not shown here -- confirm against the stats collector head module).
NODE_STATS_UPDATE_INTERVAL_SECONDS = 1
RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS = 1
# Presumably the channel name used for actor updates -- TODO confirm at the
# subscription site.
ACTOR_CHANNEL = "ACTOR"
ERROR_INFO_UPDATE_INTERVAL_SECONDS = 5
LOG_INFO_UPDATE_INTERVAL_SECONDS = 5
# Hex string of Ray's nil NodeID. The stats collector head compares an actor's
# "rayletId" against this value and skips the node_actors update when they
# match.
NIL_NODE_ID = ray.NodeID.nil().hex()
15 changes: 9 additions & 6 deletions dashboard/modules/stats_collector/stats_collector_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ def _process_actor_table_data(data):
node_id = actor_table_data["address"]["rayletId"]
job_actors.setdefault(job_id,
{})[actor_id] = actor_table_data
node_actors.setdefault(node_id,
{})[actor_id] = actor_table_data
# Update only when node_id is not Nil.
if node_id != stats_collector_consts.NIL_NODE_ID:
node_actors.setdefault(
node_id, {})[actor_id] = actor_table_data
DataSource.job_actors.reset(job_actors)
DataSource.node_actors.reset(node_actors)
logger.info("Received %d actor info from GCS.",
Expand Down Expand Up @@ -232,10 +234,11 @@ def _process_actor_table_data(data):
node_id = actor_table_data["address"]["rayletId"]
# Update actors.
DataSource.actors[actor_id] = actor_table_data
# Update node actors.
node_actors = dict(DataSource.node_actors.get(node_id, {}))
node_actors[actor_id] = actor_table_data
DataSource.node_actors[node_id] = node_actors
# Update node actors (only when node_id is not Nil).
if node_id != stats_collector_consts.NIL_NODE_ID:
node_actors = dict(DataSource.node_actors.get(node_id, {}))
node_actors[actor_id] = actor_table_data
DataSource.node_actors[node_id] = node_actors
# Update job actors.
job_actors = dict(DataSource.job_actors.get(job_id, {}))
job_actors[actor_id] = actor_table_data
Expand Down
44 changes: 44 additions & 0 deletions dashboard/modules/stats_collector/tests/test_stats_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import pytest
import ray
import threading
import ray.new_dashboard.modules.stats_collector.stats_collector_consts \
as stats_collector_consts
from datetime import datetime, timedelta
from ray.cluster_utils import Cluster
from ray.new_dashboard.tests.conftest import * # noqa
Expand Down Expand Up @@ -373,5 +375,47 @@ def check_errs():
check_errs, (AssertionError), timeout_ms=1000)


def test_nil_node(enable_test_module, disable_aiohttp_cache,
                  ray_start_with_dashboard):
    """Check that no actor is ever filed under the Nil node ID.

    Creates an actor that requests one GPU (named ``InfeasibleActor``;
    presumably the test cluster has no GPU, so the actor is never placed on
    a node -- confirm against the ``ray_start_with_dashboard`` fixture).
    The test then polls the dashboard until:

    * ``/logical/actors`` reports exactly one actor, and
    * the ``node_actors`` dump from ``/test/dump`` has no entry keyed by
      ``stats_collector_consts.NIL_NODE_ID``.

    Raises:
        Exception: if the conditions above are not met within the timeout;
            the message carries the traceback of the last failed attempt.
    """
    webui_url = ray_start_with_dashboard["webui_url"]
    # One availability check is enough; the original asserted it twice.
    assert wait_until_server_available(webui_url) is True
    webui_url = format_web_url(webui_url)

    @ray.remote(num_gpus=1)
    class InfeasibleActor:
        pass

    # Keep a reference so the actor handle is not garbage collected.
    infeasible_actor = InfeasibleActor.remote()  # noqa

    timeout_seconds = 5
    start_time = time.time()
    while True:
        time.sleep(1)
        try:
            resp = requests.get(f"{webui_url}/logical/actors")
            resp_json = resp.json()
            resp_data = resp_json["data"]
            actors = resp_data["actors"]
            assert len(actors) == 1
            response = requests.get(webui_url + "/test/dump?key=node_actors")
            response.raise_for_status()
            result = response.json()
            assert stats_collector_consts.NIL_NODE_ID not in result["data"][
                "nodeActors"]
            break
        except Exception as ex:
            last_ex = ex
        # Only reached after a failed attempt. Checking the deadline here
        # (rather than in a ``finally`` clause) ensures a successful attempt
        # is never turned into a timeout failure.
        if time.time() > start_time + timeout_seconds:
            ex_stack = "".join(
                traceback.format_exception(
                    type(last_ex), last_ex, last_ex.__traceback__))
            raise Exception(f"Timed out while testing, {ex_stack}")


if __name__ == "__main__":
    # Run this file's tests verbosely and propagate pytest's exit status.
    exit_status = pytest.main(["-v", __file__])
    sys.exit(exit_status)