Conversation

stephanie-wang (Contributor)

No description provided.

@@ -131,7 +131,7 @@ def testObjectTableAddAndRemove(self):
self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id2")
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1")
self.assertEqual(set(response), set())
# Remove a manager from an empty set, and make sure we still have an empty set.
# Remove a manager from an empty set, and make sure we now have an empty set.

Contributor:

"still" was right here I think

Contributor (author):

I changed the API so that it returns None for no entry at all and an empty set for an empty entry.
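
A minimal sketch of that distinction, assuming the test's existing self.redis client and an illustrative never-added object ID (this is not the actual test code):

# Hypothetical illustration of the new lookup semantics.
# An object ID with no entry at all: lookup returns None.
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "never_added_id")
assert response is None
# An object ID whose managers have all been removed still has an (empty) entry:
# lookup returns an empty result rather than None.
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1")
assert set(response) == set()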

bool object_is_local(local_scheduler_state *state, object_id object_id) {
scheduling_algorithm_state *algorithm_state = state->algorithm_state;
available_object *entry;
HASH_FIND(handle, algorithm_state->local_objects, &object_id,

Contributor:

Consider using state->algorithm_state directly and getting rid of the first line.

args.append(multiple_dependency.remote(i, *args[i:i+num_args]))

# Get each value to force each task to finish. After some number of gets,
# old values should be evicted. Get each value again to force

Contributor:

Can you move the second sentence down to where the getting again is done?

arg[0] = i
return arg

# Launch num_iterations instances of the remote task, each dependent on the

@pcmoritz (Contributor), Jan 13, 2017:

"each dependent on the one before it"

It should be "each dependent on the first task", right?

Contributor (author):

Oops, yeah, I wrote the test case incorrectly.

args.append(foo.remote(i, size))

# Get each value to force each task to finish. After some number of gets,
# old values should be evicted. Get each value again to force

@pcmoritz (Contributor), Jan 13, 2017:

can you move "Get each value again..." down to where it is done?

@robertnishihara (Collaborator) left a comment:

This looks good! I left some comments/questions to discuss.

@@ -446,8 +449,16 @@ def get_object(self, objectid):
Args:
objectid (object_id.ObjectID): The object ID of the value to retrieve.
"""
self.plasma_client.fetch([objectid.id()])
buff = self.plasma_client.get(objectid.id())
buff = self.plasma_client.get_local(objectid.id())

Collaborator:

Let's do this by making plasma_client.get take a timeout (I can do this).

This will serialize all reconstruction. If I call get on several object IDs, we won't start reconstructing the second one until the first one has finished reconstructing.

Contributor (author):

I don't think reconstruction has to be serialized. We can call reconstruction on a bunch of objects serially, but the local scheduler's event loop should be able to process them asynchronously.

self.photon_client.reconstruct_object(objectid)
self.plasma_client.fetch([objectid.id()])
self.plasma_client.wait([objectid.id()], timeout=GET_TIMEOUT)
buff = self.plasma_client.get_local(objectid.id())

Collaborator:

Rebasing this on top of some optimizations Philipp did will require a little thought. We no longer call plasma_client.get directly from worker.py, we're now calling numbuf.retrieve_list, which internally calls plasma_client.get. So that method has to change to be non-blocking or to take a timeout.

}

Collaborator:

Before we were calling the done callback only when the manager vector was non-empty, and now this is calling it even when the manager vector is empty. So this changes the API of the methods that use this callback, right?

Contributor (author):

Is that true? The original code looks like it would call the callback even if manager_count = 0.

Collaborator:

Oh I think you're right about that!

* check and remove the if-check and exit. */
/* CHECKM(task != NULL,
"No task information found for object during reconstruction"); */
if (task == NULL) {

Collaborator:

Perhaps this should be fatal? If we need to reconstruct something that was created by a put and we can't, then the program will hang anyway, right?

Contributor (author):

We can reconstruct puts, but the rest of the code that's needed for that isn't in place yet. I was hoping to do those in another PR.

Collaborator:

This will still occur in some cases (e.g., when the put happened on the driver).

I'd suggest making it fatal so that we know when it happens (the alternative is deadlock, right?).

@@ -172,8 +172,7 @@ bool require_space(eviction_state *eviction_state,
num_bytes_evicted = choose_objects_to_evict(
eviction_state, plasma_store_info, space_to_free, num_objects_to_evict,
objects_to_evict);
printf("Evicted %" PRId64 " bytes.\n", num_bytes_evicted);
LOG_INFO(
LOG_DEBUG(

Collaborator:

I made this LOG_INFO intentionally since it doesn't happen frequently enough to bother people (hopefully) and since this information will help us understand the problem any time the system hangs.

@@ -129,6 +129,10 @@ def create(self, object_id, size, metadata=None):
metadata (buffer): An optional buffer encoding whatever metadata the user
wishes to encode.

Returns:
A buffer for the object created. If the object wasn't able to be created
because it already exists locally, returns None.

Collaborator:

Indent this line by two spaces.

@@ -154,6 +158,25 @@ def get(self, object_id):
buff = libplasma.get(self.conn, object_id)[0]
return PlasmaBuffer(buff, object_id, self)

def get_local(self, object_id):
"""Create a buffer from an object in the local PlasmaStore based on object

Collaborator:

The first sentence of the doc string is supposed to fit in one line.


Args:
object_id (str): A string used to identify an object.
Returns:

Collaborator:

Add a newline before Returns:

object_id (str): A string used to identify an object.
Returns:
A buffer of the locally available object, or None if the object isn't
available locally.

Collaborator:

Indent this by two spaces.

@@ -54,6 +54,18 @@ def new_port():
def random_name():
return str(random.randint(0, 99999999))

def get_plasma_store_memory():

Collaborator:

Is this used anywhere?

@stephanie-wang force-pushed the reconstruction branch 2 times, most recently from 1aee092 to 1fdcca3 (January 26, 2017 21:14).
Returns:
A buffer for the object created. If the object wasn't able to be created
because it already exists locally, returns None.

Raises:
plasma_object_exists_error: This exception is raised if the object could

Collaborator:

Is this exception still used? Throwing an exception feels more natural to me than returning None.

Contributor (author):

Oh, yes we're still using the exception. I think this comment is left over from when I was rebasing.
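
For context, a minimal sketch (not the PR's code) of the exception-based calling pattern, using the create signature and plasma_object_exists_error named in the diff above; the client variable and the fallback to get_local are illustrative:

# Hypothetical caller: rely on the exception rather than a None return value.
try:
  buff = client.create(object_id, size, metadata)
except plasma_object_exists_error:
  # The object already exists locally, so read it instead of creating it.
  buff = client.get_local(object_id)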

@@ -35,6 +35,10 @@
LOG_SPAN_START = 1
LOG_SPAN_END = 2

# When performing ray.get, wait 1 second before attempting to reconstruct and
# fetch the object again.
GET_TIMEOUT = 1000

Collaborator:

rename to GET_TIMEOUT_MILLISECONDS

if val is not None:
index = unready_ids[object_id]
final_results[index] = (object_id, val)
del unready_ids[object_id]

Collaborator:

Slightly cleaner I think to use unready_ids.pop(object_id)
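
A sketch of the suggested simplification, keeping the names from the snippet above:

# pop returns the stored index and removes the key in a single step.
if val is not None:
  index = unready_ids.pop(object_id)
  final_results[index] = (object_id, val)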

# This would be a natural place to issue a command to reconstruct some of
# the objects.
for unready_id in unready_ids:
self.photon_client.reconstruct_object(unready_id)

Collaborator:

want to move the call to reconstruct to the end of the while loop so as to not flood the local scheduler with requests?

Contributor (author):

I'm not sure how moving the call to the end of the loop will help?

Collaborator:

Well it will cause the worker to wait for GET_TIMEOUT before sending the reconstruct commands to the local scheduler, so if the get is fulfilled before GET_TIMEOUT passes, then no reconstruct commands will be sent. At least that's what I meant.
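
A rough sketch of the ordering being suggested, reusing the worker's fetch, wait, get_local, and reconstruct_object calls shown elsewhere in the diff; the loop body is an illustration of the control flow, not the PR's actual code:

# Hypothetical get loop: wait up to GET_TIMEOUT first, and only ask the local
# scheduler to reconstruct whatever is still missing after the wait expires.
while len(unready_ids) > 0:
  ids_to_fetch = list(unready_ids.keys())
  self.plasma_client.fetch(ids_to_fetch)
  self.plasma_client.wait(ids_to_fetch, timeout=GET_TIMEOUT)
  for object_id in ids_to_fetch:
    val = self.plasma_client.get_local(object_id)
    if val is not None:
      index = unready_ids.pop(object_id)
      final_results[index] = (object_id, val)
  # Anything still unready after the timeout gets a reconstruct request.
  for unready_id in unready_ids:
    self.photon_client.reconstruct_object(unready_id)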


@ray.remote
def foo(i, size):
array = np.zeros(size)
array[0] = i

Collaborator:

probably unnecessary, but I'd be in favor of setting array[-1] = i as well to check that we're getting the whole object.
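
A tiny sketch of the suggested tweak, assuming foo returns the array and the test checks value[0] as elsewhere in the thread; the extra assertion is illustrative:

# Hypothetical variant: also stamp the last element so a truncated or
# partially evicted object would be caught when the value is read back.
@ray.remote
def foo(i, size):
  array = np.zeros(size)
  array[0] = i
  array[-1] = i
  return array

# When reading the values back:
value = ray.get(args[i])
self.assertEqual(value[0], i)
self.assertEqual(value[-1], i)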

# Define the size of one task's return argument so that the combined sum of
# all objects' sizes is twice the plasma store's allotted memory.
num_objects = 1000
size = int(self.plasma_store_memory * 2 / (num_objects * 8))

Collaborator:

Since this is in stress_tests.py, can we make it self.plasma_store_memory * 10 instead of * 2? (same with the other tests)

for i in range(num_iterations):
value = ray.get(args[i])
self.assertEqual(value[0], i)

Collaborator:

Can we either add another pass through in the reverse direction (in addition to what you have)? E.g.,

for i in range(num_iterations)[::-1]:
  value = ray.get(args[i])
  self.assertEqual(value[0], i)

or randomly choose 100 values and reconstruct them, e.g.,

for _ in range(100):
  i  = np.random.randint(num_objects)
  value = ray.get(args[i])
  self.assertEqual(value[0], i)

Actually, maybe add both?

Collaborator:

And do something similar in the next test?

def testSingleNodeRecursive(self):
# Define the size of one task's return argument so that the combined sum of
# all objects' sizes is twice the plasma store's allotted memory.
num_iterations = 1000

Collaborator:

Can you call this num_objects to be consistent with the previous test?

Collaborator:

Same with the next test.

arg = no_dependency_task.remote(size)
args.append(arg)
for i in range(num_iterations):
args.append(multiple_dependency.remote(i, *args[i:i+num_args]))

Collaborator:

i+num_args -> (i + num_args)

@@ -182,6 +182,9 @@ task_spec *alloc_nil_task_spec(task_id task_id) {
}

int64_t task_spec_size(task_spec *spec) {
if (spec == NULL) {

Contributor:

This should not be necessary; in which case is it called?

Contributor (author):

Oops, thanks for catching this! I was originally allocating an empty task using this, but it turned out not to be needed.

@@ -315,7 +318,10 @@ struct task_impl {
task *alloc_task(task_spec *spec,
scheduling_state state,
db_client_id local_scheduler_id) {
int64_t size = sizeof(task) - sizeof(task_spec) + task_spec_size(spec);
int64_t size = sizeof(task);
if (spec != NULL) {

Contributor:

same as above

@robertnishihara (Collaborator):

One bug I encountered while testing this, which is not an issue with this PR and should be fixed separately (but is interesting nonetheless), is the following.

This is mostly an issue with the fact that functions are currently identified by their module + name, so functions with the same name override each other.

import ray
import numpy as np

ray.init()

@ray.remote
def f(n):
  return np.zeros(n)

# Some of these objects are evicted.
l = [f.remote(100000000) for _ in range(1000)]

@ray.remote
def f(x, y):
  return x + y

# Do a tree reduction.
while len(l) > 1:
  l.append(f.remote(l.pop(0), l.pop(0)))

ray.get(l)

When some of the original tasks that created the list l are re-executed, the new definition of the function f will be used, and you will get an error like this

Remote function __main__.f failed with:

Traceback (most recent call last):
TypeError: f() missing 1 required positional argument: 'y'

plasma_addresses = []
objstore_memory = (self.plasma_store_memory // self.num_local_schedulers)
for i in range(self.num_local_schedulers):
plasma_addresses.append(

@robertnishihara (Collaborator), Feb 1, 2017:

This line should be indented only 2 spaces (plasma_addresses.append( that is)

@@ -39,6 +39,10 @@
DRIVER_ID_LENGTH = 20
ERROR_ID_LENGTH = 20

# When performing ray.get, wait 1 second before attempting to reconstruct and
# fetch the object again.
GET_TIMEOUT = 1000

Collaborator:

This should be GET_TIMEOUT_MILLISECONDS

"task_id",
*task_args[:3])
self.assertEqual(response, None)
# Check that the update did not happen.

Collaborator:

I think this doesn't actually verify that the update didn't happen because the arguments passed in line 241 and in line 251 are the same. The update happens the first time so whether it happens or not the second time, the result will still be the same.

Maybe you want to do task_args[1] += 1 before line 249?

I may have misunderstood :)

Contributor (author):

Ah yes, you're right.

@@ -581,6 +586,35 @@ int ResultTableAdd_RedisCommand(RedisModuleCtx *ctx,
return REDISMODULE_OK;
}

int ParseTaskState(RedisModuleString *state) {

Collaborator:

Nice, this is much cleaner (and good catch with closing the keys in this file)

Contributor (author):

Thanks :)

@@ -821,6 +845,84 @@ int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx,
}

/**
* Update an entry in the task table. This does not update the task

Collaborator:

This documentation is inaccurate I believe :)

Contributor (author):

Ah yeah, dangers of copy pasta...

scheduling_state state = task_state(task);
int test_state = update_data->test_state;

CHECKM(task != NULL, "NULL task passed to redis_task_table_update.");

Collaborator:

The message should say redis_task_table_test_and_update.

void *user_context) {
task_table_test_and_update_data *update_data =
malloc(sizeof(task_table_test_and_update_data));
update_data->task = task;

Collaborator:

It's a bit strange that update_data and update_data->task will get freed at different times...

Is it possible that if we retry the call to Redis, the callback will get called twice and will try to free update_data->task twice?

Collaborator:

Can you document here where update_data->task will be freed?

Contributor (author):

Yeah, this is a little unfortunate...I had to do it this way because of the way callback data is cleaned up (the failure callback calls free(callback_data->data)). Let me see how it would look if we kept the entire task in task_table_test_and_update_data instead of a pointer.


/* Data that is needed to test and set the task's scheduling state. */
typedef struct {
task *task;

Collaborator:

What if we make task the last entry of the struct and make it NOT a pointer so that it gets freed along with the rest of the struct?

Collaborator:

We could get around these issues in other ways: since we don't actually need the task_spec inside the task, we could just include the relevant fields directly in task_table_test_and_update_data.

Contributor (author):

Oh yeah, I think the second is probably the best way.

@stephanie-wang changed the title from "[WIP] Reconstruction for evicted objects" to "Reconstruction for evicted objects" (Feb 1, 2017).
retry_info *retry,
task_table_get_callback done_callback,
void *user_context) {
task_table_test_and_update_data *update_data =
malloc(sizeof(task_table_test_and_update_data));
update_data->task = task;
memset(update_data, 0, sizeof(task_table_test_and_update_data));

Collaborator:

The memset is unnecessary, right?

@robertnishihara merged commit 241b539 into ray-project:master (Feb 2, 2017).
@robertnishihara deleted the reconstruction branch (February 2, 2017 03:18).