
[xray] Adds a driver table. #2289


Merged: 30 commits into ray-project:master on Aug 9, 2018

Conversation

elibol
Contributor

@elibol elibol commented Jun 21, 2018

This PR adds a driver table for the new GCS, enabling the cleanup functionality that is triggered when the monitor detects driver death.

Some tests in monitor_test.py are restored, but Redis sharding for xray is needed to enable the remaining tests.

@@ -16,6 +16,8 @@
from ray.core.generated.DriverTableMessage import DriverTableMessage
from ray.core.generated.GcsTableEntry import GcsTableEntry
from ray.core.generated.HeartbeatTableData import HeartbeatTableData
from ray.core.generated.DriverTableData import DriverTableData
from ray.core.generated.ray.protocol.Task import Task
Collaborator

this should be in ray.gcs_utils

# common/redis_module/ray_redis_module.cc
OBJECT_INFO_PREFIX = b"OI:"
OBJECT_LOCATION_PREFIX = b"OL:"
TASK_TABLE_PREFIX = b"TT:"
DB_CLIENT_PREFIX = b"CL:"
DB_CLIENT_TABLE_NAME = b"db_clients"
XRAY_TASK_TABLE_PREFIX = b"TASK:"
XRAY_OBJECT_TABLE_PREFIX = b"OBJECT:"
Collaborator

these things are already in ray.gcs_utils


task_table_infos = {} # task id -> TaskInfo
for key in redis.scan_iter(match=XRAY_TASK_TABLE_PREFIX + b"*"):
entry = redis.get(key)
Collaborator

@concretevitamin, this will interact with automatic background flushing, right?

We should include a TODO here to address that (e.g., what if key gets flushed in the background while this is happening).

Contributor

Right. I think this just needs a simple check for whether redis.get(key) returns None.
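A minimal sketch of that guard (the names and the in-memory fake client are illustrative, standing in for a real Redis connection):

```python
class FakeRedis:
    """Hypothetical in-memory stand-in for a Redis client, for illustration."""

    def __init__(self, data):
        self.data = dict(data)

    def scan_iter(self, match):
        prefix = match.rstrip(b"*")
        return (k for k in self.data if k.startswith(prefix))

    def get(self, key):
        return self.data.get(key)


def scan_task_entries(client, prefix=b"TASK:"):
    """Collect raw task-table entries, skipping keys flushed mid-scan."""
    entries = {}
    for key in client.scan_iter(match=prefix + b"*"):
        entry = client.get(key)
        if entry is None:
            # The key was flushed between scan_iter and get; skip it.
            continue
        entries[key] = entry
    return entries
```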

task_table_infos = {} # task id -> TaskInfo
for key in redis.scan_iter(match=XRAY_TASK_TABLE_PREFIX + b"*"):
entry = redis.get(key)
task = Task.GetRootAsTask(entry, 0)
Collaborator

Can you actually parse it this way? Don't you need to parse it as a GcsTableEntry first?

task_info.Returns(i) for i in range(task_info.ReturnsLength())
])

# TODO: Remove only objects in GCS that originated from this driver.
Collaborator

Are you saying we don't have enough information in the GCS to flush objects created by ray.put? From the object ID, we can determine the task ID, and from that we can determine the Driver ID, right? So I think we should be able to flush them already.

Collaborator

Also, let's make sure that we're testing cleanup for ray.put objects.

Contributor Author
@elibol elibol Jun 21, 2018

The Redis entries for objects do not contain the task ID.

I think testing cleanup for ray.put is a substantial amount of work and may make sense to do as a separate PR.

Collaborator

What I mean is that the object ID itself tells you the task ID, right?

ray/src/ray/id.cc

Lines 166 to 174 in aa42331

const TaskID ComputeTaskId(const ObjectID &object_id) {
TaskID task_id = object_id;
int64_t *first_bytes = reinterpret_cast<int64_t *>(&task_id);
// Zero out the lowest kObjectIdIndexSize bits of the first byte of the
// object ID.
uint64_t bitmask = static_cast<uint64_t>(-1) << kObjectIdIndexSize;
*first_bytes = *first_bytes & (bitmask);
return task_id;
}
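In Python terms, the masking above does roughly the following (a hypothetical pure-Python sketch; the actual binding used later in this PR is ray.local_scheduler.compute_task_id. The C++ version casts the first int64 in native byte order, assumed little-endian here, and kObjectIdIndexSize's value isn't shown, so the bit width is a parameter):

```python
def compute_task_id(object_id_bytes, index_bits):
    """Zero the lowest `index_bits` bits of the first 8 bytes of the ID,
    mirroring ComputeTaskId above (little-endian assumed)."""
    first = int.from_bytes(object_id_bytes[:8], "little")
    # Equivalent to bitmask = uint64(-1) << index_bits in the C++ code.
    first &= ~((1 << index_bits) - 1)
    return first.to_bytes(8, "little") + object_id_bytes[8:]
```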

Contributor Author

Excellent point. I was not aware of this method.

driver_id: string;
// Whether it's dead.
is_dead: bool;
}
Collaborator

Missing newline here.

};
virtual ~DriverTable() {}

Status AppendDriverData(UniqueID driver_id, bool is_dead) {
Collaborator

Should be JobID instead of UniqueID

HandleDriverTableUpdate(client_id, driver_data);
};
RAY_RETURN_NOT_OK(gcs_client_->driver_table().Subscribe(
UniqueID::nil(), UniqueID::nil(), driver_table_handler, nullptr));
Collaborator

The first one should be JobID::nil()

@@ -44,6 +44,8 @@ def Driver(success):
# Two new objects.
ray.get(ray.put(1111))
ray.get(ray.put(1111))
# Test is flaky without calls to sleep.
Collaborator

What are the sleeps needed for? Can we get rid of them?

Contributor Author

The test sometimes fails without the wait in place.

Collaborator

Why does it fail? Does this mean the cleanup implementation is buggy?

Contributor Author

ray.get completes before information about the object's location is propagated to the GCS, so the count returned by StateSummary() immediately after a ray.get is incorrect.

For comparison: if the object were remote, ray.get would complete only after the data propagates to the GCS, since the associated pull depends on information in the GCS.

Collaborator

Why weren't the sleeps necessary before?

Anyway, if the tests are flaky without the sleeps, then they will probably continue to be flaky even with the sleeps, so we should use a more robust approach (e.g., loop until the information is present (with a timeout that raises an exception)).
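One way to sketch that more robust approach (the helper name wait_for_condition is illustrative, not from this PR):

```python
import time


def wait_for_condition(condition, timeout_s=10.0, retry_interval_s=0.1):
    """Poll `condition` until it returns True, raising if `timeout_s` elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if condition():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Condition not met within %s seconds." % timeout_s)
        time.sleep(retry_interval_s)
```

In the test this would replace the sleeps with something like wait_for_condition(lambda: StateSummary()[:2] == (0, 1)), so the test waits exactly as long as propagation takes and still fails loudly on a real bug.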

def testTaskCleanupOnDriverExitSingleRedisShard(self):
self._testCleanupOnDriverExit(num_redis_shards=1, test_object_cleanup=False)

@unittest.skipIf(
Collaborator

I think we should be able to re-enable this.

Contributor Author

I believe re-enabling this entails a substantial amount of work. It might make sense to re-enable as a separate PR.

@@ -79,23 +83,43 @@ def f():
ray.init(redis_address=redis_address)
# The assertion below can fail if the monitor is too slow to clean up
# the global state.
self.assertEqual((0, 1), StateSummary()[:2])

if test_object_cleanup:
Collaborator

Can you explain this if statement?

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6176/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6177/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6178/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6179/

@AmplabJenkins
Test FAILed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6183/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6180/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6182/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6186/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6187/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6263/

@AmplabJenkins
Test FAILed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6265/

@elibol elibol force-pushed the add_xray_driver_table branch from f0bc4f4 to 406a9ad Compare July 2, 2018 22:07
@AmplabJenkins
Test FAILed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6428/

@AmplabJenkins
Test FAILed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6879/

@AmplabJenkins
Test FAILed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6882/

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6881/

Collaborator

@robertnishihara robertnishihara left a comment

Thanks @elibol, this looks pretty good. I left some questions/comments. @eric-jj want to take a look also?

@@ -168,7 +168,7 @@ def _keys(self, pattern):
"""
result = []
for client in self.redis_clients:
result.extend(client.keys(pattern))
result.extend([i for i in client.scan_iter(match=pattern)])
Collaborator

Is the "i for i in" necessary? If so, then maybe just use result.extend(list(client.scan_iter(match=pattern)))
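For what it's worth, list.extend accepts any iterable directly, so neither the comprehension nor list() is strictly needed; a quick sketch:

```python
result = []
result.extend(x for x in range(3))  # a generator works directly
result.extend(iter([3, 4]))         # so does any other iterable
# result is now [0, 1, 2, 3, 4]
```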

for task_id_hex in task_table_objects:
if len(task_table_objects[task_id_hex]) == 0:
continue
task_table_object = task_table_objects[task_id_hex][0]['TaskSpec']
Collaborator

Minor, but we've been pretty consistent about using " everywhere instead of ' for strings, so let's stick with that (this applies to a number of places in this PR).

};
virtual ~DriverTable() {}

Status AppendDriverData(JobID driver_id, bool is_dead) {
Collaborator

JobID -> const JobID &

};
virtual ~DriverTable() {}

Status AppendDriverData(JobID driver_id, bool is_dead) {
Collaborator

Add doxygen documentation

RAY_LOG(DEBUG) << "Driver entry added callback";
});
}
};
Collaborator

Maybe put the implementation in the .cc file?

# Get the list of objects returned by driver tasks.
driver_object_ids = set()
for task_info in task_table_infos.values():
driver_object_ids |= {
Collaborator

What is |=?

Contributor

| is the set union operator; |= performs an in-place union, equivalent to set.update.
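A quick sketch of the operator in action (the IDs are made up):

```python
driver_object_ids = set()
driver_object_ids |= {b"obj1", b"obj2"}  # in-place union, same as .update()
driver_object_ids |= {b"obj2", b"obj3"}  # duplicates are merged away
# driver_object_ids is now {b"obj1", b"obj2", b"obj3"}
```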

# Also record all the ray.put()'d objects.
all_put_objects = []
object_table_objects = self.state.object_table()
for object_id in object_table_objects:
Collaborator

Loop over keys and values together (e.g., with .items()) to avoid unnecessary indexing.

object_table_objects = self.state.object_table()
for object_id in object_table_objects:
if len(object_table_objects[object_id]) == 0:
continue
Collaborator

We can probably assert that the length is > 0 instead.

continue
object_id_bin = object_id.id()
task_id_bin = ray.local_scheduler.compute_task_id(object_id).id()
all_put_objects.append((object_id_bin, task_id_bin))
Collaborator

this seems to be all objects, not just all put objects, right?


# Keep objects from relevant tasks.
driver_task_ids_set = set(driver_task_id_bins)
for object_id, task_id in all_put_objects:
Collaborator

If this actually covers all objects and not just put objects, then maybe we don't need to loop through the task return values above, is that right?

@AmplabJenkins
Test FAILed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7107/

@elibol
Contributor Author

elibol commented Aug 8, 2018

jenkins, retest this please

@AmplabJenkins
Test FAILed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7339/

@elibol
Contributor Author

elibol commented Aug 8, 2018

jenkins, retest this please

@@ -100,7 +100,7 @@ def __init__(self, redis_address, redis_port, autoscaling_config):
ignore_subscribe_messages=True)
self.shard_subscribe_clients.append(subscribe_client)
else:
# We don't need to subscribe to the shards in legacy Ray.
# We don`t need to subscribe to the shards in legacy Ray.
Collaborator

` -> '

assert len(object_table_object) > 0
object_id_bin = object_id.id()
task_id_bin = ray.local_scheduler.compute_task_id(object_id).id()
all_objects.append((object_id_bin, task_id_bin))
Collaborator

Instead of constructing this all_objects list, we can probably just do

if task_id_bin in driver_task_id_bins:
    driver_object_id_bins.add(object_id)

here, right?
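Sketched with hypothetical data, the suggested restructuring would look like:

```python
driver_task_id_bins = {b"task-A", b"task-B"}  # hypothetical driver task IDs
object_to_task = {                            # object ID -> owning task ID
    b"obj-1": b"task-A",
    b"obj-2": b"task-C",                      # owned by some other driver
    b"obj-3": b"task-B",
}

# Filter directly while iterating instead of building an all_objects list.
driver_object_id_bins = set()
for object_id_bin, task_id_bin in object_to_task.items():
    if task_id_bin in driver_task_id_bins:
        driver_object_id_bins.add(object_id_bin)
# driver_object_id_bins is now {b"obj-1", b"obj-3"}
```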

///
/// \param driver_id The driver id.
/// \param is_dead Whether the driver is dead.
/// \return The return status of Log.Append.
Collaborator

Minor, but I'd change this to just say "The return status." The interface documentation shouldn't refer to implementation details.

@@ -267,6 +267,10 @@ class ObjectManager : public ObjectManagerInterface {
/// Register object remove with directory.
void NotifyDirectoryObjectDeleted(const ObjectID &object_id);

/// Handle any push requests that were made before an object was available.
/// This is invoked when an "object added" notification is received from the store.
void HandleUnfulfilledPushRequests(const ObjectInfoT &object_info);
Collaborator

@elibol can you answer the above question?

@AmplabJenkins
Test FAILed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7340/

@robertnishihara
Collaborator

Looks good to me, I'll merge this once the tests pass.

@AmplabJenkins
Test PASSed. Build results (CI server access needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7348/

@robertnishihara robertnishihara merged commit 8ae8218 into ray-project:master Aug 9, 2018
@robertnishihara
Collaborator

Nice work @elibol!

@zhijunfu
Contributor

zhijunfu commented Aug 30, 2018

@elibol This is nice. I'm working on job management in Ray, and @robertnishihara pointed out that I could leverage this for job cancellation.

I saw that the code for actually performing cleanup at driver death is currently marked as a TODO.

Wondering if you have plans to finish this part sometime soon? Or if you're busy with other stuff, maybe I could take some time to work on this?

@elibol
Contributor Author

elibol commented Aug 30, 2018

@zhijunfu that sounds great. Please do have a look into it.

@robertnishihara
Collaborator

Thanks @zhijunfu, could you share an outline of what you plan on implementing?

@zhijunfu
Contributor

@elibol Yes, I will look into it. Thanks.

@zhijunfu
Contributor

@robertnishihara Sure. I just raised an issue here with a proposal doc.

@robertnishihara
Collaborator

@zhijunfu the driver cleanup should be implemented separately from (and before) the job submission work, right?

@zhijunfu
Contributor

zhijunfu commented Sep 3, 2018

@robertnishihara Yes, the cleanup code is mostly done. I will do some more testing, submit it for internal review, and send a PR after that. :)
