Skip to content

Commit

Permalink
[Core] Fix PlacementGroup wait hangs with infeasible placement group (r…
Browse files Browse the repository at this point in the history
…ay-project#28899)

Timeout wasn't properly applied when having a sync RPC.
  • Loading branch information
rkooo567 authored Oct 5, 2022
1 parent 33e5d1e commit 95a9c0a
Show file tree
Hide file tree
Showing 16 changed files with 35 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cpp/include/ray/api/ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class RayRuntime {
const ray::PlacementGroupCreationOptions &create_options) = 0;
virtual void RemovePlacementGroup(const std::string &group_id) = 0;
virtual bool WaitPlacementGroupReady(const std::string &group_id,
int timeout_seconds) = 0;
int64_t timeout_seconds) = 0;
virtual bool WasCurrentActorRestarted() = 0;
virtual std::vector<PlacementGroup> GetAllPlacementGroups() = 0;
virtual PlacementGroup GetPlacementGroupById(const std::string &id) = 0;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ void AbstractRayRuntime::RemovePlacementGroup(const std::string &group_id) {
}

bool AbstractRayRuntime::WaitPlacementGroupReady(const std::string &group_id,
int timeout_seconds) {
int64_t timeout_seconds) {
return task_submitter_->WaitPlacementGroupReady(group_id, timeout_seconds);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class AbstractRayRuntime : public RayRuntime {
ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options);
void RemovePlacementGroup(const std::string &group_id);
bool WaitPlacementGroupReady(const std::string &group_id, int timeout_seconds);
bool WaitPlacementGroupReady(const std::string &group_id, int64_t timeout_seconds);

const TaskID &GetCurrentTaskId();

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ ray::PlacementGroup LocalModeTaskSubmitter::CreatePlacementGroup(
ray::PlacementGroup placement_group{
PlacementGroupID::Of(local_mode_ray_tuntime_.GetCurrentJobID()).Binary(),
create_options};
placement_group.SetWaitCallbak([this](const std::string &id, int timeout_seconds) {
placement_group.SetWaitCallbak([this](const std::string &id, int64_t timeout_seconds) {
return WaitPlacementGroupReady(id, timeout_seconds);
});
placement_groups_.emplace(placement_group.GetID(), placement_group);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ ray::PlacementGroup NativeTaskSubmitter::CreatePlacementGroup(
}

ray::PlacementGroup placement_group{placement_group_id.Binary(), create_options};
placement_group.SetWaitCallbak([this](const std::string &id, int timeout_seconds) {
placement_group.SetWaitCallbak([this](const std::string &id, int64_t timeout_seconds) {
return WaitPlacementGroupReady(id, timeout_seconds);
});

Expand All @@ -197,7 +197,7 @@ void NativeTaskSubmitter::RemovePlacementGroup(const std::string &group_id) {
}

bool NativeTaskSubmitter::WaitPlacementGroupReady(const std::string &group_id,
int timeout_seconds) {
int64_t timeout_seconds) {
auto placement_group_id = ray::PlacementGroupID::FromBinary(group_id);
auto status = CoreWorkerProcess::GetCoreWorker().WaitPlacementGroupReady(
placement_group_id, timeout_seconds);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/native_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class NativeTaskSubmitter : public TaskSubmitter {
ray::PlacementGroup CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options);
void RemovePlacementGroup(const std::string &group_id);
bool WaitPlacementGroupReady(const std::string &group_id, int timeout_seconds);
bool WaitPlacementGroupReady(const std::string &group_id, int64_t timeout_seconds);

private:
ObjectID Submit(InvocationSpec &invocation, const CallOptions &call_options);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class TaskSubmitter {

virtual void RemovePlacementGroup(const std::string &group_id) = 0;

virtual bool WaitPlacementGroupReady(const std::string &group_id, int timeout_seconds) {
virtual bool WaitPlacementGroupReady(const std::string &group_id,
int64_t timeout_seconds) {
return true;
}
};
Expand Down
4 changes: 2 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1886,11 +1886,11 @@ cdef class CoreWorker:

def wait_placement_group_ready(self,
PlacementGroupID placement_group_id,
int32_t timeout_seconds):
int64_t timeout_seconds):
cdef CRayStatus status
cdef CPlacementGroupID cplacement_group_id = (
CPlacementGroupID.FromBinary(placement_group_id.binary()))
cdef int ctimeout_seconds = timeout_seconds
cdef int64_t ctimeout_seconds = timeout_seconds
with nogil:
status = CCoreWorkerProcess.GetCoreWorker() \
.WaitPlacementGroupReady(cplacement_group_id, ctimeout_seconds)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus RemovePlacementGroup(
const CPlacementGroupID &placement_group_id)
CRayStatus WaitPlacementGroupReady(
const CPlacementGroupID &placement_group_id, int timeout_seconds)
const CPlacementGroupID &placement_group_id, int64_t timeout_seconds)
optional[c_vector[CObjectReference]] SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
Expand Down
12 changes: 12 additions & 0 deletions python/ray/tests/test_placement_group_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ def test_placement_group_wait_api(ray_start_cluster_head_with_external_redis):
placement_group1.wait(10)


def test_placement_group_wait_api_timeout(shutdown_only):
"""Make sure the wait API timeout works
https://github.com/ray-project/ray/issues/27287
"""
ray.init(num_cpus=1)
pg = ray.util.placement_group(bundles=[{"CPU": 2}])
start = time.time()
assert not pg.wait(5)
assert 5 <= time.time() - start


@pytest.mark.parametrize("connect_to_client", [False, True])
def test_schedule_placement_groups_at_the_same_time(connect_to_client):
ray.init(num_cpus=4)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/util/placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def ready(self) -> "ray._raylet.ObjectRef":
resources={BUNDLE_RESOURCE_LABEL: 0.001},
).remote(self)

def wait(self, timeout_seconds: Union[float, int]) -> bool:
def wait(self, timeout_seconds: Union[float, int] = 30) -> bool:
"""Wait for the placement group to be ready within the specified time.
Args:
timeout_seconds(float|int): Timeout in seconds.
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class MockPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor {
(override));
MOCK_METHOD(Status,
SyncWaitUntilReady,
(const PlacementGroupID &placement_group_id),
(const PlacementGroupID &placement_group_id, int64_t timeout_seconds),
(override));
};

Expand Down
6 changes: 3 additions & 3 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1897,9 +1897,9 @@ Status CoreWorker::RemovePlacementGroup(const PlacementGroupID &placement_group_
}

Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_group_id,
int timeout_seconds) {
const auto status =
gcs_client_->PlacementGroups().SyncWaitUntilReady(placement_group_id);
int64_t timeout_seconds) {
const auto status = gcs_client_->PlacementGroups().SyncWaitUntilReady(
placement_group_id, timeout_seconds);
if (status.IsTimedOut()) {
std::ostringstream stream;
stream << "There was timeout in waiting for placement group " << placement_group_id
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \return Status OK if the placement group is created. TimedOut if request to GCS
/// server times out. NotFound if placement group is already removed or doesn't exist.
Status WaitPlacementGroupReady(const PlacementGroupID &placement_group_id,
int timeout_seconds);
int64_t timeout_seconds);

/// Submit an actor task.
///
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1012,12 +1012,12 @@ Status PlacementGroupInfoAccessor::AsyncGetAll(
}

Status PlacementGroupInfoAccessor::SyncWaitUntilReady(
const PlacementGroupID &placement_group_id) {
const PlacementGroupID &placement_group_id, int64_t timeout_seconds) {
rpc::WaitPlacementGroupUntilReadyRequest request;
rpc::WaitPlacementGroupUntilReadyReply reply;
request.set_placement_group_id(placement_group_id.Binary());
auto status = client_impl_->GetGcsRpcClient().SyncWaitPlacementGroupUntilReady(
request, &reply, GetGcsTimeoutMs());
request, &reply, absl::ToInt64Milliseconds(absl::Seconds(timeout_seconds)));
RAY_LOG(DEBUG) << "Finished waiting placement group until ready, placement group id = "
<< placement_group_id;
return status;
Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -677,9 +677,11 @@ class PlacementGroupInfoAccessor {
/// The RPC will timeout after the default GCS RPC timeout is exceeded.
///
/// \param placement_group_id The id for the placement group to wait for until ready.
/// \param timeout_seconds The timeout in seconds.
/// \return Status. TimedOut if the RPC times out. NotFound if the placement has already
/// removed.
virtual Status SyncWaitUntilReady(const PlacementGroupID &placement_group_id);
virtual Status SyncWaitUntilReady(const PlacementGroupID &placement_group_id,
int64_t timeout_seconds);

private:
GcsClient *client_impl_;
Expand Down

0 comments on commit 95a9c0a

Please sign in to comment.