General attribute-based heterogeneity support with hard and soft constraints #248
Conversation
atumanov commented on Feb 6, 2017
- Modifications to the global and local schedulers to provide attribute-based resource accounting.
- Support for hard capacity constraints.
- Support for soft dynamic resource capacity constraints, including transfer-cost awareness, in the global scheduler.
python/global_scheduler/test/test.py
Outdated
redis_address=redis_address)
# Connect to the scheduler.
self.photon_client = photon.PhotonClient(local_scheduler_name)
self.plasma_store_p2 = []
this name is a little unusual, want to rename to self.plasma_stores? (similar for plasma_store_p3 and 4)
Could also put all the processes in one list?
hmm, actually, I'd rather keep them separate, because I'd prefer to kill all photons, followed by all plasma managers, followed by all plasma stores.
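For illustration, a minimal sketch of that teardown order (the helper and list names here are hypothetical, not from this PR):

def kill_in_order(photons, plasma_managers, plasma_stores):
    # Hypothetical helper: kill all photon processes first, then all
    # plasma managers, then all plasma stores, in that order.
    for group in (photons, plasma_managers, plasma_stores):
        for process in group:
            process.kill()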
python/global_scheduler/test/test.py
Outdated
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../core/src/common/redis_module/libray_redis_module.so")
assert os.path.isfile(redis_path)
assert os.path.isfile(redis_module)
node_ip_address = "127.0.0.1"
#redis_port = 6379 #default Redis port
remove this
python/global_scheduler/test/test.py
Outdated
plasma_store_name, p2 = plasma.start_plasma_store()
self.plasma_store_p2.append(p2)
# Start the Plasma manager.
# Assumption: Plasma manager name and port are randomly generated by the plasma module.
Can get rid of "Assumption" here
I had to check to make sure this is indeed the case, because we need different (name, port) pairs generated for the multinode test case. If the generated (name, port) pair were deterministic, I would've had to make changes inside the plasma module. I think explicitly stating this post-condition is useful.
@@ -39,10 +39,12 @@ typedef struct {
  UT_hash_handle hh;
} scheduler_object_info;

/* plasma_manager ip:port -> photon_db_client_id association entry. */
Can we put the documentation into the same form as everywhere else, i.e. full sentences ("Association of foo to bar" is ok), doxygen style (i.e. /** instead of /*), and, for struct members, placed before the member rather than after it?
sure, maybe we should add doxygen generation to the build, so it explicitly fails the build if the comments are not doxygen compliant. Since this is a header file (for which we expect to produce doxygen documentation), I agree with your comment placement suggestion.
python/global_scheduler/test/test.py
Outdated
db_client_id = self.get_plasma_manager_id()
assert(db_client_id != None)
assert(db_client_id.startswith(b"CL:"))
db_client_id = db_client_id[len(b"CL:"):]  # Remove the CL: prefix.
white space
I leave all white space-related cleanup until the very end and run all the code through the linter that formats it for me.
python/global_scheduler/test/test.py
Outdated
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)

self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 2*NUM_CLUSTER_NODES+1)
white space
python/global_scheduler/test/test.py
Outdated
num_return_vals = [0, 1, 2, 3, 5, 10]
# There should not be anything else in Redis yet.
self.assertEqual(len(self.redis_client.keys("*")), 3)
self.assertEqual(len(self.redis_client.keys("*")), 2*NUM_CLUSTER_NODES+1)
2 * NUM_CLUSTER_NODES + 1
python/global_scheduler/test/test.py
Outdated
plasma_manager_name=plasma_manager_name,
plasma_address=plasma_address,
redis_address=redis_address,
static_resource_list=[str(DEFAULT_NUM_CPUS), str(DEFAULT_NUM_GPUS)])
I think it makes more sense to use ints here and turn them into strings inside of photon.start_local_scheduler
ok.
@@ -262,7 +262,8 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False):

def start_local_scheduler(redis_address, node_ip_address, plasma_store_name,
                          plasma_manager_name, worker_path, plasma_address=None,
-                         cleanup=True, redirect_output=False):
+                         cleanup=True, redirect_output=False,
+                         static_resource_list=None):
Document this argument
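A possible wording for the docstring, as a sketch (phrasing is illustrative, not the final text):

static_resource_list (list): A list of the quantities of the static
    resources (e.g., the number of CPUs and GPUs) available to this local
    scheduler. If None, default values are used.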
strlen(plasma_photon_entry->aux_address), plasma_photon_entry);

/* Add photon_db_client_id -> plasma_manager ip:port association to state.*/
HASH_ADD(photon_plasma_hh, state->photon_plasma_map,
         photon_db_client_id, /* Key is the field name of entry struct. */
We've always been putting comments on their own lines and not on the same line as the code.
I know, but this is a rather inconvenient stylistic choice. When a comment is on the same line as the code it comments on, there's no ambiguity about which line of code it corresponds to. When the comment is on its own line, it could correspond to the line before or the line after; it's only by convention that you know it's the latter.
It makes sense for header files, where we expect doxygen to generate API documentation, but why have the same constraint in the rest of the code, where comments are rather for the developers reading the code?
strlen(plasma_photon_entry->aux_address), plasma_photon_entry);

/* Add photon_db_client_id -> plasma_manager ip:port association to state.*/
state.*/
-> state. */
@@ -10,6 +10,15 @@ global_scheduler_policy_state *init_global_scheduler_policy(void) {
  global_scheduler_policy_state *policy_state =
      malloc(sizeof(global_scheduler_policy_state));
  policy_state->round_robin_index = 0;

  int num_weight_elem = sizeof(policy_state->resource_attribute_weight)/sizeof(double);
spaces around /
Also in = 1.0/num_weight_elem;
for (i = policy_state->round_robin_index;
     !task_satisfied && num_retries >= 0;
     i = (i+1) % utarray_len(state->local_schedulers)) {
(i + 1)
src/common/task.h
Outdated
  MAX_RESOURCE_INDEX
} resource_vector_index;

#define DEFAULT_NUM_CPUS 16
I'm not sure if we should have defaults. Maybe the arguments should be required to be passed in to the local scheduler?
src/photon/photon_scheduler.c
Outdated
 * so we set the pointer to NULL so it is not used. */
worker->task_in_progress = NULL;
}
if (worker->task_in_progress) {
It doesn't change the code, but maybe making if (worker->task_in_progress) { into else if would be a bit easier to understand.
will do.
CHECKM(task_spec != NULL,
       "task wait handler encounted a task with NULL spec");
#if (RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG)
char buf[256];
Using fixed size buffers seems a bit dangerous. We could use UT_string to get around this.
actually, fixed size string buffers are OK, as long as I use snprintf(), which is guaranteed to not write more than the specified size. This is for debugging only. I could remove this whole #if #endif block.
src/common/task.c
Outdated
@@ -259,6 +265,10 @@ int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length) {
  return spec->arg_index++;
}

void task_add_required_resource(task_spec *spec, int64_t resource_index, double value) {
  spec->required_resources[resource_index] = value;
Should probably do some error checking on value (e.g., PyFloat_AsDouble can fail and return -1). Also, are there required resources that wouldn't have an integer value?
The checking should probably be in the Python code then.
if (task_satisfied) {
  /* Update next index to try and assign the task. */
  policy_state->round_robin_index = i; /* i was advanced. */
It's not a big deal, but if the above goes to the last retry (num_retries = -1) and the task is satisfied, I think round_robin_index will be set to the same index as the one the task is assigned to, instead of the one after.
I'm pretty sure that i is always advanced, as long as we take at least one pass through the body of the for loop.
int num_retries = NUM_RETRIES;
bool task_satisfied = false;

for (i = policy_state->round_robin_index;
ls->info.static_resources doesn't change between iterations, right? Can we limit the number of rounds to the minimum of the number of local schedulers and NUM_RETRIES?
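A sketch of the suggested bound, written in Python for brevity (the constant, toy data, and helper are stand-ins for the C code, not the PR's implementation):

NUM_RETRIES = 5  # stand-in for the C constant
local_schedulers = [{"cpus": 0}, {"cpus": 4}, {"cpus": 2}]  # toy data
round_robin_index = 0

def satisfies_hard_constraints(scheduler, requested_cpus):
    # Stand-in for the static-resource feasibility check.
    return scheduler["cpus"] >= requested_cpus

# Visit each local scheduler at most once, capped at NUM_RETRIES attempts.
i = round_robin_index
for _ in range(min(len(local_schedulers), NUM_RETRIES)):
    if satisfies_hard_constraints(local_schedulers[i], 1):
        print("assign task to local scheduler", i)
        break
    i = (i + 1) % len(local_schedulers)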
python/ray/services.py
Outdated
@@ -18,6 +18,7 @@
import photon
import plasma
import global_scheduler
from IPython.utils.sysinfo import num_cpus
Let's not import this. We can't assume that people have IPython.
Note, this error message is out of date. We should fix it before merging.
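If the import was only there to get a CPU count, a standard-library alternative would avoid the IPython dependency (a sketch; the actual fix in this PR may differ):

import multiprocessing

num_cpus = multiprocessing.cpu_count()  # no IPython required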
/** Subtract task's resource from the cached dynamic resource capacity for
 *  this local scheduler. This will be overwritten on the next heartbeat. */
local_scheduler->info.dynamic_resources[i] =
    MAX(0, local_scheduler->info.dynamic_resources[i] -
We shouldn't take a MAX. It should be allowed to go negative (otherwise, once all nodes hit 0 the global scheduler won't know how to distinguish between them even if way more tasks have been assigned to one of them).
I'm worried this may have unexpected effects on the score. Just letting it be negative doesn't have the effect you'd want on the score. E.g. dyn_cpu_node1 == -2, dyn_cpu_node2 = -1 and requested cpu=1. Node1 would cause less negative delta in the score than node2, and we'd want it to be reversed. dynfrac_cpu_node1 = -0.5 and dynfrac_cpu_node2 = -1. So node2 appears worse, while it should be better.
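To make that concrete, a toy computation (assuming the score uses the fraction dynamic/static, and assuming static capacities of 4 CPUs on node1 and 1 CPU on node2, which reproduce the numbers above):

# Toy numbers from the comment above; the static capacities are assumed.
dyn_cpu_node1, static_cpu_node1 = -2.0, 4.0
dyn_cpu_node2, static_cpu_node2 = -1.0, 1.0

dynfrac_node1 = dyn_cpu_node1 / static_cpu_node1  # -0.5
dynfrac_node2 = dyn_cpu_node2 / static_cpu_node2  # -1.0

# node2 scores worse (-1.0 < -0.5) even though node1 is more overloaded in
# absolute terms, which is the inversion described above.
print(dynfrac_node1, dynfrac_node2)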
  handle_task_round_robin(state, policy_state, task);
  return;
}
/* This node satisfies the hard capacity constraint. Calculate its score. */
All of the logic about computing a "score" should be separated out as a different function so we can easily change the computation of the score without changing how the scheduling policy uses the score (or so we can use the score in another scheduling policy).
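A sketch of that separation, in Python for brevity (all names here are hypothetical, not the PR's code):

def calculate_score(node, weights):
    # Hypothetical score: weighted sum of per-resource dynamic fractions.
    # Isolated here so the formula can change without touching the policy.
    return sum(weight * node["dynamic"][i] / node["static"][i]
               for i, weight in enumerate(weights))

def pick_node(feasible_nodes, weights):
    # The policy only consumes the score; it does not compute it.
    return max(feasible_nodes, key=lambda node: calculate_score(node, weights))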
python/global_scheduler/test/test.py
Outdated
self.p3.kill()
self.p4.kill()
# kill photons, plasma managers, and plasma stores.
map(subprocess.Popen.kill, self.local_scheduler_pids) #kill photon
add spaces before # at the end of line
you mean after?
python/photon/photon_services.py
Outdated
@@ -71,6 +73,21 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None,
  command += ["-r", redis_address]
if plasma_address is not None:
  command += ["-a", plasma_address]
# We want to be able to support indepdently setting capacity for each of the supported resource
independently
src/common/task.h
Outdated
 * @param value Value for the resource. This can be a quantity of resources
 *              this task needs for example or it can be a value for an
 *              attribute this task requires.
 * @return How many of this resource the task needs to execute.
replace by @return Void.
documentation updated.
@@ -409,11 +415,22 @@ def start_ray_processes(address_info=None,
      start a global scheduler process.
    include_redis (bool): If include_redis is True, then start a Redis server
      process.
    num_cpus: A list of length num_local_schedulers containing the number of
We're probably going to need to pass in more resources in the future. It may be a good idea to use a Python namedtuple to define a ResourceList type, so that we can pass in a single argument for all the resources and not have to remember the ordering of such an argument.
we'll generalize the frontend in a separate PR. The initial plan was to just add support for the GPUs in the frontend. We made the backend general already. It'll be easy to make the frontend general when we get more use cases.
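For reference, a sketch of the namedtuple idea (the type and field names are illustrative):

from collections import namedtuple

# Hypothetical ResourceList: one argument for all resources, with named
# fields instead of a positional ordering callers have to remember.
ResourceList = namedtuple("ResourceList", ["num_cpus", "num_gpus"])

resources = ResourceList(num_cpus=4, num_gpus=1)
print(resources.num_cpus, resources.num_gpus)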
} else {
  for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) {
    task_spec_add_required_resource(self->spec, i,
                                    i == CPU_RESOURCE_INDEX ? 1.0 : 0.0);
This code isn't very generalizable to other resources. I would either leave a comment explaining that these are the default values for these particular indices, or make a static double[] containing the default values.
why is it not generalizable? The purpose of this code is to just set the number of requested cores to one and leave everything else zero. This is pretty general, IMHO. It does assume that the default is zero for everything else.
src/common/task.h
Outdated
 * attribute this task requires.
 * @return How many of this resource the task needs to execute.
 */
void task_spec_add_required_resource(task_spec *spec,
Maybe call this task_spec_set_required_resource (and then get for the one below).
yeah, I like the symmetry, will do as a separate commit (global replace)
src/photon/photon_scheduler.c
Outdated
free_task(worker->task_in_progress);
worker->task_in_progress = NULL;
task *task_in_progress = worker->task_in_progress;
task_spec *spec =
Can you move this line into the if-block below? This is just personal preference, but I always feel that ternary operators are harder to read.
I love ternary operators. In fact I miss them in python dearly. But sure, after multiple rounds of iterations on this code, it no longer makes sense to have this spec outside of the if that follows.
I took a full pass through this whole case statement and minimized it.
src/photon/photon_scheduler.c
Outdated
}
print_resource_info(state, spec);
/* If we're connected to Redis, update tables. */
if (state->db != NULL && task_in_progress != NULL) {
Remove the task_in_progress != NULL check.
src/photon/photon_scheduler.c
Outdated
 * task_in_progress,
 * so we set the pointer to NULL so it is not used. */
worker->task_in_progress = NULL;
} else if (worker->task_in_progress) {
Make this just else.
def f(n):
    time.sleep(n)

start_time = time.time()
Can you document these blocks? Also, set a variable equal to 0.5 instead of referencing it by value.
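A sketch of the requested cleanup, with the literal given a name (the constant name is illustrative, not from the PR):

import time

SLEEP_TIME = 0.5  # seconds; referenced by name instead of a bare 0.5

def f(n):
    time.sleep(n)

start_time = time.time()
f(SLEEP_TIME)
# The call should take at least SLEEP_TIME seconds.
assert time.time() - start_time >= SLEEP_TIME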
@@ -7,6 +7,9 @@

/* The duration between local scheduler heartbeats. */
#define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 100

#define DEFAULT_NUM_CPUS INT16_MAX
Can we get the number of cores, instead of setting this to a static value?
This has been discussed, and the conclusion is to set the default number of cores to infinity if not specified. This makes the behavior backward compatible.
In most cases, we start photon from Python, and we automatically figure out the number of hardware threads before spawning the photon process.
The expectation is for the resources to be specified when the local scheduler is started.
I think it makes sense to use this as the default if nothing is passed in.
@@ -317,6 +319,20 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
    }
  }
  utarray_free(val_repr_ptrs);
  /* Set the resource vector of the task. */
  if (resource_vector != NULL) {
I wonder if we should be checking that the resource values specified by the caller are above some minimum value. If we use 1.0 as the default number of CPUs instead of the minimum, we could specify a task as using 0 CPUs, and then we won't track the dynamic resources properly in the schedulers. Could lead to unfair task assignments, where one task is starved just because it specified a more conservative resource requirement, whereas another task specifies 0 CPUs but actually uses n.
I suppose we could check if the requested CPU is < 1, but I know for a fact that in the Google cluster it is entirely possible to request fractional cores. The workload may be entirely memory bound, for instance, in which case it doesn't need a whole core dedicated to it. Fairness of these allocations would have to be a feature we add on the backend when we actually enable attribution of tasks (and, therefore, resources) to users (or drivers).
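If a minimum were ever enforced, a check like this could live in the Python layer (a sketch; the function name and bound are hypothetical, and fractional cores stay legal):

def check_resource_value(value):
    # Hypothetical validation: allow fractional cores (e.g., 0.5) but
    # reject negatives and NaN before building the task spec.
    if not isinstance(value, (int, float)) or value != value or value < 0:
        raise ValueError("resource quantities must be non-negative numbers")
    return float(value)

print(check_resource_value(0.5))  # fractional cores are allowed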
ray.worker.global_worker.run_function_on_all_workers(lambda worker_info: sys.path.append(worker_info["counter"]))
@ray.remote(num_cpus=0)
def get_worker_id():
    time.sleep(1)
why do we need the sleep here?
answer: to make sure tasks are scheduled on different workers
To make sure the tasks all run on different workers.
@@ -7,6 +7,9 @@

/* The duration between local scheduler heartbeats. */
#define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 100

#define DEFAULT_NUM_CPUS INT16_MAX
#define DEFAULT_NUM_GPUS 0
Same here, the default was set to be backward compatible, and the expectation is that the local scheduler is started with a resource config specified. In the long run, we envision that the local scheduler will be reading a config file to get these values.
I ran into the following problem:

import ray
ray.worker._init(num_local_schedulers=2, num_workers=10, start_ray_local=True)

@ray.remote
def f():
    return ray.worker.global_worker.plasma_client.store_socket_name

set(ray.get([f.remote() for _ in range(1000)]))

The last line prints