[xray] Adds a driver table. #2289
Conversation
python/ray/monitor.py
Outdated
@@ -16,6 +16,8 @@
from ray.core.generated.DriverTableMessage import DriverTableMessage
from ray.core.generated.GcsTableEntry import GcsTableEntry
from ray.core.generated.HeartbeatTableData import HeartbeatTableData
from ray.core.generated.DriverTableData import DriverTableData
from ray.core.generated.ray.protocol.Task import Task
this should be in ray.gcs_utils
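A sketch of what that would look like, assuming ray.gcs_utils re-exports these generated classes (the exact re-exported names are an assumption):

    # Hypothetical: import via ray.gcs_utils instead of the generated
    # modules directly, assuming it re-exports these classes.
    from ray.gcs_utils import DriverTableData, Task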
python/ray/monitor.py
Outdated
# common/redis_module/ray_redis_module.cc
OBJECT_INFO_PREFIX = b"OI:"
OBJECT_LOCATION_PREFIX = b"OL:"
TASK_TABLE_PREFIX = b"TT:"
DB_CLIENT_PREFIX = b"CL:"
DB_CLIENT_TABLE_NAME = b"db_clients"
XRAY_TASK_TABLE_PREFIX = b"TASK:"
XRAY_OBJECT_TABLE_PREFIX = b"OBJECT:"
these things are already in ray.gcs_utils
python/ray/monitor.py
Outdated
task_table_infos = {}  # task id -> TaskInfo
for key in redis.scan_iter(match=XRAY_TASK_TABLE_PREFIX + b"*"):
    entry = redis.get(key)
@concretevitamin, this will interact with automatic background flushing, right? We should include a TODO here to address that (e.g., what if key gets flushed in the background while this is happening).
Right. I think this just needs a simple check to test whether redis.get(key) returns None.
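A minimal sketch of that guard, assuming the only background-flushing hazard here is a key disappearing between the scan and the get:

    for key in redis.scan_iter(match=XRAY_TASK_TABLE_PREFIX + b"*"):
        entry = redis.get(key)
        # The key may have been flushed in the background between the
        # scan and the get, in which case get returns None; skip it.
        if entry is None:
            continue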
python/ray/monitor.py
Outdated
task_table_infos = {}  # task id -> TaskInfo
for key in redis.scan_iter(match=XRAY_TASK_TABLE_PREFIX + b"*"):
    entry = redis.get(key)
    task = Task.GetRootAsTask(entry, 0)
Can you actually parse it this way? Don't you need to parse it as a GcsTableEntry first?
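If it is wrapped, a minimal sketch of the two-step parse, assuming the generated GcsTableEntry exposes its payload through an Entries accessor:

    # Hypothetical sketch: unwrap the GcsTableEntry, then parse the Task
    # from the entry's payload bytes.
    gcs_entry = GcsTableEntry.GetRootAsGcsTableEntry(entry, 0)
    task = Task.GetRootAsTask(gcs_entry.Entries(0), 0)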
python/ray/monitor.py
Outdated
    task_info.Returns(i) for i in range(task_info.ReturnsLength())
])

# TODO: Remove only objects in GCS that originated from this driver.
Are you saying we don't have enough information in the GCS to flush objects created by ray.put? From the object ID, we can determine the task ID, and from that we can determine the Driver ID, right? So I think we should be able to flush them already.
Also, let's make sure that we're testing cleanup for ray.put objects.
The redis entry for objects does not contain the task ID.
I think testing cleanup for ray.put is a substantial amount of work and may make sense to do as a separate PR.
What I mean is that the object ID itself tells you the task ID, right?
Lines 166 to 174 in aa42331

const TaskID ComputeTaskId(const ObjectID &object_id) {
  TaskID task_id = object_id;
  int64_t *first_bytes = reinterpret_cast<int64_t *>(&task_id);
  // Zero out the lowest kObjectIdIndexSize bits of the first byte of the
  // object ID.
  uint64_t bitmask = static_cast<uint64_t>(-1) << kObjectIdIndexSize;
  *first_bytes = *first_bytes & (bitmask);
  return task_id;
}
Excellent point. I was not aware of this method.
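For reference, the same mapping is available from Python through ray.local_scheduler.compute_task_id (used later in this PR); a sketch of how the monitor could use it:

    # Sketch: recover the task ID from an object ID, then check whether
    # that task belonged to the exiting driver.
    task_id_bin = ray.local_scheduler.compute_task_id(object_id).id()
    if task_id_bin in driver_task_id_bins:
        # The object originated from the dead driver and can be flushed.
        pass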
  driver_id: string;
  // Whether it's dead.
  is_dead: bool;
}
Add a newline at the end of the file.
src/ray/gcs/tables.h
Outdated
};
virtual ~DriverTable() {}

Status AppendDriverData(UniqueID driver_id, bool is_dead) {
Should be JobID instead of UniqueID
src/ray/raylet/node_manager.cc
Outdated
HandleDriverTableUpdate(client_id, driver_data);
};
RAY_RETURN_NOT_OK(gcs_client_->driver_table().Subscribe(
    UniqueID::nil(), UniqueID::nil(), driver_table_handler, nullptr));
The first one should be JobID::nil()
test/monitor_test.py
Outdated
@@ -44,6 +44,8 @@ def Driver(success):
# Two new objects.
ray.get(ray.put(1111))
ray.get(ray.put(1111))
# Test is flaky without calls to sleep.
What are the sleeps needed for? Can we get rid of them?
The test sometimes fails without the wait in place.
Why does it fail? Does this mean the cleanup implementation is buggy?
ray.get completes before information about the object's location is propagated to the GCS, so the count returned by StateSummary() immediately after a ray.get is incorrect.
To compare: if the object were remote, then ray.get would only complete after the data propagates to the GCS, since the pull associated with ray.get depends on information in the GCS.
Why weren't the sleeps necessary before?
Anyway, if the tests are flaky without the sleeps, then they will probably continue to be flaky even with the sleeps, so we should use a more robust approach (e.g., loop until the information is present (with a timeout that raises an exception)).
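A minimal sketch of that polling approach (the helper name, interval, and timeout are illustrative, not part of this PR):

    import time

    def wait_for_condition(condition, timeout_seconds=10):
        # Poll until the condition holds; fail loudly instead of flaking.
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            if condition():
                return
            time.sleep(0.1)
        raise RuntimeError("Timed out waiting for condition.")

    # e.g., instead of a fixed sleep after the driver exits:
    wait_for_condition(lambda: StateSummary()[:2] == (0, 1))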
test/monitor_test.py
Outdated
def testTaskCleanupOnDriverExitSingleRedisShard(self):
    self._testCleanupOnDriverExit(num_redis_shards=1, test_object_cleanup=False)

@unittest.skipIf(
I think we should be able to re-enable this.
I believe re-enabling this entails a substantial amount of work. It might make sense to re-enable as a separate PR.
test/monitor_test.py
Outdated
@@ -79,23 +83,43 @@ def f():
ray.init(redis_address=redis_address)
# The assertion below can fail if the monitor is too slow to clean up
# the global state.
self.assertEqual((0, 1), StateSummary()[:2])

if test_object_cleanup:
Can you explain this if statement?
python/ray/experimental/state.py
Outdated
@@ -168,7 +168,7 @@ def _keys(self, pattern):
    """
    result = []
    for client in self.redis_clients:
        result.extend(client.keys(pattern))
        result.extend([i for i in client.scan_iter(match=pattern)])
Is the i for i in necessary? If not, drop it; if so, then maybe just use result.extend(list(client.scan_iter(match=pattern))).
python/ray/monitor.py
Outdated
for task_id_hex in task_table_objects:
    if len(task_table_objects[task_id_hex]) == 0:
        continue
    task_table_object = task_table_objects[task_id_hex][0]['TaskSpec']
Minor, but we've been pretty consistent about using " everywhere instead of ' for strings, so let's stick with that (this applies to a number of places in this PR).
src/ray/gcs/tables.h
Outdated
};
virtual ~DriverTable() {}

Status AppendDriverData(JobID driver_id, bool is_dead) {
JobID -> const JobID &
src/ray/gcs/tables.h
Outdated
};
virtual ~DriverTable() {}

Status AppendDriverData(JobID driver_id, bool is_dead) {
Add doxygen documentation
src/ray/gcs/tables.h
Outdated
    RAY_LOG(DEBUG) << "Driver entry added callback";
  });
}
};
Maybe put the implementation in the .cc file?
python/ray/monitor.py
Outdated
# Get the list of objects returned by driver tasks.
driver_object_ids = set()
for task_info in task_table_infos.values():
    driver_object_ids |= {
What is |=?
| is the union operator for sets; |= performs an in-place union.
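For example:

    s = {1, 2}
    s |= {2, 3}  # in-place union; s is now {1, 2, 3}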
python/ray/monitor.py
Outdated
# Also record all the ray.put()'d objects.
all_put_objects = []
object_table_objects = self.state.object_table()
for object_id in object_table_objects:
loop over keys and values to avoid unnecessary indexing
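That is, something like this sketch:

    for object_id, object_table_object in object_table_objects.items():
        # Use the value directly rather than indexing the dict again.
        if len(object_table_object) == 0:
            continue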
python/ray/monitor.py
Outdated
object_table_objects = self.state.object_table()
for object_id in object_table_objects:
    if len(object_table_objects[object_id]) == 0:
        continue
can probably assert that len > 0
python/ray/monitor.py
Outdated
        continue
    object_id_bin = object_id.id()
    task_id_bin = ray.local_scheduler.compute_task_id(object_id).id()
    all_put_objects.append((object_id_bin, task_id_bin))
this seems to be all objects, not just all put objects, right?
python/ray/monitor.py
Outdated
# Keep objects from relevant tasks.
driver_task_ids_set = set(driver_task_id_bins)
for object_id, task_id in all_put_objects:
If this actually covers all objects and not just put objects, then maybe we don't need to loop through the task return values above, is that right?
python/ray/monitor.py
Outdated
@@ -100,7 +100,7 @@ def __init__(self, redis_address, redis_port, autoscaling_config):
            ignore_subscribe_messages=True)
        self.shard_subscribe_clients.append(subscribe_client)
    else:
        # We don't need to subscribe to the shards in legacy Ray.
        # We don`t need to subscribe to the shards in legacy Ray.
` -> '
python/ray/monitor.py
Outdated
assert len(object_table_object) > 0
object_id_bin = object_id.id()
task_id_bin = ray.local_scheduler.compute_task_id(object_id).id()
all_objects.append((object_id_bin, task_id_bin))
Instead of constructing this all_objects list, we can probably just do

    if task_id_bin in driver_task_id_bins:
        driver_object_id_bins.add(object_id)

here, right?
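A sketch of that suggested single-pass version, reusing the names from the surrounding diff:

    driver_object_id_bins = set()
    for object_id, object_table_object in object_table_objects.items():
        assert len(object_table_object) > 0
        task_id_bin = ray.local_scheduler.compute_task_id(object_id).id()
        # Keep only objects produced by the dead driver's tasks.
        if task_id_bin in driver_task_id_bins:
            driver_object_id_bins.add(object_id.id())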
src/ray/gcs/tables.h
Outdated
///
/// \param driver_id The driver id.
/// \param is_dead Whether the driver is dead.
/// \return The return status of Log.Append.
Minor, but I'd change this to just say, "The return status". The interface documentation shouldn't refer to implementation details.
@@ -267,6 +267,10 @@ class ObjectManager : public ObjectManagerInterface {
  /// Register object remove with directory.
  void NotifyDirectoryObjectDeleted(const ObjectID &object_id);

  /// Handle any push requests that were made before an object was available.
  /// This is invoked when an "object added" notification is received from the store.
  void HandleUnfulfilledPushRequests(const ObjectInfoT &object_info);
@elibol can you answer the above question?
Looks good to me, I'll merge this once the tests pass.
Nice work @elibol!
@elibol This is nice. I'm working on job management in Ray, and @robertnishihara pointed out that I could leverage this for job cancellation. I saw that the code for actually doing cleanup at driver death is currently marked as a TODO. Wondering if you have plans to finish this part sometime soon? Or if you're busy with other stuff, maybe I could take some time to work on this?
@zhijunfu that sounds great. Please do have a look into it.
Thanks @zhijunfu, could you share an outline of what you plan on implementing?
@elibol Yes, will look into it. Thanks.
@robertnishihara Sure. I just raised an issue here with a proposal doc.
@zhijunfu the driver cleanup should be implemented separately from (and before) the job submission, right?
@robertnishihara yes, the cleanup code is mostly done. Will do some more testing and then submit it to internal review; will send a PR after that. :)
This PR adds a driver table for the new GCS, which enables cleanup functionality associated with monitoring driver death.
Some testing in monitor_test.py is restored, but redis sharding for xray is needed to enable the remaining tests.