Skip to content

Commit

Permalink
[Placement group] Do not report ready task demand (#18463)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkooo567 authored Sep 9, 2021
1 parent 0f56287 commit fdd5210
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
5 changes: 5 additions & 0 deletions python/ray/autoscaler/_private/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ def filter_placement_group_from_bundle(bundle: ResourceBundle):
(pg_filtered_bundle,
using_placement_group) = filter_placement_group_from_bundle(bundle)

# bundle is a special keyword for placement group ready tasks
# do not report the demand for this.
if "bundle" in pg_filtered_bundle.keys():
continue

bundle_demand[tuple(sorted(pg_filtered_bundle.items()))] += count
if using_placement_group:
pg_bundle_demand[tuple(sorted(
Expand Down
56 changes: 54 additions & 2 deletions python/ray/tests/test_placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@
pytest_timeout = None

import ray
import ray.cluster_utils
import ray._private.gcs_utils as gcs_utils

from ray.autoscaler._private.commands import debug_status
from ray._private.test_utils import (
generate_system_config_map, get_other_nodes,
kill_actor_and_wait_for_failure, run_string_as_driver, wait_for_condition,
get_error_message)
import ray.cluster_utils
from ray.exceptions import RaySystemError
from ray._raylet import PlacementGroupID
import ray._private.gcs_utils as gcs_utils
from ray.util.placement_group import (PlacementGroup, placement_group,
remove_placement_group,
get_current_placement_group)
from ray.util.client.ray_client_helpers import connect_to_client_or_not
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_ERROR, \
DEBUG_AUTOSCALING_STATUS


@ray.remote
Expand All @@ -29,6 +33,25 @@ def method(self, x):
return x + 2


def is_placement_group_removed(pg):
table = ray.util.placement_group_table(pg)
if "state" not in table:
return False
return table["state"] == "REMOVED"


def get_ray_status_output(address):
redis_client = ray._private.services.create_redis_client(address, "")
status = redis_client.hget(DEBUG_AUTOSCALING_STATUS, "value")
error = redis_client.hget(DEBUG_AUTOSCALING_ERROR, "value")
return {
"demand": debug_status(
status, error).split("Demands:")[1].strip("\n").strip(" "),
"usage": debug_status(status, error).split("Demands:")[0].split(
"Usage:")[1].strip("\n").strip(" ")
}


@pytest.mark.parametrize("connect_to_client", [True, False])
def test_placement_ready(ray_start_regular, connect_to_client):
@ray.remote
Expand Down Expand Up @@ -1854,5 +1877,34 @@ def get_gpu(self):
assert len(gpu_ids_res) == 4


def test_placement_group_status_no_bundle_demand(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)

@ray.remote
def f():
pass

pg = ray.util.placement_group([{"CPU": 1}])
ray.get(pg.ready())
ray.util.remove_placement_group(pg)
wait_for_condition(lambda: is_placement_group_removed(pg))
# Create a ready task after the placement group is removed.
# This shouldn't be reported to the resource demand.
r = pg.ready() # noqa

# Wait until the usage is updated, which is
# when the demand is also updated.
def is_usage_updated():
demand_output = get_ray_status_output(cluster.address)
return demand_output["usage"] != ""

wait_for_condition(is_usage_updated)
# The output shouldn't include the pg.ready task demand.
demand_output = get_ray_status_output(cluster.address)
assert demand_output["demand"] == "(no resource demands)"


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))

0 comments on commit fdd5210

Please sign in to comment.