Reconstruction for evicted objects #181
Conversation
src/common/redis_module/runtest.py
Outdated
@@ -131,7 +131,7 @@ def testObjectTableAddAndRemove(self):
    self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id2")
    response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1")
    self.assertEqual(set(response), set())
    # Remove a manager from an empty set, and make sure we still have an empty set.
    # Remove a manager from an empty set, and make sure we now have an empty set.
"still" was right here I think
I changed the API so that it returns None for no entry at all and an empty set for an empty entry.
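For illustration, a hedged sketch of the distinction using the commands from the test above (the exact return encodings are my assumption, not the test's actual code):

# Hypothetical illustration of the new semantics.
# An object that was never added has no entry at all, so the lookup returns None.
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id_never_added")
self.assertEqual(response, None)
# An object whose managers were all removed still has an entry, but it is empty.
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1")
self.assertEqual(set(response), set())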
src/photon/photon_algorithm.c
Outdated
bool object_is_local(local_scheduler_state *state, object_id object_id) {
  scheduling_algorithm_state *algorithm_state = state->algorithm_state;
  available_object *entry;
  HASH_FIND(handle, algorithm_state->local_objects, &object_id,
consider using state->algorithm_state directly and getting rid of the first line
test/stress_tests.py
Outdated
args.append(multiple_dependency.remote(i, *args[i:i+num_args]))

# Get each value to force each task to finish. After some number of gets,
# old values should be evicted. Get each value again to force
can you move the second sentence down to where the getting again is done?
test/stress_tests.py
Outdated
  arg[0] = i
  return arg

# Launch num_iterations instances of the remote task, each dependent on the
"each dependent on the one before it"
It should be "each dependent on the first task", right?
Oops, yeah, I wrote the test case incorrectly.
test/stress_tests.py
Outdated
args.append(foo.remote(i, size))

# Get each value to force each task to finish. After some number of gets,
# old values should be evicted. Get each value again to force
can you move "Get each value again..." down to where it is done?
This looks good! I left some comments/questions to discuss.
lib/python/ray/worker.py
Outdated
@@ -446,8 +449,16 @@ def get_object(self, objectid):
    Args:
      objectid (object_id.ObjectID): The object ID of the value to retrieve.
    """
    self.plasma_client.fetch([objectid.id()])
    buff = self.plasma_client.get(objectid.id())
    buff = self.plasma_client.get_local(objectid.id())
Let's do this by making plasma_client.get take a timeout (I can do this).
This will serialize all reconstruction: if I call get on several object IDs, we won't start reconstructing the second one until the first one has finished reconstructing.
I don't think reconstruction has to be serialized. We can call reconstruction on a bunch of objects serially, but the local scheduler's event loop should be able to process them asynchronously.
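A rough sketch of the non-serialized pattern being described, using the method names that appear in this PR's diffs (fetch, wait, get_local, reconstruct_object); the exact signatures and surrounding loop structure are assumptions, not the PR's implementation:

# Hedged sketch, not the PR's code.
missing_ids = [objectid.id() for objectid in objectids]
# Issue a reconstruction request for every missing object up front. The requests
# are sent serially, but the local scheduler's event loop can work on them
# concurrently rather than one object at a time.
for objectid in objectids:
  self.photon_client.reconstruct_object(objectid)
# Then block once for the whole batch instead of once per object.
self.plasma_client.fetch(missing_ids)
self.plasma_client.wait(missing_ids, timeout=GET_TIMEOUT)
buffers = [self.plasma_client.get_local(object_id) for object_id in missing_ids]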
lib/python/ray/worker.py
Outdated
self.photon_client.reconstruct_object(objectid)
self.plasma_client.fetch([objectid.id()])
self.plasma_client.wait([objectid.id()], timeout=GET_TIMEOUT)
buff = self.plasma_client.get_local(objectid.id())
Rebasing this on top of some optimizations Philipp did will require a little thought. We no longer call plasma_client.get directly from worker.py; we're now calling numbuf.retrieve_list, which internally calls plasma_client.get. So that method has to change to be non-blocking or to take a timeout.
}
Before we were calling the done callback only when the manager vector was non-empty, and now this is calling it even when the manager vector is empty. So this changes the API of the methods that use this callback, right?
Is that true? The original code looks like it would call the callback even if manager_count = 0.
Oh I think you're right about that!
src/photon/photon_scheduler.c
Outdated
 * check and remove the if-check and exit. */
/* CHECKM(task != NULL,
      "No task information found for object during reconstruction"); */
if (task == NULL) {
Perhaps this should be fatal? If we need to reconstruct something that was created by a put and we can't, then the program will hang anyway, right?
We can reconstruct puts, but the rest of the code that's needed for that isn't in place yet. I was hoping to do those in another PR.
This will still occur in some cases (e.g., when the put happened on the driver).
I'd suggest making it fatal so that we know when it happens (the alternative is deadlock, right?).
src/plasma/eviction_policy.c
Outdated
@@ -172,8 +172,7 @@ bool require_space(eviction_state *eviction_state,
  num_bytes_evicted = choose_objects_to_evict(
      eviction_state, plasma_store_info, space_to_free, num_objects_to_evict,
      objects_to_evict);
  printf("Evicted %" PRId64 " bytes.\n", num_bytes_evicted);
  LOG_INFO(
  LOG_DEBUG(
I made this LOG_INFO intentionally since it doesn't happen frequently enough to bother people (hopefully) and since this information will help us understand the problem any time the system hangs.
src/plasma/plasma/plasma.py
Outdated
@@ -129,6 +129,10 @@ def create(self, object_id, size, metadata=None):
      metadata (buffer): An optional buffer encoding whatever metadata the user
        wishes to encode.

    Returns:
      A buffer for the object created. If the object wasn't able to be created
      because it already exists locally, returns None.
Indent this line by two spaces.
src/plasma/plasma/plasma.py
Outdated
@@ -154,6 +158,25 @@ def get(self, object_id):
    buff = libplasma.get(self.conn, object_id)[0]
    return PlasmaBuffer(buff, object_id, self)

  def get_local(self, object_id):
    """Create a buffer from an object in the local PlasmaStore based on object
The first sentence of the doc string is supposed to fit in one line.
src/plasma/plasma/plasma.py
Outdated

    Args:
      object_id (str): A string used to identify an object.
    Returns:
Add a newline before Returns:
src/plasma/plasma/plasma.py
Outdated
      object_id (str): A string used to identify an object.
    Returns:
      A buffer of the locally available object, or None if the object isn't
      available locally.
Indent this by two spaces.
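Putting the three docstring suggestions together (one-line summary, blank line before Returns:, two-space continuation indent), the method might read roughly like this; the wording is illustrative, not the PR's final text:

def get_local(self, object_id):
  """Get a buffer for an object if it is already in the local Plasma store.

  Args:
    object_id (str): A string used to identify an object.

  Returns:
    A buffer of the locally available object, or None if the object isn't
      available locally.
  """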
lib/python/ray/services.py
Outdated
@@ -54,6 +54,18 @@ def new_port():
def random_name():
  return str(random.randint(0, 99999999))

def get_plasma_store_memory():
Is this used anywhere?
1aee092 to 1fdcca3 (Compare)
python/plasma/plasma.py
Outdated
    Returns:
      A buffer for the object created. If the object wasn't able to be created
        because it already exists locally, returns None.

    Raises:
      plasma_object_exists_error: This exception is raised if the object could
Is this exception still used? Throwing an exception feels more natural to me than returning None.
Oh, yes we're still using the exception. I think this comment is left over from when I was rebasing.
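For context, a hedged sketch of how the exception-based path looks from the caller's side; the names come from this diff, but the handling shown is an assumption rather than the PR's code:

# Hedged sketch of a caller handling the exception.
try:
  buff = self.plasma_client.create(object_id, size, metadata)
except plasma_object_exists_error:
  # The object was already created locally (e.g., a duplicate put), so there
  # is nothing to write; treat it the same as the documented None return.
  buff = None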
python/ray/worker.py
Outdated
@@ -35,6 +35,10 @@
LOG_SPAN_START = 1
LOG_SPAN_END = 2

# When performing ray.get, wait 1 second before attempting to reconstruct and
# fetch the object again.
GET_TIMEOUT = 1000
rename to GET_TIMEOUT_MILLISECONDS
python/ray/worker.py
Outdated
if val is not None:
  index = unready_ids[object_id]
  final_results[index] = (object_id, val)
  del unready_ids[object_id]
Slightly cleaner I think to use unready_ids.pop(object_id)
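i.e., something along these lines (names taken from the diff above):

if val is not None:
  index = unready_ids.pop(object_id)  # returns the index and removes the entry in one step
  final_results[index] = (object_id, val)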
# This would be a natural place to issue a command to reconstruct some of
# the objects.
for unready_id in unready_ids:
  self.photon_client.reconstruct_object(unready_id)
want to move the call to reconstruct to the end of the while loop so as to not flood the local scheduler with requests?
I'm not sure how moving the call to the end of the loop will help?
Well it will cause the worker to wait for GET_TIMEOUT before sending the reconstruct commands to the local scheduler, so if the get is fulfilled before GET_TIMEOUT passes, then no reconstruct commands will be sent. At least that's what I meant.
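A rough sketch of that ordering; the names follow the diffs above, but the fetch/wait details of the surrounding loop are assumptions, not the PR's code:

# Hedged sketch of the suggested ordering.
while len(unready_ids) > 0:
  object_ids = list(unready_ids.keys())
  self.plasma_client.fetch(object_ids)
  self.plasma_client.wait(object_ids, timeout=GET_TIMEOUT)
  for object_id in object_ids:
    val = self.plasma_client.get_local(object_id)
    if val is not None:
      index = unready_ids.pop(object_id)
      final_results[index] = (object_id, val)
  # Only ask for reconstruction after a full GET_TIMEOUT has passed without the
  # objects appearing; a get that is fulfilled quickly sends no requests at all.
  for unready_id in unready_ids:
    self.photon_client.reconstruct_object(unready_id)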
@ray.remote
def foo(i, size):
  array = np.zeros(size)
  array[0] = i
probably unnecessary, but I'd be in favor of setting array[-1] = i as well to check that we're getting the whole object.
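For instance, a sketch of the suggested tweak, with the matching check on the driver side assumed:

@ray.remote
def foo(i, size):
  array = np.zeros(size)
  array[0] = i
  array[-1] = i  # mark both ends so a truncated or partially fetched object is caught
  return array

# ...and when the values are read back:
value = ray.get(args[i])
self.assertEqual(value[0], i)
self.assertEqual(value[-1], i)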
test/stress_tests.py
Outdated
# Define the size of one task's return argument so that the combined sum of
# all objects' sizes is twice the plasma store's allotted memory.
num_objects = 1000
size = int(self.plasma_store_memory * 2 / (num_objects * 8))
Since this is in stress_tests.py, can we make it self.plasma_store_memory * 10 instead of * 2? (same with the other tests)
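i.e., roughly the following, assuming 8-byte float64 entries as in the existing computation:

# The combined size of all objects is then about ten times the store's memory:
# num_objects * size * 8 bytes ~= 10 * plasma_store_memory.
num_objects = 1000
size = int(self.plasma_store_memory * 10 / (num_objects * 8))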
test/stress_tests.py
Outdated
for i in range(num_iterations):
  value = ray.get(args[i])
  self.assertEqual(value[0], i)
Can we either add another pass through in the reverse direction (in addition to what you have)? E.g.,

for i in range(num_iterations)[::-1]:
  value = ray.get(args[i])
  self.assertEqual(value[0], i)

or randomly choose 100 values and reconstruct them, e.g.,

for _ in range(100):
  i = np.random.randint(num_objects)
  value = ray.get(args[i])
  self.assertEqual(value[0], i)

Actually, maybe add both?
And do something similar in the next test?
test/stress_tests.py
Outdated
def testSingleNodeRecursive(self):
  # Define the size of one task's return argument so that the combined sum of
  # all objects' sizes is twice the plasma store's allotted memory.
  num_iterations = 1000
Can you call this num_objects to be consistent with the previous test?
Same with the next test.
test/stress_tests.py
Outdated
arg = no_dependency_task.remote(size)
args.append(arg)
for i in range(num_iterations):
  args.append(multiple_dependency.remote(i, *args[i:i+num_args]))
i+num_args -> (i + num_args)
Commits:
- Modify reconstruction stress testing to start Plasma service before rest of Ray cluster
- TODO about reconstructing ray.puts
- Fix ray.put error for double creates
- Distinguish between empty entry and no entry in object table
- Fix test case
- Fix Python test
- Fix tests
- reconstruction requests.
- test-and-set for the task state
ca1b8b0 to 56dd34e (Compare)
src/common/task.c
Outdated
@@ -182,6 +182,9 @@ task_spec *alloc_nil_task_spec(task_id task_id) {
}

int64_t task_spec_size(task_spec *spec) {
  if (spec == NULL) {
This should not be necessary. In which case is it called?
Oops, thanks for catching this! I was originally allocating an empty task using this, but turned out to not need it.
src/common/task.c
Outdated
@@ -315,7 +318,10 @@ struct task_impl {
task *alloc_task(task_spec *spec,
                 scheduling_state state,
                 db_client_id local_scheduler_id) {
  int64_t size = sizeof(task) - sizeof(task_spec) + task_spec_size(spec);
  int64_t size = sizeof(task);
  if (spec != NULL) {
same as above
One bug I encountered while testing this, which is not an issue with this PR and should be fixed separately (but is interesting nonetheless), is the following. This is mostly an issue with the fact that functions are currently identified by their module + name, so functions with the same name override each other.

import ray
import numpy as np

ray.init()

@ray.remote
def f(n):
  return np.zeros(n)

# Some of these objects are evicted.
l = [f.remote(100000000) for _ in range(1000)]

@ray.remote
def f(x, y):
  return x + y

# Do a tree reduction.
while len(l) > 1:
  l.append(f.remote(l.pop(0), l.pop(0)))

ray.get(l)

When some of the original tasks that created the list
test/stress_tests.py
Outdated
plasma_addresses = []
objstore_memory = (self.plasma_store_memory // self.num_local_schedulers)
for i in range(self.num_local_schedulers):
    plasma_addresses.append(
This line should be indented only 2 spaces (the plasma_addresses.append( line, that is).
python/ray/worker.py
Outdated
@@ -39,6 +39,10 @@
DRIVER_ID_LENGTH = 20
ERROR_ID_LENGTH = 20

# When performing ray.get, wait 1 second before attempting to reconstruct and
# fetch the object again.
GET_TIMEOUT = 1000
This should be GET_TIMEOUT_MILLISECONDS
"task_id", | ||
*task_args[:3]) | ||
self.assertEqual(response, None) | ||
# Check that the update did not happen. |
I think this doesn't actually verify that the update didn't happen because the arguments passed in line 241 and in line 251 are the same. The update happens the first time so whether it happens or not the second time, the result will still be the same.
Maybe you want to do task_args[1] += 1 before line 249?
I may have misunderstood :)
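Something like the following hypothetical tweak; the command name and argument layout are assumptions, so use whatever the lines above actually call:

# Hypothetical sketch of the suggested check, not the test's actual code.
task_args[1] += 1  # change the target state so a second, unwanted update would be visible
response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE",  # assumed command name
                                      "task_id", *task_args[:3])
self.assertEqual(response, None)
# A follow-up lookup can then confirm the stored state still matches the first update.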
Ah yes, you're right.
@@ -581,6 +586,35 @@ int ResultTableAdd_RedisCommand(RedisModuleCtx *ctx,
  return REDISMODULE_OK;
}

int ParseTaskState(RedisModuleString *state) {
Nice, this is much cleaner (and good catch with closing the keys in this file)
Thanks :)
@@ -821,6 +845,84 @@ int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx,
}

/**
 * Update an entry in the task table. This does not update the task
This documentation is inaccurate I believe :)
Ah yeah, dangers of copy pasta...
src/common/state/redis.c
Outdated
  scheduling_state state = task_state(task);
  int test_state = update_data->test_state;

  CHECKM(task != NULL, "NULL task passed to redis_task_table_update.");
redis_task_table_test_and_update
src/common/state/task_table.c
Outdated
                                     void *user_context) {
  task_table_test_and_update_data *update_data =
      malloc(sizeof(task_table_test_and_update_data));
  update_data->task = task;
It's a bit strange that update_data and update_data->task will get freed at different times...
Is it possible that if we retry the call to Redis, the callback will get called twice and will try to free update_data->task twice?
Can you document here where update_data->task will be freed?
Yeah, this is a little unfortunate... I had to do it this way because of the way callback data is cleaned up (the failure callback calls free(callback_data->data)). Let me see how it would look if we kept the entire task in task_table_test_and_update_data instead of a pointer.
src/common/state/task_table.h
Outdated

/* Data that is needed to test and set the task's scheduling state. */
typedef struct {
  task *task;
What if we make task the last entry of the struct and make it NOT a pointer, so that it gets freed along with the rest of the struct?
We could get around these issues in other ways; since we don't actually need the task_spec inside the task, we could just include the relevant fields directly in task_table_test_and_update_data.
Oh yeah, I think the second is probably the best way.
src/common/state/task_table.c
Outdated
                                     retry_info *retry,
                                     task_table_get_callback done_callback,
                                     void *user_context) {
  task_table_test_and_update_data *update_data =
      malloc(sizeof(task_table_test_and_update_data));
  update_data->task = task;
  memset(update_data, 0, sizeof(task_table_test_and_update_data));
The memset is unnecessary, right?