Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Placement Group]Add detached support for placement group. #13582

Merged
merged 18 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions doc/source/placement-group.rst
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,39 @@ Note that you can anytime remove the placement group to clean up resources.

ray.shutdown()

Placement Group Lifetimes
-------------------------

.. tabs::
.. group-tab:: Python

By default, the lifetimes of placement group is Non-Detached which will be destroy
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
when the job or its creator(even a detached actor) is done. If you want to keep it
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
alive, you should make its lifetime `detached`, just like the following code:
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved

.. code-block:: python

# first_driver.py
pg = placement_group([{"CPU": 2}, {"CPU": 2}], strategy="STRICT_SPREAD", lifetime="detached")
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
ray.get(pg.ready())

The pg will be kept alive even after the driver running above script exits. Therefore
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
it is possible to get the placement group in a different driver:
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved

.. code-block:: python

# second_driver.py
table = ray.util.placement_group_table()
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
print(len(table))

Note that the lifetime option is decoupled from the name. If we only specified
the name without specifying ``lifetime="detached"``, then the placement group can
only be retrieved as long as the original driver is still running.

.. group-tab:: Java

Customizing lifetime of an placement group hasn't been implemented in Java yet.
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved

Tips for Using Placement Groups
-------------------------------
- Learn the :ref:`lifecycle <ray-placement-group-lifecycle-ref>` of placement groups.
Expand Down
6 changes: 4 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,8 @@ cdef class CoreWorker:
self,
c_string name,
c_vector[unordered_map[c_string, double]] bundles,
c_string strategy):
c_string strategy,
c_bool is_detached):
cdef:
CPlacementGroupID c_placement_group_id
CPlacementStrategy c_strategy
Expand All @@ -1208,7 +1209,8 @@ cdef class CoreWorker:
CPlacementGroupCreationOptions(
name,
c_strategy,
bundles
bundles,
is_detached
),
&c_placement_group_id))

Expand Down
4 changes: 3 additions & 1 deletion python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,9 @@ def _remote(self,
elif lifetime == "detached":
detached = True
else:
raise ValueError("lifetime must be either `None` or 'detached'")
raise ValueError(
"actor `lifetime` argument must be either `None` or 'detached'"
)

if placement_group_capture_child_tasks is None:
placement_group_capture_child_tasks = (
Expand Down
3 changes: 2 additions & 1 deletion python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
CPlacementGroupCreationOptions(
const c_string &name,
CPlacementStrategy strategy,
const c_vector[unordered_map[c_string, double]] &bundles
const c_vector[unordered_map[c_string, double]] &bundles,
c_bool is_detached
)

cdef extern from "ray/gcs/gcs_client.h" nogil:
Expand Down
113 changes: 113 additions & 0 deletions python/ray/tests/test_placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,119 @@ def is_all_placement_group_removed():

wait_for_condition(is_all_placement_group_removed)

ray.shutdown()


def test_detached_placement_group(ray_start_cluster):
cluster = ray_start_cluster
for _ in range(2):
cluster.add_node(num_cpus=3)
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
cluster.wait_for_nodes()
info = ray.init(address=cluster.address)

# Make sure detached placement group will alive when job dead.
driver_code = f"""
import ray

ray.init(address="{info["redis_address"]}")

pg = ray.util.placement_group(
[{{"CPU": 1}} for _ in range(2)],
strategy="STRICT_SPREAD", lifetime="detached")
ray.get(pg.ready())

@ray.remote(num_cpus=1)
class Actor:
def ready(self):
return True

for bundle_index in range(2):
actor = Actor.options(lifetime="detached", placement_group=pg,
placement_group_bundle_index=bundle_index).remote()
ray.get(actor.ready.remote())

ray.shutdown()
"""

run_string_as_driver(driver_code)

# Wait until the driver is reported as dead by GCS.
def is_job_done():
jobs = ray.jobs()
for job in jobs:
if "StopTime" in job:
return True
return False

def assert_alive_num_pg(expected_num_pg):
alive_num_pg = 0
for _, placement_group_info in ray.util.placement_group_table().items(
):
if placement_group_info["state"] == "CREATED":
alive_num_pg += 1
return alive_num_pg == expected_num_pg

def assert_alive_num_actor(expected_num_actor):
alive_num_actor = 0
for actor_info in ray.actors().values():
if actor_info["State"] == ray.gcs_utils.ActorTableData.ALIVE:
alive_num_actor += 1
return alive_num_actor == expected_num_actor

wait_for_condition(is_job_done)

assert assert_alive_num_pg(1)
assert assert_alive_num_actor(2)

# Make sure detached placement group will alive when its creator which
# is detached actor dead.
# Test actors first.
@ray.remote(num_cpus=1)
class NestedActor:
def ready(self):
return True

@ray.remote(num_cpus=1)
class Actor:
def __init__(self):
self.actors = []

def ready(self):
return True

def schedule_nested_actor_with_detached_pg(self):
# Create placement group which is detached.
pg = ray.util.placement_group(
[{
"CPU": 1
} for _ in range(2)],
strategy="STRICT_SPREAD",
lifetime="detached",
name="detached_pg")
ray.get(pg.ready())
# Schedule nested actor with the placement group.
for bundle_index in range(2):
actor = NestedActor.options(
placement_group=pg,
placement_group_bundle_index=bundle_index,
lifetime="detached").remote()
ray.get(actor.ready.remote())
self.actors.append(actor)

a = Actor.options(lifetime="detached").remote()
ray.get(a.ready.remote())
# 1 parent actor and 2 children actor.
ray.get(a.schedule_nested_actor_with_detached_pg.remote())

# Kill an actor and wait until it is killed.
ray.kill(a)
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.ready.remote())

# We should have 2 alive pgs and 4 alive actors.
assert assert_alive_num_pg(2)
assert assert_alive_num_actor(4)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
17 changes: 15 additions & 2 deletions python/ray/util/placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ def _fill_bundle_cache_if_needed(self):

def placement_group(bundles: List[Dict[str, float]],
strategy: str = "PACK",
name: str = "unnamed_group") -> PlacementGroup:
name: str = "unnamed_group",
lifetime=None) -> PlacementGroup:
"""Asynchronously creates a PlacementGroup.

Args:
Expand All @@ -160,6 +161,10 @@ def placement_group(bundles: List[Dict[str, float]],
- "STRICT_SPREAD": Packs Bundles across distinct nodes.

name(str): The name of the placement group.
lifetime(str): Either `None`, which defaults to the placement group
will fate share with its creator and will be deleted once its
creator is dead, or "detached", which means the placement group
will live as a global object independent of the creator.

Return:
PlacementGroup: Placement group object.
Expand All @@ -179,8 +184,16 @@ def placement_group(bundles: List[Dict[str, float]],
"Bundles cannot be an empty dictionary or "
f"resources with only 0 values. Bundles: {bundles}")

if lifetime is None:
detached = False
elif lifetime == "detached":
detached = True
else:
raise ValueError("placement group `lifetime` argument must be either"
" `None` or 'detached'")

placement_group_id = worker.core_worker.create_placement_group(
name, bundles, strategy)
name, bundles, strategy, detached)

return PlacementGroup(placement_group_id)

Expand Down
6 changes: 4 additions & 2 deletions src/ray/common/placement_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ class PlacementGroupSpecBuilder {
PlacementGroupSpecBuilder &SetPlacementGroupSpec(
const PlacementGroupID &placement_group_id, std::string name,
const std::vector<std::unordered_map<std::string, double>> &bundles,
const rpc::PlacementStrategy strategy, const JobID &creator_job_id,
const ActorID &creator_actor_id, bool is_creator_detached_actor) {
const rpc::PlacementStrategy strategy, const bool is_detached,
const JobID &creator_job_id, const ActorID &creator_actor_id,
bool is_creator_detached_actor) {
message_->set_placement_group_id(placement_group_id.Binary());
message_->set_name(name);
message_->set_strategy(strategy);
Expand All @@ -82,6 +83,7 @@ class PlacementGroupSpecBuilder {
message_->set_creator_job_dead(is_creator_detached_actor);
message_->set_creator_actor_id(creator_actor_id.Binary());
message_->set_creator_actor_dead(creator_actor_id.IsNil());
message_->set_is_detached(is_detached);

for (size_t i = 0; i < bundles.size(); i++) {
auto resources = bundles[i];
Expand Down
9 changes: 7 additions & 2 deletions src/ray/core_worker/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,20 @@ using PlacementStrategy = rpc::PlacementStrategy;
struct PlacementGroupCreationOptions {
PlacementGroupCreationOptions(
std::string name, PlacementStrategy strategy,
std::vector<std::unordered_map<std::string, double>> bundles)
: name(std::move(name)), strategy(strategy), bundles(std::move(bundles)) {}
std::vector<std::unordered_map<std::string, double>> bundles, bool is_detached)
: name(std::move(name)),
strategy(strategy),
bundles(std::move(bundles)),
is_detached(is_detached) {}

/// The name of the placement group.
const std::string name;
/// The strategy to place the bundle in Placement Group.
const PlacementStrategy strategy = rpc::PACK;
/// The resource bundles in this placement group.
const std::vector<std::unordered_map<std::string, double>> bundles;
/// Whether to keep the placement group persistent after its creator dead.
const bool is_detached = false;
};

} // namespace ray
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1462,8 +1462,8 @@ Status CoreWorker::CreatePlacementGroup(
builder.SetPlacementGroupSpec(
placement_group_id, placement_group_creation_options.name,
placement_group_creation_options.bundles, placement_group_creation_options.strategy,
worker_context_.GetCurrentJobID(), worker_context_.GetCurrentActorID(),
worker_context_.CurrentActorDetached());
placement_group_creation_options.is_detached, worker_context_.GetCurrentJobID(),
worker_context_.GetCurrentActorID(), worker_context_.CurrentActorDetached());
PlacementGroupSpecification placement_group_spec = builder.Build();
*return_placement_group_id = placement_group_id;
RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ inline ray::PlacementGroupCreationOptions ToPlacementGroupCreationOptions(
});
});
return ray::PlacementGroupCreationOptions(JavaStringToNativeString(env, name),
ConvertStrategy(java_strategy), bundles);
ConvertStrategy(java_strategy), bundles,
/*is_detached=*/false);
}

#ifdef __cplusplus
Expand Down
12 changes: 8 additions & 4 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,15 @@ void GcsPlacementGroup::MarkCreatorActorDead() {
placement_group_table_data_.set_creator_actor_dead(true);
}

bool GcsPlacementGroup::IsPlacementGroupRemovable() const {
return placement_group_table_data_.creator_job_dead() &&
bool GcsPlacementGroup::IsPlacementGroupLifetimeDone() const {
return !IsDetached() && placement_group_table_data_.creator_job_dead() &&
placement_group_table_data_.creator_actor_dead();
}

bool GcsPlacementGroup::IsDetached() const {
return placement_group_table_data_.is_detached();
}

/////////////////////////////////////////////////////////////////////////////////////////

GcsPlacementGroupManager::GcsPlacementGroupManager(
Expand Down Expand Up @@ -495,7 +499,7 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
continue;
}
placement_group->MarkCreatorJobDead();
if (placement_group->IsPlacementGroupRemovable()) {
if (placement_group->IsPlacementGroupLifetimeDone()) {
RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {});
}
}
Expand All @@ -509,7 +513,7 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead(
continue;
}
placement_group->MarkCreatorActorDead();
if (placement_group->IsPlacementGroupRemovable()) {
if (placement_group->IsPlacementGroupLifetimeDone()) {
RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {});
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class GcsPlacementGroup {
placement_group_spec.creator_job_dead());
placement_group_table_data_.set_creator_actor_dead(
placement_group_spec.creator_actor_dead());
placement_group_table_data_.set_is_detached(placement_group_spec.is_detached());
}

/// Get the immutable PlacementGroupTableData of this placement group.
Expand Down Expand Up @@ -107,8 +108,11 @@ class GcsPlacementGroup {
/// Mark that the creator actor of this placement group is dead.
void MarkCreatorActorDead();

/// Return True if the placement group is removable. False otherwise.
bool IsPlacementGroupRemovable() const;
/// Return True if the placement group lifetime is done. False otherwise.
bool IsPlacementGroupLifetimeDone() const;

/// Returns whether or not this is a detached placement group.
bool IsDetached() const;

private:
/// The placement_group meta data which contains the task specification as well as the
Expand Down
5 changes: 3 additions & 2 deletions src/ray/gcs/test/gcs_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ struct Mocker {
PlacementGroupSpecBuilder builder;

auto placement_group_id = PlacementGroupID::FromRandom();
builder.SetPlacementGroupSpec(placement_group_id, name, bundles, strategy, job_id,
actor_id, /* is_creator_detached */ false);
builder.SetPlacementGroupSpec(placement_group_id, name, bundles, strategy,
/* is_detached */ false, job_id, actor_id,
/* is_creator_detached */ false);
return builder.Build();
}

Expand Down
2 changes: 2 additions & 0 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ message PlacementGroupSpec {
bool creator_job_dead = 7;
// Whether or not if the creator actor is dead.
bool creator_actor_dead = 8;
// Whether the placement group is persistent.
bool is_detached = 9;
}

message ObjectReference {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ message PlacementGroupTableData {
bool creator_job_dead = 8;
// Whether or not if the creator actor is dead.
bool creator_actor_dead = 9;
// Whether the placement group is persistent.
bool is_detached = 10;
}

message ScheduleData {
Expand Down