Skip to content

Commit faa31ae

Browse files
robertnishihara authored and atumanov committed
Introduce concept of resources required for placing a task. (#2837)
* Introduce concept of resources required for placement.
* Add placement resources to task spec
* Update java worker
* Update taskinfo.java
1 parent 01bb073 commit faa31ae

File tree

14 files changed

+285
-122
lines changed

14 files changed

+285
-122
lines changed

java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -286,4 +286,3 @@ public FunctionManager getFunctionManager() {
286286
return functionManager;
287287
}
288288
}
289-

java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java

Lines changed: 15 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -48,9 +48,12 @@ public final class TaskInfo extends Table {
4848
public ResourcePair requiredResources(int j) { return requiredResources(new ResourcePair(), j); }
4949
public ResourcePair requiredResources(ResourcePair obj, int j) { int o = __offset(30); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
5050
public int requiredResourcesLength() { int o = __offset(30); return o != 0 ? __vector_len(o) : 0; }
51-
public int language() { int o = __offset(32); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
52-
public String functionDescriptor(int j) { int o = __offset(34); return o != 0 ? __string(__vector(o) + j * 4) : null; }
53-
public int functionDescriptorLength() { int o = __offset(34); return o != 0 ? __vector_len(o) : 0; }
51+
public ResourcePair requiredPlacementResources(int j) { return requiredPlacementResources(new ResourcePair(), j); }
52+
public ResourcePair requiredPlacementResources(ResourcePair obj, int j) { int o = __offset(32); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
53+
public int requiredPlacementResourcesLength() { int o = __offset(32); return o != 0 ? __vector_len(o) : 0; }
54+
public int language() { int o = __offset(34); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
55+
public String functionDescriptor(int j) { int o = __offset(36); return o != 0 ? __string(__vector(o) + j * 4) : null; }
56+
public int functionDescriptorLength() { int o = __offset(36); return o != 0 ? __vector_len(o) : 0; }
5457

5558
public static int createTaskInfo(FlatBufferBuilder builder,
5659
int driver_idOffset,
@@ -67,11 +70,13 @@ public static int createTaskInfo(FlatBufferBuilder builder,
6770
int argsOffset,
6871
int returnsOffset,
6972
int required_resourcesOffset,
73+
int required_placement_resourcesOffset,
7074
int language,
7175
int function_descriptorOffset) {
72-
builder.startObject(16);
76+
builder.startObject(17);
7377
TaskInfo.addFunctionDescriptor(builder, function_descriptorOffset);
7478
TaskInfo.addLanguage(builder, language);
79+
TaskInfo.addRequiredPlacementResources(builder, required_placement_resourcesOffset);
7580
TaskInfo.addRequiredResources(builder, required_resourcesOffset);
7681
TaskInfo.addReturns(builder, returnsOffset);
7782
TaskInfo.addArgs(builder, argsOffset);
@@ -89,7 +94,7 @@ public static int createTaskInfo(FlatBufferBuilder builder,
8994
return TaskInfo.endTaskInfo(builder);
9095
}
9196

92-
public static void startTaskInfo(FlatBufferBuilder builder) { builder.startObject(16); }
97+
public static void startTaskInfo(FlatBufferBuilder builder) { builder.startObject(17); }
9398
public static void addDriverId(FlatBufferBuilder builder, int driverIdOffset) { builder.addOffset(0, driverIdOffset, 0); }
9499
public static void addTaskId(FlatBufferBuilder builder, int taskIdOffset) { builder.addOffset(1, taskIdOffset, 0); }
95100
public static void addParentTaskId(FlatBufferBuilder builder, int parentTaskIdOffset) { builder.addOffset(2, parentTaskIdOffset, 0); }
@@ -110,8 +115,11 @@ public static int createTaskInfo(FlatBufferBuilder builder,
110115
public static void addRequiredResources(FlatBufferBuilder builder, int requiredResourcesOffset) { builder.addOffset(13, requiredResourcesOffset, 0); }
111116
public static int createRequiredResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
112117
public static void startRequiredResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
113-
public static void addLanguage(FlatBufferBuilder builder, int language) { builder.addInt(14, language, 0); }
114-
public static void addFunctionDescriptor(FlatBufferBuilder builder, int functionDescriptorOffset) { builder.addOffset(15, functionDescriptorOffset, 0); }
118+
public static void addRequiredPlacementResources(FlatBufferBuilder builder, int requiredPlacementResourcesOffset) { builder.addOffset(14, requiredPlacementResourcesOffset, 0); }
119+
public static int createRequiredPlacementResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
120+
public static void startRequiredPlacementResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
121+
public static void addLanguage(FlatBufferBuilder builder, int language) { builder.addInt(15, language, 0); }
122+
public static void addFunctionDescriptor(FlatBufferBuilder builder, int functionDescriptorOffset) { builder.addOffset(16, functionDescriptorOffset, 0); }
115123
public static int createFunctionDescriptorVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
116124
public static void startFunctionDescriptorVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
117125
public static int endTaskInfo(FlatBufferBuilder builder) {
@@ -136,4 +144,3 @@ public ByteBuffer returnsAsByteBuffer(int j) {
136144
return src;
137145
}
138146
}
139-

java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -209,6 +209,11 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
209209
ResourcePair.createResourcePair(fbb, keyOffset, entry.getValue());
210210
}
211211
int requiredResourcesOffset = fbb.createVectorOfTables(requiredResourcesOffsets);
212+
213+
int[] requiredPlacementResourcesOffsets = new int[0];
214+
int requiredPlacementResourcesOffset =
215+
fbb.createVectorOfTables(requiredPlacementResourcesOffsets);
216+
212217
int[] functionDescriptorOffsets = new int[]{
213218
fbb.createString(task.functionDescriptor.className),
214219
fbb.createString(task.functionDescriptor.name),
@@ -222,7 +227,8 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
222227
actorCreateIdOffset, actorCreateDummyIdOffset,
223228
actorIdOffset, actorHandleIdOffset, actorCounter,
224229
false, functionIdOffset,
225-
argsOffset, returnsOffset, requiredResourcesOffset, TaskLanguage.JAVA,
230+
argsOffset, returnsOffset, requiredResourcesOffset,
231+
requiredPlacementResourcesOffset, TaskLanguage.JAVA,
226232
functionDescriptorOffset);
227233
fbb.finish(root);
228234
ByteBuffer buffer = fbb.dataBuffer();

python/ray/actor.py

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -373,14 +373,24 @@ def _submit(self,
373373
self._num_cpus, self._num_gpus, self._resources, num_cpus,
374374
num_gpus, resources)
375375

376+
# If the actor methods require CPU resources, then set the required
377+
# placement resources. If actor_placement_resources is empty, then
378+
# the required placement resources will be the same as resources.
379+
actor_placement_resources = {}
380+
assert self._actor_method_cpus in [0, 1]
381+
if self._actor_method_cpus == 1:
382+
actor_placement_resources = resources.copy()
383+
actor_placement_resources["CPU"] += 1
384+
376385
creation_args = [self._class_id]
377386
function_id = compute_actor_creation_function_id(self._class_id)
378387
[actor_cursor] = worker.submit_task(
379388
function_id,
380389
creation_args,
381390
actor_creation_id=actor_id,
382391
num_return_vals=1,
383-
resources=resources)
392+
resources=resources,
393+
placement_resources=actor_placement_resources)
384394

385395
# We initialize the actor counter at 1 to account for the actor
386396
# creation task.
@@ -566,6 +576,7 @@ def _actor_method_call(self,
566576
# We add one for the dummy return ID.
567577
num_return_vals=num_return_vals + 1,
568578
resources={"CPU": self._ray_actor_method_cpus},
579+
placement_resources={},
569580
driver_id=self._ray_actor_driver_id)
570581
# Update the actor counter and cursor to reflect the most recent
571582
# invocation.

python/ray/worker.py

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -550,6 +550,7 @@ def submit_task(self,
550550
execution_dependencies=None,
551551
num_return_vals=None,
552552
resources=None,
553+
placement_resources=None,
553554
driver_id=None):
554555
"""Submit a remote task to the scheduler.
555556
@@ -575,6 +576,9 @@ def submit_task(self,
575576
num_return_vals: The number of return values this function should
576577
have.
577578
resources: The resource requirements for this task.
579+
placement_resources: The resources required for placing the task.
580+
If this is not provided or if it is an empty dictionary, then
581+
the placement resources will be equal to resources.
578582
driver_id: The ID of the relevant driver. This is almost always the
579583
driver ID of the driver that is currently running. However, in
580584
the exceptional case that an actor task is being dispatched to
@@ -628,6 +632,9 @@ def submit_task(self,
628632
raise ValueError(
629633
"Resource quantities must all be whole numbers.")
630634

635+
if placement_resources is None:
636+
placement_resources = {}
637+
631638
with self.state_lock:
632639
# Increment the worker's task index to track how many tasks
633640
# have been submitted by the current task so far.
@@ -640,7 +647,8 @@ def submit_task(self,
640647
num_return_vals, self.current_task_id, task_index,
641648
actor_creation_id, actor_creation_dummy_object_id, actor_id,
642649
actor_handle_id, actor_counter, is_actor_checkpoint_method,
643-
execution_dependencies, resources, self.use_raylet)
650+
execution_dependencies, resources, placement_resources,
651+
self.use_raylet)
644652
self.local_scheduler_client.submit(task)
645653

646654
return task.returns()
@@ -2138,7 +2146,7 @@ def connect(info,
21382146
worker.current_task_id, worker.task_index,
21392147
ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
21402148
ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
2141-
nil_actor_counter, False, [], {"CPU": 0}, worker.use_raylet)
2149+
nil_actor_counter, False, [], {"CPU": 0}, {}, worker.use_raylet)
21422150

21432151
# Add the driver task to the task table.
21442152
if not worker.use_raylet:

src/common/format/common.fbs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,9 @@ table TaskInfo {
6060
// The required_resources vector indicates the quantities of the different
6161
// resources required by this task.
6262
required_resources: [ResourcePair];
63+
// The resources required for placing this task on a node. If this is empty,
64+
// then the placement resources are equal to the required_resources.
65+
required_placement_resources: [ResourcePair];
6366
// The language that this task belongs to
6467
language: TaskLanguage;
6568
// Function descriptor, which is a list of strings that can

src/common/lib/python/common_extension.cc

Lines changed: 79 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -295,49 +295,100 @@ PyTypeObject PyObjectIDType = {
295295
PyType_GenericNew, /* tp_new */
296296
};
297297

298-
/* Define the PyTask class. */
298+
// Define the PyTask class.
299+
300+
int resource_map_from_python_dict(
301+
PyObject *resource_map,
302+
std::unordered_map<std::string, double> &out) {
303+
RAY_CHECK(out.size() == 0);
304+
305+
PyObject *key, *value;
306+
Py_ssize_t position = 0;
307+
if (!PyDict_Check(resource_map)) {
308+
PyErr_SetString(PyExc_TypeError, "resource_map must be a dictionary");
309+
return -1;
310+
}
311+
312+
while (PyDict_Next(resource_map, &position, &key, &value)) {
313+
#if PY_MAJOR_VERSION >= 3
314+
if (!PyUnicode_Check(key)) {
315+
PyErr_SetString(PyExc_TypeError,
316+
"the keys in resource_map must be strings");
317+
return -1;
318+
}
319+
#else
320+
if (!PyBytes_Check(key)) {
321+
PyErr_SetString(PyExc_TypeError,
322+
"the keys in resource_map must be strings");
323+
return -1;
324+
}
325+
#endif
326+
327+
// Check that the resource quantities are numbers.
328+
if (!(PyFloat_Check(value) || PyInt_Check(value) || PyLong_Check(value))) {
329+
PyErr_SetString(PyExc_TypeError,
330+
"the values in resource_map must be floats");
331+
return -1;
332+
}
333+
// Handle the case where the key is a bytes object and the case where it
334+
// is a unicode object.
335+
std::string resource_name;
336+
if (PyUnicode_Check(key)) {
337+
PyObject *ascii_key = PyUnicode_AsASCIIString(key);
338+
resource_name =
339+
std::string(PyBytes_AsString(ascii_key), PyBytes_Size(ascii_key));
340+
Py_DECREF(ascii_key);
341+
} else {
342+
resource_name = std::string(PyBytes_AsString(key), PyBytes_Size(key));
343+
}
344+
out[resource_name] = PyFloat_AsDouble(value);
345+
}
346+
return 0;
347+
}
299348

300349
static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
301-
/* ID of the driver that this task originates from. */
350+
// ID of the driver that this task originates from.
302351
UniqueID driver_id;
303-
/* ID of the actor this task should run on. */
352+
// ID of the actor this task should run on.
304353
UniqueID actor_id = ActorID::nil();
305-
/* ID of the actor handle used to submit this task. */
354+
// ID of the actor handle used to submit this task.
306355
UniqueID actor_handle_id = ActorHandleID::nil();
307-
/* How many tasks have been launched on the actor so far? */
356+
// How many tasks have been launched on the actor so far?
308357
int actor_counter = 0;
309-
/* True if this is an actor checkpoint task and false otherwise. */
358+
// True if this is an actor checkpoint task and false otherwise.
310359
PyObject *is_actor_checkpoint_method_object = nullptr;
311-
/* ID of the function this task executes. */
360+
// ID of the function this task executes.
312361
FunctionID function_id;
313-
/* Arguments of the task (can be PyObjectIDs or Python values). */
362+
// Arguments of the task (can be PyObjectIDs or Python values).
314363
PyObject *arguments;
315-
/* Number of return values of this task. */
364+
// Number of return values of this task.
316365
int num_returns;
317-
/* The ID of the task that called this task. */
366+
// The ID of the task that called this task.
318367
TaskID parent_task_id;
319-
/* The number of tasks that the parent task has called prior to this one. */
368+
// The number of tasks that the parent task has called prior to this one.
320369
int parent_counter;
321370
// The actor creation ID.
322371
ActorID actor_creation_id = ActorID::nil();
323372
// The dummy object for the actor creation task (if this is an actor method).
324373
ObjectID actor_creation_dummy_object_id = ObjectID::nil();
325-
/* Arguments of the task that are execution-dependent. These must be
326-
* PyObjectIDs). */
374+
// Arguments of the task that are execution-dependent. These must be
375+
// PyObjectIDs).
327376
PyObject *execution_arguments = nullptr;
328-
/* Dictionary of resource requirements for this task. */
377+
// Dictionary of resource requirements for this task.
329378
PyObject *resource_map = nullptr;
379+
// Dictionary of required placement resources for this task.
380+
PyObject *placement_resource_map = nullptr;
330381
// True if we should use the raylet code path and false otherwise.
331382
PyObject *use_raylet_object = nullptr;
332383
if (!PyArg_ParseTuple(
333-
args, "O&O&OiO&i|O&O&O&O&iOOOO", &PyObjectToUniqueID, &driver_id,
384+
args, "O&O&OiO&i|O&O&O&O&iOOOOO", &PyObjectToUniqueID, &driver_id,
334385
&PyObjectToUniqueID, &function_id, &arguments, &num_returns,
335386
&PyObjectToUniqueID, &parent_task_id, &parent_counter,
336387
&PyObjectToUniqueID, &actor_creation_id, &PyObjectToUniqueID,
337388
&actor_creation_dummy_object_id, &PyObjectToUniqueID, &actor_id,
338389
&PyObjectToUniqueID, &actor_handle_id, &actor_counter,
339390
&is_actor_checkpoint_method_object, &execution_arguments,
340-
&resource_map, &use_raylet_object)) {
391+
&resource_map, &placement_resource_map, &use_raylet_object)) {
341392
return -1;
342393
}
343394

@@ -349,48 +400,25 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
349400

350401
// Parse the resource map.
351402
std::unordered_map<std::string, double> required_resources;
403+
std::unordered_map<std::string, double> required_placement_resources;
352404

353-
bool found_CPU_requirements = false;
354-
PyObject *key, *value;
355-
Py_ssize_t position = 0;
356405
if (resource_map != nullptr) {
357-
if (!PyDict_Check(resource_map)) {
358-
PyErr_SetString(PyExc_TypeError, "resource_map must be a dictionary");
406+
if (resource_map_from_python_dict(resource_map, required_resources) != 0) {
359407
return -1;
360408
}
361-
while (PyDict_Next(resource_map, &position, &key, &value)) {
362-
if (!(PyBytes_Check(key) || PyUnicode_Check(key))) {
363-
PyErr_SetString(PyExc_TypeError,
364-
"the keys in resource_map must be strings");
365-
return -1;
366-
}
367-
if (!(PyFloat_Check(value) || PyInt_Check(value) ||
368-
PyLong_Check(value))) {
369-
PyErr_SetString(PyExc_TypeError,
370-
"the values in resource_map must be floats");
371-
return -1;
372-
}
373-
// Handle the case where the key is a bytes object and the case where it
374-
// is a unicode object.
375-
std::string resource_name;
376-
if (PyUnicode_Check(key)) {
377-
PyObject *ascii_key = PyUnicode_AsASCIIString(key);
378-
resource_name =
379-
std::string(PyBytes_AsString(ascii_key), PyBytes_Size(ascii_key));
380-
Py_DECREF(ascii_key);
381-
} else {
382-
resource_name = std::string(PyBytes_AsString(key), PyBytes_Size(key));
383-
}
384-
if (resource_name == std::string("CPU")) {
385-
found_CPU_requirements = true;
386-
}
387-
required_resources[resource_name] = PyFloat_AsDouble(value);
388-
}
389409
}
390-
if (!found_CPU_requirements) {
410+
411+
if (required_resources.count("CPU") == 0) {
391412
required_resources["CPU"] = 1.0;
392413
}
393414

415+
if (placement_resource_map != nullptr) {
416+
if (resource_map_from_python_dict(placement_resource_map,
417+
required_placement_resources) != 0) {
418+
return -1;
419+
}
420+
}
421+
394422
Py_ssize_t num_args = PyList_Size(arguments);
395423

396424
bool use_raylet = false;
@@ -463,7 +491,7 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
463491
driver_id, parent_task_id, parent_counter, actor_creation_id,
464492
actor_creation_dummy_object_id, actor_id, actor_handle_id,
465493
actor_counter, function_id, args, num_returns, required_resources,
466-
Language::PYTHON);
494+
required_placement_resources, Language::PYTHON);
467495
}
468496

469497
/* Set the task's execution dependencies. */

0 commit comments

Comments
 (0)