Skip to content

Conversation

stephanie-wang
Copy link
Contributor

What do these changes do?

This minimizes the number of pinned dummy objects per actor by only pinning the ones necessary to execute new actor tasks. This implements the solution described in #3308. Note that this does not solve the issue of garbage-collecting actor handles that are no longer needed, or the issue of general actor garbage collection.

Related issue number

Closes #3308.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10236/
Test FAILed.

@ericl
Copy link
Contributor

ericl commented Dec 21, 2018

How do we test the map size doesn't grow indefinitely? I don't think the existing tests cover this. Run python code and check the C++ stats?

Copy link
Contributor

@raulchen raulchen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a brief look and left a few comments. Will continuing reviewing tmr.

Also, there're a bunch of unrelated changes (converting UniqueID::nil() to UniqueId()), are they unintentional?

// TODO(swang): We use a copy of the task so that for actor tasks, we
// can keep the original execution dependencies in the copy in the
// scheduling queue, but ideally the task in the lineage cache would
// match the queued task exactly.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to keep the original execution dependencies in the copy in the scheduling queue now? and why was this not needed before this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is a little unfortunate...we need access to the original execution dependency because that is the object that gets released in the actor's frontier once the task completes. I thought about doing the dummy object accounting when the task is assigned, but you don't know yet whether the task succeeded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh hmm actually just realized that we can separate the logic so that we release the previous object when the task is assigned and add the new object when the task finishes. Then I can get rid of this code. :)

// until this first task is submitted.
for (auto &new_handle_id : task.GetTaskSpecification().NewActorHandles()) {
// An actor creation task is the first task, so it cannot have new handles.
RAY_CHECK(task.GetTaskSpecification().IsActorTask());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, this might be cleaner:
RAY_CHECK(!(task.GetTaskSpecification().IsActorCreationTask() && task.GetTaskSpecification().NewActorHandles().size() > 0))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@stephanie-wang
Copy link
Contributor Author

@ericl, yeah we don't have a way to automatically test this right now. Can you try it out on one of the RLlib workloads that would be affected?

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10464/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10465/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10485/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10486/
Test FAILed.

Copy link
Collaborator

@robertnishihara robertnishihara left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @stephanie-wang, this solution seems pretty nice.

const ObjectID &execution_dependency) {
if (frontier_.find(handle_id) == frontier_.end()) {
frontier_[handle_id] = FrontierLeaf{
.task_counter = 0, .execution_dependency = execution_dependency,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember the pros/cons, but I don't think we're using this form of initialization anywhere else in the codebase currently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I wasn't aware of any pros/cons. Do you have any suggestions in mind?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, when we converted from C to C++, e.g., in #321, we had to make some changes like

https://github.com/ray-project/ray/pull/321/files#diff-450748b4523710897a16506dabc79950L573

and

https://github.com/ray-project/ray/pull/321/files#diff-303e99e77aad9f39624c3769dd615ea0L220

which switched away from using this syntax. I thought it wasn't valid C++, though if it's compiling in your PR maybe I'm remembering it incorrectly. I'm sure @mehrdadn knows the answer.

As for suggestions, I'd defer to other people here. If it's valid C++11 then it's probably fine especially since it's super simple. I guess the alternative would be to manually set the fields after the declaration or to define a simple constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are called designated initializers in C. They were extensions I believe until C99? I had no idea but apparently they've finally made it into C++20. I'd avoid using them given they're not valid C++11.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mehrdadn!

Ok, in that case let's either do

frontier_[handle_id] = FrontierLeaf();
frontier_[handle_id].task_counter = 0;
frontier_[handle_id].execution_dependency = execution_dependency;

or define a constructor.

Copy link
Contributor

@mehrdadn mehrdadn Jan 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to initialize fields externally I'd get a reference and avoid looking up the same object every time. If you make a constructor that should be fine too, though semantically it'd be creating a new object to move onto the old object rather than modifying it or constructing it in place. (Note that using operator[] to index into a map automatically calls the default constructor, so if you don't want to call that, you'd want to use insert.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, thanks!

RAY_CHECK(actor_entry != actor_registry_.end());
// Extend the actor's frontier to include the executed task.
auto dummy_object = task.GetTaskSpecification().ActorDummyObject();
ObjectID object_to_release =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with dummy_object above

ActorHandleID actor_handle_id;
if (task.GetTaskSpecification().IsActorCreationTask()) {
actor_id = task.GetTaskSpecification().ActorCreationId();
actor_handle_id = ActorHandleID();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe clearer to use ActorHandleID::nil() here. Alternatively could remove the line entirely.

# NOTE(swang): If the new actor handle fails to be used (e.g., due
# to a failure to register a named actor), then this may cause a
# memory leak in the backend.
self._ray_new_actor_handles.append(actor_handle_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the pickling case, we set actor_handle_id = self._ray_actor_handle_id. Why not use a nil ID (or some fixed ID like that) instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm actually I guess we could just set a random actor handle ID in Python. We don't want to use nil because that represents the original actor handle.

# not release the cursor for any new handles until the first task for
# each of the new handles is submitted.
# NOTE(swang): If the new actor handle fails to be used (e.g., due
# to a failure to register a named actor), then this may cause a
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could happen even without any failure to register a named actor, right? E.g.,

a = Actor.remote()

@ray.remote
def f(a):
    return

f.remote(a)

What precisely gets leaked? The dummy object ID that the forked ID depends on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah, I guess the more general problem is GC for actor handles.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10558/
Test FAILed.

@robertnishihara
Copy link
Collaborator

@stephanie-wang this looks good to me, though it looks like Travis is failing.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10578/
Test FAILed.

// If this is an actor task, then this will be populated with all of the new
// actor handles that were forked from this handle since the last task on
// this handle was submitted.
new_actor_handles: [string];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Java CI failure is because when we change TaskInfo definition, the corresponding Java file needs to be changed as well.

Unfortunately, for now, generating this file isn't automatic. Because the generated file lacks a certain API and needs some manual patch. I proposed adding the required API to flatbuffers. But they're not willing to do that (google/flatbuffers#5092).

For this PR, you can apply this patch to fix https://gist.github.com/raulchen/1fcecac74ddb809b836ab9bbf9f9e178.

We'll add a script to simplify this process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TLDR explanation of this issue is: we define some ObjectID fields as string in fbs. But these fields are actually byte arrays. For C++ and Python 2, string and byte array are equivalent. But for Java and Python 3, they're different.

Flatbuffers Java API will decode the bytes with UTF8 if a field is defined as string, but the content is actually not UTF8-decodable. Python API doesn't do that for now, but may do in the future. Because Flatbuffers prefers to distinguish strings and byte arrays. If so, our Python code will face the same problem.

An alternative solution is to store hex strings in fbs. But that might require substantial code changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thanks!

GetExpectedTaskCounter(actor_registry_, spec.ActorId(), spec.ActorHandleId());
RAY_CHECK(spec.ActorCounter() == expected_task_counter)
<< "Expected actor counter: " << expected_task_counter
<< ", got: " << spec.ActorCounter();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also print the task id here? I found it very useful for debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thank you!

/// Once all handles have released a dummy object, it will be removed from
/// this map. This object is safe to evict, since no handle will submit
/// another method dependent on that object.
std::unordered_map<ObjectID, int64_t> dummy_objects_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me quiet a while to understand the above comment.
Base on my understanding, I'm trying to summarize the comment and make it easier to understand for future readers.


This map is used to track all the unreleased dummy objects for this actor. The map key is the dummy object ID, and the map value is the number of actor handles that depends on this dummy object. When the map value decreases to 0, the dummy object is safe to release from the object manager.

An actor handle depends on a dummy object when its next unfinished task depends on the dummy object. For a given dummy object (say D), there could be 2 types of such actor handles:

  1. The actor handle (say H) on which D's creating task (say T) was submitted. If T's next task hasn't finished yet, H still depends on D.
  2. Any handles that were forked from H after T finished, and before T's next task finishes. Such handles depend on H until their first tasks finish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I agree that this is clearer.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can not understand this very well. Because all calls in an Actor form a line in timeline, then should not all the handles for the same Actor depends on the latest same dummy object instead of each handle's last task dummy object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'm not sure exactly what you're asking, but the dummy objects represent the last task that was known to execute on an actor. So when a task is submitted, it depends on the last task submitted on the same handle. When a task is executed, we might update that dependency to reflect the last task that was actually executed on the actor, which may have come from a different handle.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I mean why should we keep each dependency for all ActorHandles of the same Actor instead of just keeping the latest one? Because we will always update the ActorHandle dependency to the latest one when execute the actor task, then what is the sense to keep a dependency for each ActorHandle when submit the task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand your question now. We keep the dependency for each ActorHandle so that we know which tasks have executed so far and which task from each handle can execute next. The former is mostly important for failure scenarios (e.g., making sure you don't re-execute the same task twice on an actor), and the latter comes up during normal execution when there are multiple tasks from the same handle that could execute.

Both of these things don't strictly need the "dependency"; you could accomplish the same thing with task counters for each handle. However, it's convenient to use dependencies since this is also how we determine whether non-actor tasks can be executed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is to say, when re-execute an Actor Method Sequences after failure, we can only guarantee call order in one ActorHandle, but not in global order(across all handles) with the last Actor method sequences?

@stephanie-wang
Copy link
Contributor Author

Thanks for the comments, @raulchen! I tried to address all of them, so please let me know what you think. I tried for a bit to figure out how to implement the same logic that I did in Python in the Java client, but wasn't quite sure how to proceed. Maybe you or @guoyuhong could try it in a separate PR? I'm also happy to do it, but just need some pointers on where I should be looking.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10604/
Test FAILed.

@raulchen
Copy link
Contributor

raulchen commented Jan 5, 2019

@stephanie-wang Java part looks good. We'll implement the logic in a separate PR later.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10662/
Test PASSed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants