Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[xray] Basic actor support #1835

Merged
merged 17 commits into from
Apr 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,8 @@ def remote(cls, *args, **kwargs):
actor_creation_resources,
actor_method_cpus,
ray.worker.global_worker)
# Increment the actor counter to account for the creation task.
actor_counter += 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Later on I think it would make sense to have state like actor_counter in a C++ worker class instead of in Python. Certainly not in this PR, this is just a comment.


# Instantiate the actor handle.
actor_object = cls.__new__(cls)
Expand Down
11 changes: 4 additions & 7 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,11 @@
return RedisModule_ReplyWithError(ctx, (MESSAGE)); \
}

// NOTE(swang): The order of prefixes here must match the TablePrefix enum
// defined in src/ray/gcs/format/gcs.fbs.
static const char *table_prefixes[] = {
NULL,
"TASK:",
"TASK:",
"CLIENT:",
"OBJECT:",
"FUNCTION:",
"TASK_RECONSTRUCTION:",
NULL, "TASK:", "TASK:", "CLIENT:",
"OBJECT:", "ACTOR:", "FUNCTION:", "TASK_RECONSTRUCTION:",
"HEARTBEAT:",
};

Expand Down
6 changes: 6 additions & 0 deletions src/local_scheduler/local_scheduler_algorithm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,12 @@ void handle_convert_worker_to_actor(
* filled out, so fill out the correct worker field now. */
algorithm_state->local_actor_infos[actor_id].worker = worker;
}
/* Increment the task counter for the creator's handle to account for the
* actor creation task. */
auto &task_counters =
algorithm_state->local_actor_infos[actor_id].task_counters;
RAY_CHECK(task_counters[ActorHandleID::nil()] == 0);
task_counters[ActorHandleID::nil()]++;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this change? I wouldn't have expected any changes to the old code path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because I now count the actor creation task as part of the actor counter on the Python side. Without this change, the old codepath will never run any tasks after the actor creation task since the counters don't match.

}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/ray/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ set(RAY_SRCS
raylet/worker.cc
raylet/worker_pool.cc
raylet/scheduling_resources.cc
raylet/actor.cc
raylet/actor_registration.cc
raylet/scheduling_queue.cc
raylet/scheduling_policy.cc
raylet/task_dependency_manager.cc
Expand Down
3 changes: 3 additions & 0 deletions src/ray/gcs/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ AsyncGcsClient::AsyncGcsClient(const ClientID &client_id) {
context_.reset(new RedisContext());
client_table_.reset(new ClientTable(context_, this, client_id));
object_table_.reset(new ObjectTable(context_, this));
actor_table_.reset(new ActorTable(context_, this));
task_table_.reset(new TaskTable(context_, this));
raylet_task_table_.reset(new raylet::TaskTable(context_, this));
task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this));
Expand Down Expand Up @@ -48,6 +49,8 @@ TaskTable &AsyncGcsClient::task_table() { return *task_table_; }

raylet::TaskTable &AsyncGcsClient::raylet_task_table() { return *raylet_task_table_; }

ActorTable &AsyncGcsClient::actor_table() { return *actor_table_; }

TaskReconstructionLog &AsyncGcsClient::task_reconstruction_log() {
return *task_reconstruction_log_;
}
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ class RAY_EXPORT AsyncGcsClient {
inline FunctionTable &function_table();
// TODO: Some API for getting the error on the driver
inline ClassTable &class_table();
inline ActorTable &actor_table();
inline CustomSerializerTable &custom_serializer_table();
inline ConfigTable &config_table();
ObjectTable &object_table();
TaskTable &task_table();
raylet::TaskTable &raylet_task_table();
ActorTable &actor_table();
TaskReconstructionLog &task_reconstruction_log();
ClientTable &client_table();
HeartbeatTable &heartbeat_table();
Expand All @@ -72,6 +72,7 @@ class RAY_EXPORT AsyncGcsClient {
std::unique_ptr<ObjectTable> object_table_;
std::unique_ptr<TaskTable> task_table_;
std::unique_ptr<raylet::TaskTable> raylet_task_table_;
std::unique_ptr<ActorTable> actor_table_;
std::unique_ptr<TaskReconstructionLog> task_reconstruction_log_;
std::unique_ptr<HeartbeatTable> heartbeat_table_;
std::unique_ptr<ClientTable> client_table_;
Expand Down
13 changes: 12 additions & 1 deletion src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ enum TablePrefix:int {
RAYLET_TASK,
CLIENT,
OBJECT,
ACTOR,
FUNCTION,
TASK_RECONSTRUCTION,
HEARTBEAT
HEARTBEAT,
}

// The channel that Add operations to the Table should be published on, if any.
Expand Down Expand Up @@ -89,6 +90,16 @@ table ClassTableData {
}

table ActorTableData {
// The ID of the actor that was created.
actor_id: string;
// The dummy object ID returned by the actor creation task. If the actor
// dies, then this is the object that should be reconstructed for the actor
// to be recreated.
actor_creation_dummy_object_id: string;
// The ID of the driver that created the actor.
driver_id: string;
// The ID of the node manager that created the actor.
node_manager_id: string;
}

table ErrorTableData {
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ template class Log<ObjectID, ObjectTableData>;
template class Log<TaskID, ray::protocol::Task>;
template class Table<TaskID, ray::protocol::Task>;
template class Table<TaskID, TaskTableData>;
template class Log<ActorID, ActorTableData>;
template class Log<TaskID, TaskReconstructionData>;
template class Table<ClientID, HeartbeatTableData>;

Expand Down
12 changes: 10 additions & 2 deletions src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,22 @@ class FunctionTable : public Table<ObjectID, FunctionTableData> {
using ClassTable = Table<ClassID, ClassTableData>;

// TODO(swang): Set the pubsub channel for the actor table.
using ActorTable = Table<ActorID, ActorTableData>;
class ActorTable : public Log<ActorID, ActorTableData> {
public:
ActorTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Log(context, client) {
pubsub_channel_ = TablePubsub_ACTOR;
prefix_ = TablePrefix_TASK_RECONSTRUCTION;
}
};

class TaskReconstructionLog : public Log<TaskID, TaskReconstructionData> {
public:
TaskReconstructionLog(const std::shared_ptr<RedisContext> &context,
AsyncGcsClient *client)
: Log(context, client) {
prefix_ = TablePrefix_TASK_RECONSTRUCTION;
pubsub_channel_ = TablePubsub_ACTOR;
prefix_ = TablePrefix_ACTOR;
}
};

Expand Down
15 changes: 0 additions & 15 deletions src/ray/raylet/actor.cc

This file was deleted.

31 changes: 0 additions & 31 deletions src/ray/raylet/actor.h

This file was deleted.

41 changes: 41 additions & 0 deletions src/ray/raylet/actor_registration.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#include "ray/raylet/actor_registration.h"

#include "ray/util/logging.h"

namespace ray {

namespace raylet {

ActorRegistration::ActorRegistration(const ActorTableDataT &actor_table_data)
: actor_table_data_(actor_table_data),
execution_dependency_(ObjectID::nil()),
frontier_() {}

const ClientID ActorRegistration::GetNodeManagerId() const {
return ClientID::from_binary(actor_table_data_.node_manager_id);
}

const ObjectID ActorRegistration::GetActorCreationDependency() const {
return ObjectID::from_binary(actor_table_data_.actor_creation_dummy_object_id);
}

const ObjectID ActorRegistration::GetExecutionDependency() const {
return execution_dependency_;
}

const std::unordered_map<ActorHandleID, ActorRegistration::FrontierLeaf, UniqueIDHasher>
&ActorRegistration::GetFrontier() const {
return frontier_;
}

void ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id,
const ObjectID &execution_dependency) {
auto &frontier_entry = frontier_[handle_id];
frontier_entry.task_counter++;
frontier_entry.execution_dependency = execution_dependency;
execution_dependency_ = execution_dependency;
}

} // namespace raylet

} // namespace ray
96 changes: 96 additions & 0 deletions src/ray/raylet/actor_registration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#ifndef RAY_RAYLET_ACTOR_REGISTRATION_H
#define RAY_RAYLET_ACTOR_REGISTRATION_H

#include <unordered_map>

#include "ray/gcs/format/gcs_generated.h"
#include "ray/id.h"

namespace ray {

namespace raylet {

/// \class ActorRegistration
///
/// Information about an actor registered in the system. This includes the
/// actor's current node manager location, and if local, information about its
/// current execution state, used for reconstruction purposes.
class ActorRegistration {
public:
/// Create an actor registration.
///
/// \param actor_table_data Information from the global actor table about
/// this actor. This includes the actor's node manager location.
ActorRegistration(const ActorTableDataT &actor_table_data);

/// Each actor may have multiple callers, or "handles". A frontier leaf
/// represents the execution state of the actor with respect to a single
/// handle.
struct FrontierLeaf {
/// The number of tasks submitted by this handle that have executed on the
/// actor so far.
int64_t task_counter;
/// The execution dependency returned by the task submitted by this handle
/// that most recently executed on the actor.
ObjectID execution_dependency;
};

/// Get the actor's node manager location.
///
/// \return The actor's node manager location. All tasks for the actor should
/// be forwarded to this node.
const ClientID GetNodeManagerId() const;

/// Get the object that represents the actor's initial state. This is the
/// execution dependency returned by this actor's creation task. If
/// reconstructed, this will recreate the actor.
///
/// \return The execution dependency returned by the actor's creation task.
const ObjectID GetActorCreationDependency() const;

/// Get the object that represents the actor's current state. This is the
/// execution dependency returned by the task most recently executed on the
/// actor. The next task to execute on the actor should be marked as
/// execution-dependent on this object.
///
/// \return The execution dependency returned by the most recently executed
/// task.
const ObjectID GetExecutionDependency() const;

/// Get the execution frontier of the actor, indexed by handle. This captures
/// the execution state of the actor, a summary of which tasks have executed
/// so far.
///
/// \return The actor frontier, a map from handle ID to execution state for
/// that handle.
const std::unordered_map<ActorHandleID, FrontierLeaf, UniqueIDHasher> &GetFrontier()
const;

/// Extend the frontier of the actor by a single task. This should be called
/// whenever the actor executes a task.
///
/// \param handle_id The ID of the handle that submitted the task.
/// \param execution_dependency The object representing the actor's new
/// state. This is the execution dependency returned by the task.
void ExtendFrontier(const ActorHandleID &handle_id,
const ObjectID &execution_dependency);

private:
/// Information from the global actor table about this actor, including the
/// node manager location.
ActorTableDataT actor_table_data_;
/// The object representing the state following the actor's most recently
/// executed task. The next task to execute on the actor should be marked as
/// execution-dependent on this object.
ObjectID execution_dependency_;
/// The execution frontier of the actor, which represents which tasks have
/// executed so far and which tasks may execute next, based on execution
/// dependencies. This is indexed by handle.
std::unordered_map<ActorHandleID, FrontierLeaf, UniqueIDHasher> frontier_;
};

} // namespace raylet

} // namespace ray

#endif // RAY_RAYLET_ACTOR_REGISTRATION_H
Loading