Skip to content

Commit

Permalink
Reconstruction for evicted objects (ray-project#181)
Browse files Browse the repository at this point in the history
* First pass at reconstruction in the worker

Modify reconstruction stress testing to start Plasma service before rest of Ray cluster

TODO about reconstructing ray.puts

Fix ray.put error for double creates

Distinguish between empty entry and no entry in object table

Fix test case

Fix Python test

Fix tests

* Only call reconstruct on objects we have not yet received

* Address review comments

* Fix reconstruction for Python3

* remove unused code

* Address Robert's comments, stress tests are crashing

* Test and update the task's scheduling state to suppress duplicate
reconstruction requests.

* Split result table into two lookups, one for task ID and the other as a
test-and-set for the task state

* Fix object table tests

* Fix redis module result_table_lookup test case

* Multinode reconstruction tests

* Fix python3 test case

* rename

* Use new start_redis

* Remove unused code

* lint

* indent

* Address Robert's comments

* Use start_redis from ray.services in state table tests

* Remove unnecessary memset
  • Loading branch information
stephanie-wang authored and robertnishihara committed Feb 2, 2017
1 parent f69d4aa commit 241b539
Show file tree
Hide file tree
Showing 26 changed files with 669 additions and 183 deletions.
72 changes: 48 additions & 24 deletions python/common/redis_module/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import unittest
import redis
import ray.services

# Check if the redis-server binary is present.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server")
Expand Down Expand Up @@ -42,17 +43,11 @@ def integerToAsciiHex(num, numbytes):
class TestGlobalStateStore(unittest.TestCase):

def setUp(self):
redis_port = random.randint(2000, 50000)
self.redis_process = subprocess.Popen([redis_path,
"--port", str(redis_port),
"--loglevel", "warning",
"--loadmodule", module_path])
time.sleep(1.5)
redis_port = ray.services.start_redis()
self.redis = redis.StrictRedis(host="localhost", port=redis_port, db=0)

def tearDown(self):
self.redis_process.kill()

ray.services.cleanup()

def testInvalidObjectTableAdd(self):
# Check that Redis returns an error when RAY.OBJECT_TABLE_ADD is called with
Expand Down Expand Up @@ -81,7 +76,7 @@ def testObjectTableAddAndLookup(self):
# Try calling RAY.OBJECT_TABLE_LOOKUP with an object ID that has not been
# added yet.
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1")
self.assertEqual(set(response), set([]))
self.assertEqual(response, None)
# Add some managers and try again.
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id1")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2")
Expand Down Expand Up @@ -109,7 +104,7 @@ def testObjectTableAddAndRemove(self):
# Try calling RAY.OBJECT_TABLE_LOOKUP with an object ID that has not been
# added yet.
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1")
self.assertEqual(set(response), set([]))
self.assertEqual(response, None)
# Add some managers and try again.
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id1")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2")
Expand All @@ -131,7 +126,7 @@ def testObjectTableAddAndRemove(self):
self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id2")
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1")
self.assertEqual(set(response), set())
# Remove a manager from an empty set, and make sure we still have an empty set.
# Remove a manager from an empty set, and make sure we now have an empty set.
self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id3")
response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1")
self.assertEqual(set(response), set())
Expand Down Expand Up @@ -173,24 +168,19 @@ def testResultTableAddAndLookup(self):
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id1")
response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1")
self.assertIsNone(response)
# Add the result to the result table. This is necessary, but not sufficient
# because the task is still not in the task table.
self.redis.execute_command("RAY.RESULT_TABLE_ADD", "object_id1", "task_id1")
response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1")
self.assertIsNone(response)
# Add the task to the task table so that the result table lookup can
# succeed.
self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id1", 1, "local_scheduler_id1", "task_spec1")
# Add the result to the result table. The lookup now returns the task ID.
task_id = b"task_id1"
self.redis.execute_command("RAY.RESULT_TABLE_ADD", "object_id1", task_id)
response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1")
self.assertEqual(response, [1, b"local_scheduler_id1", b"task_spec1"])
self.assertEqual(response, task_id)
# Doing it again should still work.
response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1")
self.assertEqual(response, [1, b"local_scheduler_id1", b"task_spec1"])
self.assertEqual(response, task_id)
# Try another result table lookup. This should succeed.
self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id2", 2, "local_scheduler_id2", "task_spec2")
self.redis.execute_command("RAY.RESULT_TABLE_ADD", "object_id2", "task_id2")
task_id = b"task_id2"
self.redis.execute_command("RAY.RESULT_TABLE_ADD", "object_id2", task_id)
response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id2")
self.assertEqual(response, [2, b"local_scheduler_id2", b"task_spec2"])
self.assertEqual(response, task_id)

def testInvalidTaskTableAdd(self):
# Check that Redis returns an error when RAY.TASK_TABLE_ADD is called with
Expand Down Expand Up @@ -227,6 +217,40 @@ def testTaskTableAddAndLookup(self):
response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
self.assertEqual(response, task_args)

# If the current value, test value, and set value are all the same, the
# update happens, and the response is still the same task.
task_args = [task_args[0]] + task_args
response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE",
"task_id",
*task_args[:3])
self.assertEqual(response, task_args[1:])
# Check that the task entry is still the same.
get_response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
self.assertEqual(get_response, task_args[1:])

# If the current value is the same as the test value, and the set value is
# different, the update happens, and the response is the entire task.
task_args[1] += 1
response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE",
"task_id",
*task_args[:3])
self.assertEqual(response, task_args[1:])
# Check that the update happened.
get_response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
self.assertEqual(get_response, task_args[1:])

# If the current value is no longer the same as the test value, the
# response is nil.
task_args[1] += 1
response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE",
"task_id",
*task_args[:3])
self.assertEqual(response, None)
# Check that the update did not happen.
get_response2 = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
self.assertEqual(get_response2, get_response)
self.assertNotEqual(get_response2, task_args[1:])

def testTaskTableSubscribe(self):
scheduling_state = 1
node_id = "node_id"
Expand Down
6 changes: 4 additions & 2 deletions python/plasma/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ def get(self, object_ids, timeout_ms=-1):
Args:
object_ids (List[str]): A list of strings used to identify some objects.
timeout_ms (int): The number of milliseconds that the get call should
block before timing out and returning.
block before timing out and returning. Pass -1 if the call should block
and 0 if the call should return immediately.
"""
results = libplasma.get(self.conn, object_ids, timeout_ms)
assert len(object_ids) == len(results)
Expand All @@ -172,7 +173,8 @@ def get_metadata(self, object_ids, timeout_ms=-1):
Args:
object_ids (List[str]): A list of strings used to identify some objects.
timeout_ms (int): The number of milliseconds that the get call should
block before timing out and returning.
block before timing out and returning. Pass -1 if the call should block
and 0 if the call should return immediately.
"""
results = libplasma.get(self.conn, object_ids, timeout_ms)
assert len(object_ids) == len(results)
Expand Down
10 changes: 9 additions & 1 deletion python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@
def address(ip_address, port):
return ip_address + ":" + str(port)

def get_ip_address(address):
  """Return the IP address part of an "ip_address:port" address string.

  Args:
    address (str): An address string of the form "ip_address:port".

  Returns:
    The ip_address substring (everything before the first colon).

  Raises:
    Exception: If the IP address cannot be parsed from the address.
  """
  try:
    ip_address = address.split(":")[0]
  except Exception:
    # Catch Exception (not a bare except) so KeyboardInterrupt/SystemExit
    # still propagate; any parse failure is rewrapped with a clear message.
    raise Exception("Unable to parse IP address from address {}".format(address))
  return ip_address

def get_port(address):
try:
port = int(address.split(":")[1])
Expand Down Expand Up @@ -430,7 +437,8 @@ def start_ray_processes(address_info=None,
# A Redis address was provided, so start a Redis server with the given
# port. TODO(rkn): We should check that the IP address corresponds to the
# machine that this method is running on.
redis_ip_address, redis_port = redis_address.split(":")
redis_ip_address = get_ip_address(redis_address)
redis_port = get_port(redis_address)
new_redis_port = start_redis(port=int(redis_port),
num_retries=1,
cleanup=cleanup,
Expand Down
47 changes: 35 additions & 12 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
DRIVER_ID_LENGTH = 20
ERROR_ID_LENGTH = 20

# When performing ray.get, wait 1 second before attempting to reconstruct and
# fetch the object again.
GET_TIMEOUT_MILLISECONDS = 1000

def random_string():
return np.random.bytes(20)

Expand Down Expand Up @@ -421,13 +425,13 @@ def put_object(self, objectid, value):
# Serialize and put the object in the object store.
try:
numbuf.store_list(objectid.id(), self.plasma_client.conn, [value])
except plasma.plasma_object_exists_error as e:
except numbuf.numbuf_plasma_object_exists_error as e:
# The object already exists in the object store, so there is no need to
# add it again. TODO(rkn): We need to compare the hashes and make sure
# that the objects are in fact the same. We also should return an error
# code to the caller instead of printing a message.
print("This object already exists in the object store.")
return

global contained_objectids
# Optionally do something with the contained_objectids here.
contained_objectids = []
Expand All @@ -443,18 +447,37 @@ def get_object(self, object_ids):
values should be retrieved.
"""
self.plasma_client.fetch([object_id.id() for object_id in object_ids])
# We currently pass in a timeout of one second.
unready_ids = object_ids

# Get the objects. We initially try to get the objects immediately.
final_results = numbuf.retrieve_list(
[object_id.id() for object_id in object_ids],
self.plasma_client.conn,
0)
# Construct a dictionary mapping object IDs that we haven't gotten yet to
# their original index in the object_ids argument.
unready_ids = dict((object_id, i) for (i, (object_id, val)) in
enumerate(final_results) if val is None)
# Try reconstructing any objects we haven't gotten yet. Try to get them
# until GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat.
while len(unready_ids) > 0:
results = numbuf.retrieve_list([object_id.id() for object_id in object_ids], self.plasma_client.conn, 1000)
unready_ids = [object_id for (object_id, val) in results if val is None]
# This would be a natural place to issue a command to reconstruct some of
# the objects.
for unready_id in unready_ids:
self.photon_client.reconstruct_object(unready_id)
results = numbuf.retrieve_list(list(unready_ids.keys()),
self.plasma_client.conn,
GET_TIMEOUT_MILLISECONDS)
# Remove any entries for objects we received during this iteration so we
# don't retrieve the same object twice.
for object_id, val in results:
if val is not None:
index = unready_ids[object_id]
final_results[index] = (object_id, val)
unready_ids.pop(object_id)

# Unwrap the object from the list (it was wrapped in put_object).
assert len(results) == len(object_ids)
for i in range(len(results)):
assert results[i][0] == object_ids[i].id()
return [result[1][0] for result in results]
assert len(final_results) == len(object_ids)
for i in range(len(final_results)):
assert final_results[i][0] == object_ids[i].id()
return [result[1][0] for result in final_results]

def submit_task(self, function_id, func_name, args):
"""Submit a remote task to the scheduler.
Expand Down
10 changes: 10 additions & 0 deletions src/common/lib/python/common_extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ void init_pickle_module(void) {

/* Define the PyObjectID class. */

/* Convert a Python bytes object to an object_id by copying its raw bytes.
 *
 * object: The Python object to convert; must be a bytes object.
 * object_id: Output parameter, filled with the first UNIQUE_ID_SIZE bytes
 *     of the bytes object.
 *
 * Returns 1 on success. Returns 0 and sets a Python TypeError if the object
 * is not a bytes object (this return convention makes the function usable
 * as an O& converter in PyArg_ParseTuple).
 *
 * NOTE(review): memcpy always copies UNIQUE_ID_SIZE bytes; a bytes object
 * shorter than UNIQUE_ID_SIZE would read past its buffer -- callers are
 * presumably required to pass exactly 20 bytes. TODO confirm.
 */
int PyStringToUniqueID(PyObject *object, object_id *object_id) {
  if (PyBytes_Check(object)) {
    memcpy(&object_id->id[0], PyBytes_AsString(object), UNIQUE_ID_SIZE);
    return 1;
  } else {
    PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
    return 0;
  }
}

int PyObjectToUniqueID(PyObject *object, object_id *objectid) {
if (PyObject_IsInstance(object, (PyObject *) &PyObjectIDType)) {
*objectid = ((PyObjectID *) object)->object_id;
Expand Down
2 changes: 2 additions & 0 deletions src/common/lib/python/common_extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ extern PyObject *pickle_loads;

void init_pickle_module(void);

int PyStringToUniqueID(PyObject *object, object_id *object_id);

int PyObjectToUniqueID(PyObject *object, object_id *objectid);

PyObject *PyObjectID_make(object_id object_id);
Expand Down
Loading

0 comments on commit 241b539

Please sign in to comment.