Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add common preprocessing for each request in node manager. #5296

Merged
merged 23 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions src/ray/protobuf/raylet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ package ray.rpc;
import "src/ray/protobuf/common.proto";
import "src/ray/protobuf/gcs.proto";

/// NOTE(Joey Jiang) Every request defined in this file should have a `worker_id` field, which
/// will be used in `NodeManager::PreprocessRequest`.

/// Service request and reply messages.
message RegisterClientRequest {
// Indicates the client is a worker or a driver.
bool is_worker = 1;
// The worker id.
bytes worker_id = 2;
bytes worker_id = 1;
// Indicates the client is a worker or a driver.
bool is_worker = 2;
// The process ID of this worker.
uint32 worker_pid = 3;
// The job ID.
Expand All @@ -27,7 +30,8 @@ message RegisterClientReply {
}

message SubmitTaskRequest {
TaskSpec task_spec = 1;
bytes worker_id = 1;
jiangzihao2009 marked this conversation as resolved.
Show resolved Hide resolved
TaskSpec task_spec = 2;
}
message SubmitTaskReply {
}
Expand Down Expand Up @@ -55,14 +59,14 @@ message TaskDoneReply {
}

message FetchOrReconstructRequest {
// The worker ID.
bytes worker_id = 1;
// List of object IDs of the objects that we want to reconstruct or fetch.
repeated bytes object_ids = 1;
repeated bytes object_ids = 2;
// Indicates that we only want to fetch objects, not reconstruct them.
bool fetch_only = 2;
bool fetch_only = 3;
// The current task ID. If fetch_only is false, then this task is blocked.
bytes task_id = 3;
// The worker ID.
bytes worker_id = 4;
bytes task_id = 4;
}
message FetchOrReconstructReply {
}
Expand All @@ -76,19 +80,19 @@ message NotifyUnblockedReply {
}

message WaitRequest {
// The worker ID.
bytes worker_id = 1;
// List of object ids we'll be waiting on.
repeated bytes object_ids = 1;
repeated bytes object_ids = 2;
// Number of objects expected to be returned, if available.
uint64 num_ready_objects = 2;
uint64 num_ready_objects = 3;
// Timeout in milliseconds.
int64 timeout = 3;
int64 timeout = 4;
// Whether to wait until objects appear locally.
bool wait_local = 4;
bool wait_local = 5;
// The current task ID. If there are less than num_ready_objects local, then
// this task is blocked.
bytes task_id = 5;
// The worker ID.
bytes worker_id = 6;
bytes task_id = 6;
}
message WaitReply {
// List of object ids found.
Expand All @@ -98,60 +102,68 @@ message WaitReply {
}

message PushErrorRequest {
// The worker ID.
bytes worker_id = 1;
// The job id that the error is for.
bytes job_id = 1;
bytes job_id = 2;
// The type of the error.
bytes type = 2;
bytes type = 3;
// The error message.
bytes error_message = 3;
bytes error_message = 4;
// The timestamp of the error message.
double timestamp = 4;
double timestamp = 5;
}
message PushErrorReply {
}

message PushProfileEventsRequest {
ProfileTableData profile_table_data = 1;
bytes worker_id = 1;
ProfileTableData profile_table_data = 2;
}
message PushProfileEventsReply {
}

message FreeObjectsInStoreRequest {
// The worker ID.
bytes worker_id = 1;
// Whether keep this request within the local object store
// or send it to all of the object stores.
bool local_only = 1;
bool local_only = 2;
// Whether also delete objects' creating tasks from GCS.
bool delete_creating_tasks = 2;
bool delete_creating_tasks = 3;
// List of object ids to delete from the object store.
repeated bytes object_ids = 3;
repeated bytes object_ids = 4;
}
message FreeObjectsInStoreReply {
}

message PrepareActorCheckpointRequest {
bytes actor_id = 1;
bytes worker_id = 2;
bytes worker_id = 1;
bytes actor_id = 2;
}
message PrepareActorCheckpointReply {
bytes checkpoint_id = 1;
bytes worker_id = 1;
bytes checkpoint_id = 2;
}

message NotifyActorResumedFromCheckpointRequest {
bytes worker_id = 1;
// ID of the actor that resumed.
bytes actor_id = 1;
bytes actor_id = 2;
// ID of the checkpoint from which the actor was resumed.
bytes checkpoint_id = 2;
bytes checkpoint_id = 3;
}
message NotifyActorResumedFromCheckpointReply {
}

message SetResourceRequest {
bytes worker_id = 1;
// Name of the resource to be set.
bytes resource_name = 1;
bytes resource_name = 2;
// Capacity of the resource to be set.
double capacity = 2;
double capacity = 3;
// Client ID where this resource will be set.
bytes client_id = 3;
bytes client_id = 4;
}
message SetResourceReply {
}
Expand Down
Loading