Skip to content

Commit

Permalink
Add random actor placement; fix cancellation callback; update test sk…
Browse files Browse the repository at this point in the history
…ips (#11684)
  • Loading branch information
ericl authored Oct 31, 2020
1 parent b10871a commit 48dee78
Show file tree
Hide file tree
Showing 14 changed files with 112 additions and 27 deletions.
4 changes: 4 additions & 0 deletions python/ray/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,3 +439,7 @@ def format_web_url(url):
if not url.startswith("http://"):
return "http://" + url
return url


def new_scheduler_enabled():
return os.environ.get("RAY_ENABLE_NEW_SCHEDULER") == "1"
12 changes: 6 additions & 6 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ SRCS = [] + select({
py_test_module_list(
files = [
"test_async.py",
"test_actor.py",
"test_actor_failures.py",
"test_actor_advanced.py",
"test_advanced.py",
"test_advanced_2.py",
"test_basic.py",
"test_basic_2.py",
"test_cli.py",
"test_component_failures_3.py",
"test_error_ray_not_initialized.py",
Expand All @@ -27,15 +33,9 @@ py_test_module_list(

py_test_module_list(
files = [
"test_actor_advanced.py",
"test_actor_failures.py",
"test_actor.py",
"test_actor_resources.py",
"test_advanced_2.py",
"test_advanced_3.py",
"test_advanced.py",
"test_array.py",
"test_basic_2.py",
"test_cancel.py",
"test_component_failures_2.py",
"test_dynres.py",
Expand Down
4 changes: 3 additions & 1 deletion python/ray/tests/test_actor_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import ray.test_utils
import ray.cluster_utils
from ray.test_utils import (run_string_as_driver, get_non_head_nodes,
wait_for_condition)
wait_for_condition, new_scheduler_enabled)
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put


Expand Down Expand Up @@ -91,6 +91,7 @@ def get_location(self):
ray.get(results)


@pytest.mark.skipif(new_scheduler_enabled(), reason="multi node broken")
def test_actor_lifetime_load_balancing(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)
Expand Down Expand Up @@ -943,6 +944,7 @@ def set_count(self, count):
}
}],
indirect=True)
@pytest.mark.skipif(new_scheduler_enabled(), reason="todo hangs")
def test_pending_actor_removed_by_owner(ray_start_regular):
# Verify when an owner of pending actors is killed, the actor resources
# are correctly returned.
Expand Down
3 changes: 3 additions & 0 deletions python/ray/tests/test_actor_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
wait_for_pid_to_exit,
generate_system_config_map,
get_other_nodes,
new_scheduler_enabled,
SignalActor,
)

Expand Down Expand Up @@ -265,6 +266,7 @@ def ready(self):
assert result == 1 or result == results[-1] + 1


@pytest.mark.skipif(new_scheduler_enabled(), reason="dynamic resources todo")
def test_actor_restart_without_task(ray_start_regular):
"""Test a dead actor can be restarted without sending task to it."""

Expand Down Expand Up @@ -483,6 +485,7 @@ def decorated_method(self, x):
"num_cpus": 1,
"num_nodes": 3,
}], indirect=True)
@pytest.mark.skipif(new_scheduler_enabled(), reason="dynamic resources todo")
def test_ray_wait_dead_actor(ray_start_cluster):
"""Tests that methods completed by dead actors are returned as ready"""
cluster = ray_start_cluster
Expand Down
2 changes: 2 additions & 0 deletions python/ray/tests/test_advanced_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ray.test_utils import (
RayTestTimeoutException,
wait_for_condition,
new_scheduler_enabled,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -251,6 +252,7 @@ def method(self):
ray.get(x)


@pytest.mark.skipif(new_scheduler_enabled(), reason="zero cpu handling")
def test_zero_cpus_actor(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2970,6 +2970,10 @@ std::string NodeManager::DebugString() const {
uint64_t now_ms = current_time_ms();
result << "NodeManager:";
result << "\nInitialConfigResources: " << initial_config_.resource_config.ToString();
if (cluster_task_manager_ != nullptr) {
result << "\nClusterTaskManager:\n";
result << cluster_task_manager_->DebugString();
}
result << "\nClusterResources:";
for (auto &pair : cluster_resource_map_) {
result << "\n" << pair.first.Hex() << ": " << pair.second.DebugString();
Expand Down
14 changes: 14 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,20 @@ TaskResourceInstances NodeResourceInstances::GetAvailableResourceInstances() {
return task_resources;
};

bool TaskRequest::IsEmpty() const {
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
if (this->predefined_resources[i].demand != 0) {
return false;
}
}
for (size_t i = 0; i < this->custom_resources.size(); i++) {
if (this->custom_resources[i].demand != 0) {
return false;
}
}
return true;
}

std::string TaskRequest::DebugString() const {
std::stringstream buffer;
buffer << " {";
Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class TaskRequest {
/// the task will run on a different node in the cluster, if none of the
/// nodes in this list can schedule this task.
absl::flat_hash_set<int64_t> placement_hints;
/// Check whether the request contains no resources.
bool IsEmpty() const;
/// Returns human-readable string for this task request.
std::string DebugString() const;
};
Expand Down
25 changes: 23 additions & 2 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
}

int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task_req,
bool actor_creation,
int64_t *total_violations) {
// Minimum number of soft violations across all nodes that can schedule the request.
// We will pick the node with the smallest number of soft violations.
Expand All @@ -167,6 +168,25 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task
int64_t best_node = -1;
*total_violations = 0;

if (actor_creation && task_req.IsEmpty()) {
// This an actor which requires no resources.
// Pick a random node to to avoid all scheduling all actors on the local node.
if (nodes_.size() > 0) {
int idx = std::rand() % nodes_.size();
for (auto &node : nodes_) {
if (idx == 0) {
best_node = node.first;
break;
}
idx--;
}
}
RAY_LOG(DEBUG) << "GetBestSchedulableNode, best_node = " << best_node
<< ", # nodes = " << nodes_.size()
<< ", task_req = " << task_req.DebugString();
return best_node;
}

// Check whether local node is schedulable. We return immediately
// the local node only if there are zero violations.
auto it = nodes_.find(local_node_id_);
Expand Down Expand Up @@ -211,10 +231,11 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task
}

std::string ClusterResourceScheduler::GetBestSchedulableNode(
const std::unordered_map<std::string, double> &task_resources,
const std::unordered_map<std::string, double> &task_resources, bool actor_creation,
int64_t *total_violations) {
TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources);
int64_t node_id = GetBestSchedulableNode(task_request, total_violations);
int64_t node_id =
GetBestSchedulableNode(task_request, actor_creation, total_violations);

std::string id_string;
if (node_id == -1) {
Expand Down
7 changes: 5 additions & 2 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,15 @@ class ClusterResourceScheduler {
/// Finally, if no such node exists, return -1.
///
/// \param task_request: Task to be scheduled.
/// \param actor_creation: True if this is an actor creation task.
/// \param violations: The number of soft constraint violations associated
/// with the node returned by this function (assuming
/// a node that can schedule task_req is found).
///
/// \return -1, if no node can schedule the current request; otherwise,
/// return the ID of a node that can schedule the task request.
int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t *violations);
int64_t GetBestSchedulableNode(const TaskRequest &task_request, bool actor_creation,
int64_t *violations);

/// Similar to
/// int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t
Expand All @@ -161,7 +163,8 @@ class ClusterResourceScheduler {
/// return the ID in string format of a node that can schedule the
// task request.
std::string GetBestSchedulableNode(
const std::unordered_map<std::string, double> &task_request, int64_t *violations);
const std::unordered_map<std::string, double> &task_request, bool actor_creation,
int64_t *violations);

/// Decrease the available resources of a node when a task request is
/// scheduled on the given node.
Expand Down
33 changes: 22 additions & 11 deletions src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);

Expand Down Expand Up @@ -428,7 +429,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_EQ(node_id, -1);
}
// Predefined resources, soft constraint violation
Expand All @@ -439,7 +441,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
}
Expand All @@ -452,7 +455,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations == 0);
}
Expand All @@ -467,7 +471,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id == -1);
}
// Custom resources, soft constraint violation.
Expand All @@ -481,7 +486,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
}
Expand All @@ -496,7 +502,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations == 0);
}
Expand All @@ -511,7 +518,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id == -1);
}
// Custom resource missing, soft constraint violation.
Expand All @@ -525,7 +533,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
}
Expand All @@ -541,7 +550,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
placement_hints);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
}
Expand All @@ -557,7 +567,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
placement_hints);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
int64_t node_id =
cluster_resources.GetBestSchedulableNode(task_req, false, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations == 0);
}
Expand Down
12 changes: 11 additions & 1 deletion src/ray/raylet/scheduling/cluster_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ bool ClusterTaskManager::SchedulePendingTasks() {
// TODO (Alex): We should distinguish between infeasible tasks and a fully
// utilized cluster.
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
request_resources, &_unused);
request_resources, task.GetTaskSpecification().IsActorCreationTask(), &_unused);
if (node_id_string.empty()) {
// There is no node that has available resources to run the request.
// Move on to the next shape.
Expand Down Expand Up @@ -178,12 +178,20 @@ void ClusterTaskManager::HandleTaskFinished(std::shared_ptr<WorkerInterface> wor
worker->ClearAllocatedInstances();
}

void ReplyCancelled(Work &work) {
auto reply = std::get<1>(work);
auto callback = std::get<2>(work);
reply->set_canceled(true);
callback();
}

bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
for (auto shapes_it = tasks_to_schedule_.begin(); shapes_it != tasks_to_schedule_.end();
shapes_it++) {
auto &work_queue = shapes_it->second;
for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) {
if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) {
ReplyCancelled(*work_it);
work_queue.erase(work_it);
if (work_queue.empty()) {
tasks_to_schedule_.erase(shapes_it);
Expand All @@ -197,6 +205,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
auto &work_queue = shapes_it->second;
for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) {
if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) {
ReplyCancelled(*work_it);
work_queue.erase(work_it);
if (work_queue.empty()) {
tasks_to_dispatch_.erase(shapes_it);
Expand All @@ -208,6 +217,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {

auto iter = waiting_tasks_.find(task_id);
if (iter != waiting_tasks_.end()) {
ReplyCancelled(iter->second);
waiting_tasks_.erase(iter);
return true;
}
Expand Down
Loading

0 comments on commit 48dee78

Please sign in to comment.