-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[xray] Basic actor support #1835
Changes from all commits
4fb8fd5
bb6d91b
16a4afd
4865eab
ecb9734
68f32f5
4b97e45
a126ddc
c4d3f5c
4bfaf29
e535903
0b22779
c3c6aac
8a0ac94
e9d9cdd
9a6ae1e
8d115be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -358,6 +358,12 @@ void handle_convert_worker_to_actor( | |
* filled out, so fill out the correct worker field now. */ | ||
algorithm_state->local_actor_infos[actor_id].worker = worker; | ||
} | ||
/* Increment the task counter for the creator's handle to account for the | ||
* actor creation task. */ | ||
auto &task_counters = | ||
algorithm_state->local_actor_infos[actor_id].task_counters; | ||
RAY_CHECK(task_counters[ActorHandleID::nil()] == 0); | ||
task_counters[ActorHandleID::nil()]++; | ||
Reviewer comment: Can you explain this change? I wouldn't have expected any changes to the old code path. Author reply: It's because I now count the actor creation task as part of the actor counter on the Python side. Without this change, the old code path would never run any tasks after the actor creation task, since the counters would not match. |
||
} | ||
|
||
/** | ||
|
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
#include "ray/raylet/actor_registration.h" | ||
|
||
#include "ray/util/logging.h" | ||
|
||
namespace ray { | ||
|
||
namespace raylet { | ||
|
||
/// Build a registration from the actor's entry in the global actor table.
/// The current execution dependency starts out as the nil object ID and the
/// frontier starts out empty.
ActorRegistration::ActorRegistration(const ActorTableDataT &actor_table_data)
    : actor_table_data_(actor_table_data),
      execution_dependency_(ObjectID::nil()) {
  // frontier_ is default-constructed, i.e. it holds no handles yet.
}
|
||
/// Decode the node manager ID stored in binary form in the actor table data.
const ClientID ActorRegistration::GetNodeManagerId() const {
  const auto &node_manager_id_binary = actor_table_data_.node_manager_id;
  return ClientID::from_binary(node_manager_id_binary);
}
|
||
/// Decode the actor creation dummy object ID stored in binary form in the
/// actor table data.
const ObjectID ActorRegistration::GetActorCreationDependency() const {
  const auto &creation_dummy_binary = actor_table_data_.actor_creation_dummy_object_id;
  return ObjectID::from_binary(creation_dummy_binary);
}
|
||
/// Return a copy of the execution dependency recorded by the most recent call
/// to ExtendFrontier, or the nil ID set at construction if there was none.
const ObjectID ActorRegistration::GetExecutionDependency() const {
  return this->execution_dependency_;
}
|
||
const std::unordered_map<ActorHandleID, ActorRegistration::FrontierLeaf, UniqueIDHasher> | ||
&ActorRegistration::GetFrontier() const { | ||
return frontier_; | ||
} | ||
|
||
void ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id, | ||
const ObjectID &execution_dependency) { | ||
auto &frontier_entry = frontier_[handle_id]; | ||
frontier_entry.task_counter++; | ||
frontier_entry.execution_dependency = execution_dependency; | ||
execution_dependency_ = execution_dependency; | ||
} | ||
|
||
} // namespace raylet | ||
|
||
} // namespace ray |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
#ifndef RAY_RAYLET_ACTOR_REGISTRATION_H | ||
#define RAY_RAYLET_ACTOR_REGISTRATION_H | ||
|
||
#include <unordered_map> | ||
|
||
#include "ray/gcs/format/gcs_generated.h" | ||
#include "ray/id.h" | ||
|
||
namespace ray { | ||
|
||
namespace raylet { | ||
|
||
/// \class ActorRegistration | ||
/// | ||
/// Information about an actor registered in the system. This includes the | ||
/// actor's current node manager location, and if local, information about its | ||
/// current execution state, used for reconstruction purposes. | ||
class ActorRegistration { | ||
public: | ||
/// Create an actor registration. | ||
/// | ||
/// \param actor_table_data Information from the global actor table about | ||
/// this actor. This includes the actor's node manager location. | ||
ActorRegistration(const ActorTableDataT &actor_table_data); | ||
|
||
/// Each actor may have multiple callers, or "handles". A frontier leaf | ||
/// represents the execution state of the actor with respect to a single | ||
/// handle. | ||
struct FrontierLeaf { | ||
/// The number of tasks submitted by this handle that have executed on the | ||
/// actor so far. | ||
int64_t task_counter; | ||
/// The execution dependency returned by the task submitted by this handle | ||
/// that most recently executed on the actor. | ||
ObjectID execution_dependency; | ||
}; | ||
|
||
/// Get the actor's node manager location. | ||
/// | ||
/// \return The actor's node manager location. All tasks for the actor should | ||
/// be forwarded to this node. | ||
const ClientID GetNodeManagerId() const; | ||
|
||
/// Get the object that represents the actor's initial state. This is the | ||
/// execution dependency returned by this actor's creation task. If | ||
/// reconstructed, this will recreate the actor. | ||
/// | ||
/// \return The execution dependency returned by the actor's creation task. | ||
const ObjectID GetActorCreationDependency() const; | ||
|
||
/// Get the object that represents the actor's current state. This is the | ||
/// execution dependency returned by the task most recently executed on the | ||
/// actor. The next task to execute on the actor should be marked as | ||
/// execution-dependent on this object. | ||
/// | ||
/// \return The execution dependency returned by the most recently executed | ||
/// task. | ||
const ObjectID GetExecutionDependency() const; | ||
|
||
/// Get the execution frontier of the actor, indexed by handle. This captures | ||
/// the execution state of the actor, a summary of which tasks have executed | ||
/// so far. | ||
/// | ||
/// \return The actor frontier, a map from handle ID to execution state for | ||
/// that handle. | ||
const std::unordered_map<ActorHandleID, FrontierLeaf, UniqueIDHasher> &GetFrontier() | ||
const; | ||
|
||
/// Extend the frontier of the actor by a single task. This should be called | ||
/// whenever the actor executes a task. | ||
/// | ||
/// \param handle_id The ID of the handle that submitted the task. | ||
/// \param execution_dependency The object representing the actor's new | ||
/// state. This is the execution dependency returned by the task. | ||
void ExtendFrontier(const ActorHandleID &handle_id, | ||
const ObjectID &execution_dependency); | ||
|
||
private: | ||
/// Information from the global actor table about this actor, including the | ||
/// node manager location. | ||
ActorTableDataT actor_table_data_; | ||
/// The object representing the state following the actor's most recently | ||
/// executed task. The next task to execute on the actor should be marked as | ||
/// execution-dependent on this object. | ||
ObjectID execution_dependency_; | ||
/// The execution frontier of the actor, which represents which tasks have | ||
/// executed so far and which tasks may execute next, based on execution | ||
/// dependencies. This is indexed by handle. | ||
std::unordered_map<ActorHandleID, FrontierLeaf, UniqueIDHasher> frontier_; | ||
}; | ||
|
||
} // namespace raylet | ||
|
||
} // namespace ray | ||
|
||
#endif // RAY_RAYLET_ACTOR_REGISTRATION_H |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later on, I think it would make sense to keep state like `actor_counter` in a C++ worker class instead of in Python. That is certainly not for this PR; this is just a comment.