Skip to content

Commit

Permalink
Revert "Updating zero capacity resource semantics (ray-project#4555)"
Browse files Browse the repository at this point in the history
This reverts commit 0f42f87.
  • Loading branch information
devin-petersohn committed Apr 18, 2019
1 parent 8f37d49 commit 618147f
Show file tree
Hide file tree
Showing 17 changed files with 191 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,13 @@
* The options class for RayCall or ActorCreation.
*/
public abstract class BaseTaskOptions {
public final Map<String, Double> resources;
public Map<String, Double> resources;

public BaseTaskOptions() {
resources = new HashMap<>();
}

public BaseTaskOptions(Map<String, Double> resources) {
for (Map.Entry<String, Double> entry : resources.entrySet()) {
if (entry.getValue().compareTo(0.0) <= 0) {
throw new IllegalArgumentException(String.format("Resource capacity should be " +
"positive, but got resource %s = %f.", entry.getKey(), entry.getValue()));
}
}
this.resources = resources;
}

Expand Down
2 changes: 1 addition & 1 deletion java/example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ray {
run-mode = CLUSTER

// Available resources on this node.
resources: "CPU:4"
resources: "CPU:4,GPU:0"

// The address of the redis server to connect to, in format `ip:port`.
// If not provided, Ray processes will be started locally, including
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.ray.runtime.task.ArgumentsBuilder;
import org.ray.runtime.task.TaskLanguage;
import org.ray.runtime.task.TaskSpec;
import org.ray.runtime.util.ResourceUtil;
import org.ray.runtime.util.UniqueIdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -356,6 +357,11 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
resources = new HashMap<>(taskOptions.resources);
}

if (!resources.containsKey(ResourceUtil.CPU_LITERAL)
&& !resources.containsKey(ResourceUtil.CPU_LITERAL.toLowerCase())) {
resources.put(ResourceUtil.CPU_LITERAL, 0.0);
}

int maxActorReconstruction = 0;
if (taskOptions instanceof ActorCreationOptions) {
maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public RayConfig(Config config) {
+ "setting it to the number of CPU cores: {}", numCpu);
resources.put("CPU", numCpu * 1.0);
}
if (!resources.containsKey("GPU")) {
LOGGER.warn("No GPU resource is set in configuration, setting it to 0");
resources.put("GPU", 0.0);
}
}
// Driver id.
String driverId = config.getString("ray.driver.id");
Expand Down
2 changes: 1 addition & 1 deletion java/streaming/src/main/resources/ray.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ray {
run-mode = SINGLE_PROCESS
resources = "CPU:4"
resources = "CPU:4,GPU:0"
redis.address = ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,30 @@ public Integer echo(Integer number) {
@Test
public void testMethods() {
TestUtils.skipTestUnderSingleProcess();
CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0));
CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 0.0));

// This is a case that can satisfy required resources.
// The static resources for test are "CPU:4,RES-A:4".
RayObject<Integer> result1 = Ray.call(ResourcesManagementTest::echo, 100, callOptions1);
Assert.assertEquals(100, (int) result1.get());

CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0));
CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 2.0));

// This is a case that can't satisfy required resources.
// The static resources for test are "CPU:4,RES-A:4".
final RayObject<Integer> result2 = Ray.call(ResourcesManagementTest::echo, 200, callOptions2);
WaitResult<Integer> waitResult = Ray.wait(ImmutableList.of(result2), 1, 1000);

Assert.assertEquals(1, waitResult.getReady().size());
Assert.assertEquals(0, waitResult.getUnready().size());

try {
CallOptions callOptions3 = new CallOptions(ImmutableMap.of("CPU", 0.0));
Assert.fail();
} catch (RuntimeException e) {
// We should receive a RuntimeException indicating that we should not
// pass a zero capacity resource.
}
Assert.assertEquals(0, waitResult.getReady().size());
Assert.assertEquals(1, waitResult.getUnready().size());
}

@Test
public void testActors() {
TestUtils.skipTestUnderSingleProcess();

ActorCreationOptions actorCreationOptions1 =
new ActorCreationOptions(ImmutableMap.of("CPU", 2.0));
new ActorCreationOptions(ImmutableMap.of("CPU", 2.0, "GPU", 0.0));

// This is a case that can satisfy required resources.
// The static resources for test are "CPU:4,RES-A:4".
Expand All @@ -88,7 +80,7 @@ public void testActors() {
// This is a case that can't satisfy required resources.
// The static resources for test are "CPU:4,RES-A:4".
ActorCreationOptions actorCreationOptions2 =
new ActorCreationOptions(ImmutableMap.of("CPU", 8.0));
new ActorCreationOptions(ImmutableMap.of("CPU", 8.0, "GPU", 0.0));

RayActor<ResourcesManagementTest.Echo> echo2 =
Ray.createActor(Echo::new, actorCreationOptions2);
Expand Down
2 changes: 2 additions & 0 deletions python/ray/includes/task.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ cdef class Task:
# Parse the resource map.
if resource_map is not None:
required_resources = resource_map_from_dict(resource_map)
if required_resources.count(b"CPU") == 0:
required_resources[b"CPU"] = 1.0
if placement_resource_map is not None:
required_placement_resources = (
resource_map_from_dict(placement_resource_map))
Expand Down
18 changes: 3 additions & 15 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,25 +1029,14 @@ def check_and_update_resources(num_cpus, num_gpus, resources):
if gpu_ids is not None:
resources["GPU"] = min(resources["GPU"], len(gpu_ids))

resources = {
resource_label: resource_quantity
for resource_label, resource_quantity in resources.items()
if resource_quantity != 0
}

# Check types.
for _, resource_quantity in resources.items():
assert (isinstance(resource_quantity, int)
or isinstance(resource_quantity, float))
if (isinstance(resource_quantity, float)
and not resource_quantity.is_integer()):
raise ValueError(
"Resource quantities must all be whole numbers. Received {}.".
format(resources))
if resource_quantity < 0:
raise ValueError(
"Resource quantities must be nonnegative. Received {}.".format(
resources))
raise ValueError("Resource quantities must all be whole numbers.")

if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY:
raise ValueError("Resource quantities must be at most {}.".format(
ray_constants.MAX_RESOURCE_QUANTITY))
Expand Down Expand Up @@ -1124,9 +1113,8 @@ def start_raylet(redis_address,

# Limit the number of workers that can be started in parallel by the
# raylet. However, make sure it is at least 1.
num_cpus_static = static_resources.get("CPU", 0)
maximum_startup_concurrency = max(
1, min(multiprocessing.cpu_count(), num_cpus_static))
1, min(multiprocessing.cpu_count(), static_resources["CPU"]))

# Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
resource_argument = ",".join(
Expand Down
32 changes: 3 additions & 29 deletions python/ray/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1948,15 +1948,15 @@ def run_lots_of_tasks():
store_names = []
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"].get("GPU", 0) == 0
if client["Resources"]["GPU"] == 0
]
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"].get("GPU", 0) == 5
if client["Resources"]["GPU"] == 5
]
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"].get("GPU", 0) == 1
if client["Resources"]["GPU"] == 1
]
assert len(store_names) == 3

Expand Down Expand Up @@ -2126,32 +2126,6 @@ def f():
ray.get(results)


def test_zero_capacity_deletion_semantics(shutdown_only):
ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1})

def test():
resources = ray.global_state.available_resources()
retry_count = 0

while resources and retry_count < 5:
time.sleep(0.1)
resources = ray.global_state.available_resources()
retry_count += 1

if retry_count >= 5:
raise RuntimeError("Resources were available even after retries.")

return resources

function = ray.remote(
num_cpus=2, num_gpus=1, resources={"test_resource": 1})(test)
cluster_resources = ray.get(function.remote())

# All cluster resources should be utilized and
# cluster_resources must be empty
assert cluster_resources == {}


@pytest.fixture
def save_gpu_ids_shutdown_only():
# Record the current value of this environment variable so that we can
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_global_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def cpu_task():

while not resource_used:
available_resources = ray.global_state.available_resources()
resource_used = available_resources.get(
"CPU", 0) == cluster_resources.get("CPU", 0) - 1
resource_used = available_resources[
"CPU"] == cluster_resources["CPU"] - 1

assert resource_used

Expand Down
21 changes: 8 additions & 13 deletions python/ray/tune/ray_trial_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import traceback

import ray
from ray.tune.error import AbortTrialExecution
from ray.tune.error import TuneError, AbortTrialExecution
from ray.tune.logger import NoopLogger
from ray.tune.trial import Trial, Resources, Checkpoint
from ray.tune.trial_executor import TrialExecutor
Expand Down Expand Up @@ -363,22 +363,17 @@ def _update_avail_resources(self, num_retries=5):
resources = ray.services.check_and_update_resources(
None, None, None)
if not resources:
logger.warning(
"Cluster resources not detected or are 0. Retrying...")
logger.warning("Cluster resources not detected. Retrying...")
time.sleep(0.5)

if not resources:
# NOTE: This hides the possibility that Ray may be waiting for
# clients to connect.
resources.setdefault("CPU", 0)
resources.setdefault("GPU", 0)
logger.warning("Cluster resources cannot be detected or are 0. "
"You can resume this experiment by passing in "
"`resume=True` to `run`.")
if not resources or "CPU" not in resources:
raise TuneError("Cluster resources cannot be detected. "
"You can resume this experiment by passing in "
"`resume=True` to `run`.")

resources = resources.copy()
num_cpus = resources.pop("CPU", 0)
num_gpus = resources.pop("GPU", 0)
num_cpus = resources.pop("CPU")
num_gpus = resources.pop("GPU")
custom_resources = resources

self._avail_resources = Resources(
Expand Down
9 changes: 1 addition & 8 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,6 @@ def submit_task(self,
raise ValueError(
"Resource quantities must all be whole numbers.")

# Remove any resources with zero quantity requirements
resources = {
resource_label: resource_quantity
for resource_label, resource_quantity in resources.items()
if resource_quantity > 0
}

if placement_resources is None:
placement_resources = {}

Expand Down Expand Up @@ -1877,7 +1870,7 @@ def connect(node,
nil_actor_counter, # actor_counter.
[], # new_actor_handles.
[], # execution_dependencies.
{}, # resource_map.
{"CPU": 0}, # resource_map.
{}, # placement_resource_map.
)

Expand Down
33 changes: 14 additions & 19 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1469,16 +1469,15 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr<LocalClientConnection>
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = required_resources.GetNumCpus();
std::unordered_map<std::string, double> cpu_resources;
if (required_cpus > 0) {
cpu_resources[kCPU_ResourceLabel] = required_cpus;
}
const std::unordered_map<std::string, double> cpu_resources = {
{kCPU_ResourceLabel, required_cpus}};

// Release the CPU resources.
auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
local_available_resources_.Release(cpu_resource_ids);
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
ResourceSet(cpu_resources));
RAY_CHECK(
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
ResourceSet(cpu_resources)));
worker->MarkBlocked();

// Try dispatching tasks since we may have released some resources.
Expand Down Expand Up @@ -1522,11 +1521,9 @@ void NodeManager::HandleTaskUnblocked(
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = required_resources.GetNumCpus();
std::unordered_map<std::string, double> cpu_resources_map;
if (required_cpus > 0) {
cpu_resources_map[kCPU_ResourceLabel] = required_cpus;
}
const ResourceSet cpu_resources(cpu_resources_map);
const ResourceSet cpu_resources(
std::unordered_map<std::string, double>({{kCPU_ResourceLabel, required_cpus}}));

// Check if we can reacquire the CPU resources.
bool oversubscribed = !local_available_resources_.Contains(cpu_resources);

Expand All @@ -1536,8 +1533,9 @@ void NodeManager::HandleTaskUnblocked(
// reacquire here may be different from the ones that the task started with.
auto const resource_ids = local_available_resources_.Acquire(cpu_resources);
worker->AcquireTaskCpuResources(resource_ids);
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
cpu_resources);
RAY_CHECK(
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
cpu_resources));
} else {
// In this case, we simply don't reacquire the CPU resources for the worker.
// The worker can keep running and when the task finishes, it will simply
Expand Down Expand Up @@ -1629,7 +1627,7 @@ bool NodeManager::AssignTask(const Task &task) {
auto acquired_resources =
local_available_resources_.Acquire(spec.GetRequiredResources());
const auto &my_client_id = gcs_client_->client_table().GetLocalClientId();
cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources());
RAY_CHECK(cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()));

if (spec.IsActorCreationTask()) {
// Check that we are not placing an actor creation task on a node with 0 CPUs.
Expand Down Expand Up @@ -1743,8 +1741,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
// Release task's resources. The worker's lifetime resources are still held.
auto const &task_resources = worker.GetTaskResourceIds();
local_available_resources_.Release(task_resources);
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
task_resources.ToResourceSet());
RAY_CHECK(cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
task_resources.ToResourceSet()));
worker.ResetTaskResourceIds();

// If this was an actor or actor creation task, handle the actor's new state.
Expand Down Expand Up @@ -2036,9 +2034,6 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,

RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;

// TODO(romilb): We should probably revert the load subtraction from
// SchedulingPolicy::Schedule()
// Mark the failed task as pending to let other raylets know that we still
// have the task. TaskDependencyManager::TaskPending() is assumed to be
// idempotent.
Expand Down
8 changes: 3 additions & 5 deletions src/ray/raylet/scheduling_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,11 @@ std::unordered_map<TaskID, ClientID> SchedulingPolicy::Schedule(
const auto &node_resources = client_resource_pair.second;
ResourceSet available_node_resources =
ResourceSet(node_resources.GetAvailableResources());
// TODO(romilb): Why do we need to subtract load from available resources?
// Even if we don't the code path below for choosing a dst_client_id would be
// similar.
available_node_resources.SubtractResources(node_resources.GetLoadResources());
available_node_resources.SubtractResourcesStrict(node_resources.GetLoadResources());
RAY_LOG(DEBUG) << "client_id " << node_client_id
<< " avail: " << node_resources.GetAvailableResources().ToString()
<< " load: " << node_resources.GetLoadResources().ToString();
<< " load: " << node_resources.GetLoadResources().ToString()
<< " avail-load: " << available_node_resources.ToString();

if (resource_demand.IsSubset(available_node_resources)) {
// This node is a feasible candidate.
Expand Down
Loading

0 comments on commit 618147f

Please sign in to comment.