General attribute-based heterogeneity support with hard and soft constraints #248


Merged: 31 commits merged into ray-project:master on Feb 9, 2017

Conversation

@atumanov (Contributor) commented Feb 6, 2017:

  • modifications to the global and local scheduler to provide attribute-based resource accounting
  • support for hard capacity constraints
  • support for soft dynamic resource capacity constraints, including transfer-cost awareness, in the global scheduler
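
A rough usage sketch of what this enables on the API side, pieced together from snippets that appear later in this conversation (ray.worker._init, @ray.remote(num_cpus=...), and the plasma store socket name); the task name where_am_i is illustrative and the exact frontend surface may differ:

import time
import ray

# Local two-scheduler setup, as used later in this conversation.
ray.worker._init(num_local_schedulers=2, num_workers=10, start_ray_local=True)

# A task annotated with its resource demand; declaring num_cpus=0 means the
# local scheduler does not count it against its CPU capacity.
@ray.remote(num_cpus=0)
def where_am_i():
    time.sleep(0.5)
    return ray.worker.global_worker.plasma_client.store_socket_name

# With load actually being distributed, more than one store socket name shows up.
print(set(ray.get([where_am_i.remote() for _ in range(100)])))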

redis_address=redis_address)
# Connect to the scheduler.
self.photon_client = photon.PhotonClient(local_scheduler_name)
self.plasma_store_p2 = []
@pcmoritz (Contributor), Feb 7, 2017:
this name is a little unusual, want to rename to self.plasma_stores? (similar for plasma_store_p3 and 4)

Contributor:
Could also put all the processes in one list?

Contributor Author:
hmm, actually, I'd rather keep them separate, because I'd prefer to kill all photons, followed by all plasma managers, followed by all plasma stores.
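
A minimal sketch of the teardown order described here, assuming each list holds subprocess.Popen handles (the function and parameter names are illustrative, not the test's actual attributes):

def teardown_cluster(photon_procs, plasma_manager_procs, plasma_store_procs):
    # Kill all photon (local scheduler) processes first, then all plasma
    # managers, then all plasma stores; each argument is a list of
    # subprocess.Popen handles.
    for proc in photon_procs:
        proc.kill()
    for proc in plasma_manager_procs:
        proc.kill()
    for proc in plasma_store_procs:
        proc.kill()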

redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../core/src/common/redis_module/libray_redis_module.so")
assert os.path.isfile(redis_path)
assert os.path.isfile(redis_module)
node_ip_address = "127.0.0.1"
#redis_port = 6379 #default Redis port
Contributor:
remove this

plasma_store_name, p2 = plasma.start_plasma_store()
self.plasma_store_p2.append(p2)
# Start the Plasma manager.
# Assumption: Plasma manager name and port are randomly generated by the plasma module.
Contributor:
Can get rid of "Assumption" here

Contributor Author:
I had to check to make sure this is indeed the case, because we need different (name, port) pairs generated for the multinode test case. If the generated (name, port) pair were deterministic, I would've had to make changes inside the plasma module. I think explicitly stating this post-condition is useful.
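
The post-condition being relied on can be checked directly; a sketch using only the plasma.start_plasma_store call that appears in this test file (the plasma manager case is analogous):

import plasma

# Each call should yield a distinct, randomly generated store socket name.
store_name_1, proc_1 = plasma.start_plasma_store()
store_name_2, proc_2 = plasma.start_plasma_store()
assert store_name_1 != store_name_2
proc_1.kill()
proc_2.kill()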

@@ -39,10 +39,12 @@ typedef struct {
UT_hash_handle hh;
} scheduler_object_info;

/* plasma_manager ip:port -> photon_db_client_id association entry. */
@pcmoritz (Contributor), Feb 7, 2017:
Can we put the documentation into the same form as everywhere else, i.e. full sentences ("Association of foo to bar" is ok), doxygen style (i.e. /** instead of /*) and for members preceding the member and not following it.

Contributor Author:
sure, maybe we should add doxygen generation to the build, so it explicitly fails the build if the comments are not doxygen compliant. Since this is a header file (for which we expect to produce doxygen documentation), I agree with your comment placement suggestion.

db_client_id = self.get_plasma_manager_id()
assert(db_client_id != None)
assert(db_client_id.startswith(b"CL:"))
db_client_id = db_client_id[len(b"CL:"):] # Remove the CL: prefix.

Collaborator:
white space

Contributor Author:
I leave all white space-related cleanup until the very end and run all the code through the linter that formats it for me.

self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)

self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 2*NUM_CLUSTER_NODES+1)
Collaborator:
white space

num_return_vals = [0, 1, 2, 3, 5, 10]
# There should not be anything else in Redis yet.
self.assertEqual(len(self.redis_client.keys("*")), 3)
self.assertEqual(len(self.redis_client.keys("*")), 2*NUM_CLUSTER_NODES+1)
Collaborator:
2 * NUM_CLUSTER_NODES + 1

plasma_manager_name=plasma_manager_name,
plasma_address=plasma_address,
redis_address=redis_address,
static_resource_list=[str(DEFAULT_NUM_CPUS), str(DEFAULT_NUM_GPUS)])
Collaborator:
I think it makes more sense to use ints here and turn them into strings inside of photon.start_local_scheduler

Contributor Author:
ok.
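
A sketch of the suggested change, with the numeric-to-string conversion pushed down into one helper (format_static_resources is an illustrative stand-in for doing this inside photon.start_local_scheduler):

DEFAULT_NUM_CPUS = 16
DEFAULT_NUM_GPUS = 0

def format_static_resources(num_cpus, num_gpus):
    # Callers pass plain numbers; the conversion to the strings the photon
    # command line expects happens in exactly one place.
    return [str(num_cpus), str(num_gpus)]

static_resource_list = format_static_resources(DEFAULT_NUM_CPUS, DEFAULT_NUM_GPUS)
assert static_resource_list == ["16", "0"]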

@@ -262,7 +262,8 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False):

def start_local_scheduler(redis_address, node_ip_address, plasma_store_name,
plasma_manager_name, worker_path, plasma_address=None,
cleanup=True, redirect_output=False):
cleanup=True, redirect_output=False,
static_resource_list=None):
Collaborator:
Document this argument

strlen(plasma_photon_entry->aux_address), plasma_photon_entry);

/* Add photon_db_client_id -> plasma_manager ip:port association to state.*/
HASH_ADD(photon_plasma_hh, state->photon_plasma_map,
photon_db_client_id, /* Key is the field name of entry struct. */
Collaborator:
We've always been putting comments on their own lines rather than on the same line as the code.

Contributor Author:
I know, but this is a rather inconvenient stylistic choice. When a comment is on the same line as the code it refers to, there's no ambiguity about which line of code it corresponds to. When the comment is on its own line, it could correspond to the line before or the line after; it's only by convention that you know it's the latter.
It makes sense for header files, where we expect doxygen to generate API documentation, but why have the same constraint in the rest of the code, where comments are mainly for the developers reading it?

strlen(plasma_photon_entry->aux_address), plasma_photon_entry);

/* Add photon_db_client_id -> plasma_manager ip:port association to state.*/
Collaborator:
state.*/ -> state. */

@@ -10,6 +10,15 @@ global_scheduler_policy_state *init_global_scheduler_policy(void) {
global_scheduler_policy_state *policy_state =
malloc(sizeof(global_scheduler_policy_state));
policy_state->round_robin_index = 0;

int num_weight_elem = sizeof(policy_state->resource_attribute_weight)/sizeof(double);
Collaborator:
spaces around /

Collaborator:
Also in = 1.0/num_weight_elem;


for (i = policy_state->round_robin_index;
!task_satisfied && num_retries >= 0;
i = (i+1) % utarray_len(state->local_schedulers)) {
Collaborator:
(i + 1)

MAX_RESOURCE_INDEX
} resource_vector_index;

#define DEFAULT_NUM_CPUS 16
Collaborator:
I'm not sure if we should have defaults. Maybe the arguments should be required to be passed in to the local scheduler?

* so we set the pointer to NULL so it is not used. */
worker->task_in_progress = NULL;
}
if (worker->task_in_progress) {
Collaborator:
It doesn't change the code, but maybe making if (worker->task_in_progress) { into else if would be a bit easier to understand.

Contributor Author:
will do.

CHECKM(task_spec != NULL,
"task wait handler encounted a task with NULL spec");
#if (RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG)
char buf[256];
Collaborator:
Using fixed size buffers seems a bit dangerous. We could use UT_string to get around this.

Contributor Author:
Actually, fixed-size string buffers are OK as long as I use snprintf(), which is guaranteed not to write more than the specified size. This is for debugging only; I could remove this whole #if/#endif block.

@atumanov self-assigned this on Feb 7, 2017

@@ -259,6 +265,10 @@ int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length) {
return spec->arg_index++;
}

void task_add_required_resource(task_spec *spec, int64_t resource_index, double value) {
spec->required_resources[resource_index] = value;
Contributor:
Should probably do some error checking on value (e.g., PyFloat_AsDouble can fail and return -1). Also, are there required resources that wouldn't have an integer value?

Collaborator:
The checking should probably be in the Python code then.


if (task_satisfied) {
/* Update next index to try and assign the task. */
policy_state->round_robin_index = i; /* i was advanced. */
Contributor:
It's not a big deal, but if the above goes to the last retry (num_retries = -1) and the task is satisfied, I think round_robin_index will be set to the same index as the one the task is assigned to, instead of the one after.

Contributor Author:
I'm pretty sure that i is always advanced, as long as we take at least one pass through the body of the for loop.

int num_retries = NUM_RETRIES;
bool task_satisfied = false;

for (i = policy_state->round_robin_index;
Contributor:
ls->info.static_resources doesn't change between iterations, right? Can we limit the number of rounds to the minimum of the number of local schedulers and NUM_RETRIES?
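
A simplified Python model of the round-robin scan under discussion, with the retry bound capped at the number of local schedulers since static resources never change between iterations (names and data layout are illustrative; the merged code is C):

def round_robin_assign(local_schedulers, start_index, task_request, num_retries):
    # Return the index of the first scheduler (starting at start_index) whose
    # static resources satisfy the hard constraints, or None if none can.
    attempts = min(len(local_schedulers), num_retries)
    for offset in range(attempts):
        i = (start_index + offset) % len(local_schedulers)
        static = local_schedulers[i]["static_resources"]
        if all(static.get(res, 0.0) >= need for res, need in task_request.items()):
            return i
    return None

schedulers = [{"static_resources": {"CPU": 4}},
              {"static_resources": {"CPU": 16, "GPU": 2}}]
print(round_robin_assign(schedulers, start_index=1, task_request={"GPU": 1},
                         num_retries=3))  # -> 1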

@@ -18,6 +18,7 @@
import photon
import plasma
import global_scheduler
from IPython.utils.sysinfo import num_cpus
Collaborator:
Let's not import this. We can't assume that people have IPython.

@robertnishihara (Collaborator):
Note, this error message is out of date. We should fix it before merging.

AssertionError: The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'.

/** Subtract task's resource from the cached dynamic resource capacity for
* this local scheduler. This will be overwritten on the next heartbeat. */
local_scheduler->info.dynamic_resources[i] =
MAX(0, local_scheduler->info.dynamic_resources[i] -
Collaborator:
We shouldn't take a MAX. It should be allowed to go negative (otherwise, once all nodes hit 0 the global scheduler won't know how to distinguish between them even if way more tasks have been assigned to one of them).

Contributor Author:
I'm worried this may have unexpected effects on the score. Just letting it go negative doesn't have the effect you'd want on the score. E.g., dyn_cpu_node1 = -2, dyn_cpu_node2 = -1, and requested cpu = 1. Node1 would cause a less negative delta in the score than node2, and we'd want it to be reversed: dynfrac_cpu_node1 = -0.5 and dynfrac_cpu_node2 = -1, so node2 appears worse, while it should be better.
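
The author's numbers can be made concrete. Assuming the score penalizes a node by the fraction of its cached dynamic capacity that the request would consume (a simplification of the actual policy, shown here only to illustrate the concern), negative cached values flip the ordering between the two nodes:

def consumed_fraction(request, dynamic_capacity):
    # Fraction of the node's cached dynamic capacity the request would use;
    # a smaller value looks like a lighter impact on that node.
    return request / dynamic_capacity

request = 1.0
dyn_cpu_node1 = -2.0  # node 1 is the more oversubscribed node
dyn_cpu_node2 = -1.0

print(consumed_fraction(request, dyn_cpu_node1))  # -0.5: looks like the lighter load
print(consumed_fraction(request, dyn_cpu_node2))  # -1.0: looks heavier, although
                                                  # node 2 is actually less loaded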

handle_task_round_robin(state, policy_state, task);
return;
}
/* This node satisfies the hard capacity constraint. Calculate its score. */
Collaborator:
All of the logic about computing a "score" should be separated out as a different function so we can easily change the computation of the score without changing how the scheduling policy uses the score (or so we can use the score in another scheduling policy).
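
A sketch of the separation being requested, written in Python for brevity (the merged code is C, and the names and weighting scheme here are illustrative):

def calculate_score(dynamic_resources, static_resources, request, weights):
    # Score one local scheduler that already passed the hard capacity check;
    # higher is better. Changing the scoring only touches this function.
    score = 0.0
    for resource, weight in weights.items():
        remaining = dynamic_resources.get(resource, 0.0) - request.get(resource, 0.0)
        capacity = static_resources.get(resource, 1.0) or 1.0  # guard against 0
        score += weight * (remaining / capacity)
    return score

def pick_local_scheduler(candidates, request, weights):
    # The policy only ranks candidates; how the score is computed is opaque here.
    return max(candidates,
               key=lambda c: calculate_score(c["dynamic"], c["static"],
                                             request, weights))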

self.p3.kill()
self.p4.kill()
# kill photons, plasma managers, and plasma stores.
map(subprocess.Popen.kill, self.local_scheduler_pids) #kill photon
@pcmoritz (Contributor), Feb 8, 2017:
add spaces before # at the end of line

Contributor Author:
you mean after?

@@ -71,6 +73,21 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None,
command += ["-r", redis_address]
if plasma_address is not None:
command += ["-a", plasma_address]
# We want to be able to support indepdently setting capacity for each of the supported resource
Contributor:
independently

* @param value Value for the resource. This can be a quantity of resources
* this task needs for example or it can be a value for an
* attribute this task requires.
* @return How many of this resource the task needs to execute.
Contributor:
replace by
@return Void.

Contributor Author:
documentation updated.

@@ -409,11 +415,22 @@ def start_ray_processes(address_info=None,
start a global scheduler process.
include_redis (bool): If include_redis is True, then start a Redis server
process.
num_cpus: A list of length num_local_schedulers containing the number of
Contributor:
We're probably going to need to pass in more resources in the future. It may be a good idea to use a Python namedtuple to define a ResourceList type, so that we can pass in a single argument for all the resources, and not have to remember the ordering of such an argument.

Contributor Author:
we'll generalize the frontend in a separate PR. The initial plan was to just add support for the GPUs in the frontend. We made the backend general already. It'll be easy to make the frontend general when we get more use cases.
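
A minimal sketch of the reviewer's namedtuple suggestion (deferred to a later PR, so the type and field names are illustrative):

from collections import namedtuple

# One object per local scheduler instead of parallel num_cpus/num_gpus lists.
ResourceList = namedtuple("ResourceList", ["num_cpus", "num_gpus"])

resources_per_scheduler = [ResourceList(num_cpus=8, num_gpus=0),
                           ResourceList(num_cpus=8, num_gpus=2)]

for r in resources_per_scheduler:
    static_resource_list = [str(r.num_cpus), str(r.num_gpus)]
    # ... pass static_resource_list to photon.start_local_scheduler as above.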

} else {
for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) {
task_spec_add_required_resource(self->spec, i,
i == CPU_RESOURCE_INDEX ? 1.0 : 0.0);
Contributor:
This code isn't very generalizable to other resources. I would either leave a comment explaining that these are the default values for these particular indices, or make a static double[] containing the default values.

Contributor Author:
why is it not generalizable? The purpose of this code is to just set the number of requested cores to one and leave everything else zero. This is pretty general, IMHO. It does assume that the default is zero for everything else.

* attribute this task requires.
* @return How many of this resource the task needs to execute.
*/
void task_spec_add_required_resource(task_spec *spec,
Contributor:
Maybe call this task_spec_set_required_resource (and then get for the one below).

Contributor Author:
yeah, I like the symmetry, will do as a separate commit (global replace)

free_task(worker->task_in_progress);
worker->task_in_progress = NULL;
task *task_in_progress = worker->task_in_progress;
task_spec *spec =
Contributor:
Can you move this line into the if-block below? This is just personal preference, but I always feel that ternary operators are harder to read.

Contributor Author:
I love ternary operators; in fact, I miss them in Python dearly. But sure, after multiple rounds of iterations on this code, it no longer makes sense to have this spec outside of the if that follows.
I took a full pass through this whole case statement and minimized it.

}
print_resource_info(state, spec);
/* If we're connected to Redis, update tables. */
if (state->db != NULL && task_in_progress != NULL) {
Contributor:
Remove the task_in_progress != NULL check.

* task_in_progress,
* so we set the pointer to NULL so it is not used. */
worker->task_in_progress = NULL;
} else if (worker->task_in_progress) {
Contributor:
Make this just else.

def f(n):
time.sleep(n)

start_time = time.time()
Contributor:
Can you document these blocks? Also, set a variable equal to 0.5 instead of referencing it by value.
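
A sketch of the documented form the reviewer is asking for, with the duration bound to a named constant (timings and the surrounding setup are illustrative; the test class initializes Ray elsewhere):

import time
import ray

SLEEP_DURATION = 0.5  # seconds; named so the assertion below reads clearly

@ray.remote
def f(n):
    time.sleep(n)

# With enough CPU capacity the sleeps should overlap rather than serialize.
start_time = time.time()
ray.get([f.remote(SLEEP_DURATION) for _ in range(4)])
elapsed = time.time() - start_time
assert elapsed < 2 * SLEEP_DURATION, "tasks appear to have run sequentially"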

@@ -7,6 +7,9 @@
/* The duration between local scheduler heartbeats. */
#define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 100

#define DEFAULT_NUM_CPUS INT16_MAX
Contributor:
Can we get the number of cores, instead of setting this to a static value?

Contributor Author:
This has been discussed, and the conclusion was to set the default number of cores to infinity if not specified. This makes the behavior backward compatible. In most cases we start photon from Python, and we automatically figure out the number of hardware threads before spawning the photon process. The expectation is for the resources to be specified when the local scheduler is started.

Collaborator:
I think it makes sense to use this as the default if nothing is passed in.
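
What the author describes (detecting the hardware thread count on the Python side before spawning photon, with the C-side default applying only when nothing is passed) might look roughly like this; the helper name is illustrative:

import multiprocessing

def resolve_num_cpus(num_cpus=None):
    # Prefer an explicit value from the caller; otherwise detect the number
    # of hardware threads. The C-side DEFAULT_NUM_CPUS (effectively infinite)
    # only applies if photon is started without this argument at all.
    if num_cpus is not None:
        return num_cpus
    return multiprocessing.cpu_count()

static_resource_list = [str(resolve_num_cpus()), str(0)]  # CPUs, GPUs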

@@ -317,6 +319,20 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
}
}
utarray_free(val_repr_ptrs);
/* Set the resource vector of the task. */
if (resource_vector != NULL) {
Contributor:
I wonder if we should be checking that the resource values specified by the caller are above some minimum value. If we use 1.0 as the default number of CPUs instead of the minimum, we could specify a task as using 0 CPUs, and then we won't track the dynamic resources properly in the schedulers. Could lead to unfair task assignments, where one task is starved just because it specified a more conservative resource requirement, whereas another task specifies 0 CPUs but actually uses n.

Contributor Author:
I suppose we could check whether the requested CPU is < 1, but I know for a fact that in the Google cluster it is entirely possible to request fractional cores. The workload may be entirely memory bound, for instance, in which case it doesn't need a whole core dedicated to it. Fairness of these allocations would have to be a feature we add on the backend once we actually enable attribution of tasks (and, therefore, resources) to users (or drivers).
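
A sketch of the kind of Python-side validation being discussed, rejecting malformed values while still allowing fractional CPU requests (the function name, thresholds, and error messages are illustrative):

def check_resource_request(resources):
    # Validate a {resource_name: value} mapping before building the task spec.
    for name, value in resources.items():
        if not isinstance(value, (int, float)):
            raise TypeError("Resource {} must be numeric, got {!r}".format(name, value))
        if value < 0:
            raise ValueError("Resource {} cannot be negative: {}".format(name, value))
    # Fractional values are deliberately allowed, e.g. a memory-bound task
    # that only needs half a core.
    return resources

check_resource_request({"CPU": 0.5, "GPU": 0})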

ray.worker.global_worker.run_function_on_all_workers(lambda worker_info: sys.path.append(worker_info["counter"]))
@ray.remote(num_cpus=0)
def get_worker_id():
time.sleep(1)
Contributor:
why do we need the sleep here?

Contributor:
answer: to make sure tasks are scheduled on different workers

Collaborator:
To make sure the tasks all run on different workers.
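
Put together, the pattern being discussed looks roughly like this (get_worker_id and the sleep come from the snippet above; using the process PID as the worker identifier and the final assertion are illustrative additions):

import os
import time
import ray

@ray.remote(num_cpus=0)
def get_worker_id():
    # The sleep keeps each worker busy long enough that later tasks must be
    # placed on other workers; the PID stands in for a worker identifier.
    time.sleep(1)
    return os.getpid()

# Assumes Ray was already initialized with several workers, as in the
# surrounding test; the submitted tasks should land on distinct workers.
worker_ids = ray.get([get_worker_id.remote() for _ in range(5)])
assert len(set(worker_ids)) == len(worker_ids)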


@@ -7,6 +7,9 @@
/* The duration between local scheduler heartbeats. */
#define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 100

#define DEFAULT_NUM_CPUS INT16_MAX
#define DEFAULT_NUM_GPUS 0
Contributor Author:
same here, the default was set to be backward compatible and the expectation is that the local scheduler is started with resource config specified. In the long run, we envision that the local scheduler will be reading a config file to get these values.


@robertnishihara (Collaborator):

I ran into the following problem.

import ray
ray.worker._init(num_local_schedulers=2, num_workers=10, start_ray_local=True)

@ray.remote
def f():
  return ray.worker.global_worker.plasma_client.store_socket_name

set(ray.get([f.remote() for _ in range(1000)]))

The last line prints {'/tmp/plasma_store13623057'}, which suggests that tasks are all being assigned locally.

@robertnishihara merged commit dfb6107 into ray-project:master on Feb 9, 2017
@robertnishihara deleted the hetergen-sched-rebase branch on February 9, 2017 at 09:35