[Core] Cancel lease requests before returning a PG bundle #46116

Merged · 6 commits · Jun 20, 2024
84 changes: 84 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
@@ -1,4 +1,5 @@
import sys
import asyncio
import os
import threading
from time import sleep
@@ -22,6 +23,8 @@
)
from ray.job_submission import JobSubmissionClient, JobStatus
from ray._raylet import GcsClient
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray.util.state import list_placement_groups

import psutil

@@ -1213,6 +1216,87 @@ def spawn(self, name, namespace):
raise ValueError(f"Unknown case: {case}")


MyPlugin = "MyPlugin"
MY_PLUGIN_CLASS_PATH = "ray.tests.test_gcs_fault_tolerance.HangPlugin"


class HangPlugin(RuntimeEnvPlugin):
    name = MyPlugin

    async def create(
        self,
        uri,
        runtime_env,
        ctx,
        logger,  # noqa: F821
    ) -> float:
        while True:
            await asyncio.sleep(1)

    @staticmethod
    def validate(runtime_env_dict: dict) -> str:
        return 1


@pytest.mark.parametrize(
    "ray_start_regular_with_external_redis",
    [
        generate_system_config_map(
            gcs_rpc_server_reconnect_timeout_s=60,
            testing_asio_delay_us="NodeManagerService.grpc_server.CancelResourceReserve=500000000:500000000",  # noqa: E501
        ),
    ],
    indirect=True,
)
@pytest.mark.parametrize(
    "set_runtime_env_plugins",
    [
        '[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]',
    ],
    indirect=True,
)
def test_placement_group_removal_after_gcs_restarts(
    set_runtime_env_plugins, ray_start_regular_with_external_redis
):
    @ray.remote
    def task():
        pass

    pg = ray.util.placement_group(bundles=[{"CPU": 1}])
    _ = task.options(
        max_retries=0,
        num_cpus=1,
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
        ),
        runtime_env={
            MyPlugin: {"name": "f2"},
            "config": {"setup_timeout_seconds": -1},
        },
    ).remote()

    # The task should be popping a worker.
    # TODO(jjyao) Use a more deterministic way to
    # decide whether the task is popping a worker.
    sleep(5)

    ray.util.remove_placement_group(pg)
    # The PG is marked as REMOVED in Redis but not yet removed from the
    # raylet due to the injected delay of the CancelResourceReserve rpc.
    wait_for_condition(lambda: list_placement_groups()[0].state == "REMOVED")

    ray._private.worker._global_node.kill_gcs_server()
    # After the GCS restarts, it will try to remove the PG resources
    # again via the ReleaseUnusedBundles rpc.
    ray._private.worker._global_node.start_gcs_server()

    def verify_pg_resources_cleaned():
        r_keys = ray.available_resources().keys()
        return all("group" not in k for k in r_keys)

    wait_for_condition(verify_pg_resources_cleaned, timeout=30)


if __name__ == "__main__":

    import pytest
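For context, the user-facing flow this test hardens is simply "create a PG, remove it, wait for its custom resources to disappear". Below is a minimal sketch of that flow, assuming a plain local cluster with no fault injection (the test above additionally delays the raylet-side CancelResourceReserve RPC and restarts the GCS between removal and verification):

import ray
from ray._private.test_utils import wait_for_condition

ray.init(num_cpus=2)

pg = ray.util.placement_group(bundles=[{"CPU": 1}])
ray.get(pg.ready())

# Removing the PG should eventually strip every "group"-suffixed
# entry from the cluster resource view, which is exactly what
# verify_pg_resources_cleaned asserts above.
ray.util.remove_placement_group(pg)

def pg_resources_cleaned():
    return all("group" not in k for k in ray.available_resources())

wait_for_condition(pg_resources_cleaned, timeout=30)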
7 changes: 3 additions & 4 deletions python/ray/tests/test_placement_group_5.py
@@ -470,10 +470,9 @@ async def create(
 ) -> float:
     await asyncio.sleep(PLUGIN_TIMEOUT)

-
-        @staticmethod
-        def validate(runtime_env_dict: dict) -> str:
-            return 1
+    @staticmethod
+    def validate(runtime_env_dict: dict) -> str:
+        return 1


@pytest.mark.parametrize(
36 changes: 36 additions & 0 deletions python/ray/tests/test_placement_group_failover.py
@@ -1,8 +1,10 @@
import pytest
import sys
import ray
import time
import ray.cluster_utils
from ray._private.test_utils import get_other_nodes, wait_for_condition
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

MB = 1024 * 1024

@@ -16,6 +18,40 @@ def value(self):
return self.n


def test_placement_group_recover_prepare_failure(monkeypatch, ray_start_cluster):
    # Test to make sure that the GCS can handle a PG prepare failure
    # by retrying on other nodes.
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1)
    ray.init(address=cluster.address)

    monkeypatch.setenv(
        "RAY_testing_asio_delay_us",
        "NodeManagerService.grpc_server.PrepareBundleResources=500000000:500000000",
    )
    worker1 = cluster.add_node(num_cpus=1)
    pg = ray.util.placement_group(
        strategy="STRICT_SPREAD", bundles=[{"CPU": 1}, {"CPU": 1}]
    )
    # The actor will wait for the PG to be created.
    actor = Actor.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
    ).remote()

    # Wait for the prepare rpc to be sent.
    time.sleep(1)

    # The prepare will fail.
    cluster.remove_node(worker1)

    monkeypatch.delenv("RAY_testing_asio_delay_us")
    # The prepare will retry on this node.
    cluster.add_node(num_cpus=1)

    # The PG can be created successfully.
    ray.get(actor.value.remote())


# Test whether the bundles spread on two nodes can be rescheduled successfully
# when both nodes die at the same time.
def test_placement_group_failover_when_two_nodes_die(monkeypatch, ray_start_cluster):
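Both new tests drive their fault injection through the RAY_testing_asio_delay_us hook, which is enabled by the ray::asio::testing::init() calls added further down. A sketch of setting the knob follows; reading the two colon-separated numbers as a min:max delay range in microseconds is an assumption based on the usages in this PR:

import os

# Assumed format: <Service>.grpc_server.<Method>=<min_delay_us>:<max_delay_us>
# 500000000 us == 500 s, which effectively stalls the RPC for the whole
# test. Set this before the affected process starts, since the config
# is read once at initialization.
os.environ["RAY_testing_asio_delay_us"] = (
    "NodeManagerService.grpc_server.PrepareBundleResources=500000000:500000000"
)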
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker_process.cc
@@ -233,6 +233,7 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
  thread.join();

  RayConfig::instance().initialize(promise.get_future().get());
  ray::asio::testing::init();
}

void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() {
6 changes: 4 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -1321,8 +1321,10 @@ void GcsActorManager::OnActorSchedulingFailed(
   ray::rpc::ActorDeathCause death_cause;
   switch (failure_type) {
   case rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED:
-    error_msg =
-        "Could not create the actor because its associated placement group was removed.";
+    error_msg = absl::StrCat(
+        "Could not create the actor because its associated placement group was "
+        "removed.\n",
+        scheduling_failure_message);
     death_cause.mutable_actor_unschedulable_context()->set_error_message(error_msg);
     break;
   case rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED:
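From the user's side, the enriched death cause should surface when an actor's placement group is removed before the actor can be scheduled. A hypothetical repro sketch; the exception type and the exact surfaced wording are assumptions, not asserted by this PR:

import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=1)

@ray.remote(num_cpus=1)
class Worker:
    def ping(self):
        return "ok"

pg = ray.util.placement_group(bundles=[{"CPU": 1}])
actor = Worker.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()
ray.util.remove_placement_group(pg)

try:
    ray.get(actor.ping.remote())
except ray.exceptions.RayActorError as e:
    # Expect the generic "placement group was removed" line followed by
    # the scheduling_failure_message appended in this change.
    print(e)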
39 changes: 15 additions & 24 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
@@ -243,9 +243,9 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
   auto node_id = NodeID::FromBinary(node.value()->node_id());

   if (max_retry == current_retry_cnt) {
-    RAY_LOG(INFO) << "Failed to cancel resource reserved for bundle because the max "
-                     "retry count is reached. "
-                  << bundle_spec->DebugString() << " at node " << node_id;
+    RAY_LOG(ERROR) << "Failed to cancel resource reserved for bundle because the max "
+                      "retry count is reached. "
+                   << bundle_spec->DebugString() << " at node " << node_id;
     return;
   }

@@ -261,11 +261,10 @@
     RAY_LOG(INFO) << "Finished cancelling the resource reserved for bundle: "
                   << bundle_spec->DebugString() << " at node " << node_id;
   } else {
-    // We couldn't delete the pg resources either becuase it is in use
-    // or network issue. Retry.
-    RAY_LOG(INFO) << "Failed to cancel the resource reserved for bundle: "
-                  << bundle_spec->DebugString() << " at node " << node_id
-                  << ". Status: " << status;
+    // We couldn't delete the pg resources because of network issue. Retry.
+    RAY_LOG(WARNING) << "Failed to cancel the resource reserved for bundle: "
+                     << bundle_spec->DebugString() << " at node " << node_id
+                     << ". Status: " << status;
     execute_after(
         io_context_,
         [this, bundle_spec, node, max_retry, current_retry_cnt] {
@@ -568,14 +567,10 @@
   for (const auto &iter : *(leasing_bundle_locations)) {
     auto &bundle_spec = iter.second.second;
     auto &node_id = iter.second.first;
-    CancelResourceReserve(
-        bundle_spec,
-        gcs_node_manager_.GetAliveNode(node_id),
-        // Retry 10 * worker registeration timeout to avoid race condition.
-        // See https://github.com/ray-project/ray/pull/42942
-        // for more details.
-        /*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
-        /*num_retry*/ 0);
+    CancelResourceReserve(bundle_spec,
+                          gcs_node_manager_.GetAliveNode(node_id),
+                          /*max_retry*/ 5,
+                          /*num_retry*/ 0);
   }
 }
}
@@ -594,14 +589,10 @@
   for (const auto &iter : *(committed_bundle_locations)) {
     auto &bundle_spec = iter.second.second;
     auto &node_id = iter.second.first;
-    CancelResourceReserve(
-        bundle_spec,
-        gcs_node_manager_.GetAliveNode(node_id),
-        // Retry 10 * worker registeration timeout to avoid race condition.
-        // See https://github.com/ray-project/ray/pull/42942
-        // for more details.
-        /*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
-        /*num_retry*/ 0);
+    CancelResourceReserve(bundle_spec,
+                          gcs_node_manager_.GetAliveNode(node_id),
+                          /*max_retry*/ 5,
+                          /*num_retry*/ 0);
   }
   committed_bundle_location_index_.Erase(placement_group_id);
   cluster_resource_scheduler_.GetClusterResourceManager()
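The retry loop around CancelResourceReserve keeps its shape: give up after max_retry attempts (now logged at ERROR instead of INFO), otherwise re-post the RPC onto the event loop after a delay. A minimal Python sketch of that pattern, with send_cancel as a hypothetical stand-in for the raylet RPC:

import asyncio

async def cancel_resource_reserve(bundle, send_cancel, max_retry=5, retry_cnt=0):
    # Stop once the retry budget is exhausted, mirroring the
    # max_retry == current_retry_cnt check above.
    if retry_cnt == max_retry:
        print(f"ERROR: giving up on cancelling bundle {bundle}")
        return
    if await send_cancel(bundle):
        print(f"Finished cancelling bundle {bundle}")
        return
    # Failure (e.g. a network issue): retry after a delay, analogous
    # to the execute_after() call in the C++ code.
    await asyncio.sleep(1.0)
    await cancel_resource_reserve(bundle, send_cancel, max_retry, retry_cnt + 1)

# Example: an RPC that always fails exhausts all 5 attempts.
asyncio.run(
    cancel_resource_reserve("bundle_0", lambda b: asyncio.sleep(0, result=False))
)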
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
@@ -62,6 +62,7 @@ int main(int argc, char *argv[]) {
  gflags::ShutDownCommandLineFlags();

  RayConfig::instance().initialize(config_list);
  ray::asio::testing::init();

  // IO Service for main loop.
  instrumented_io_context main_service;