-
Notifications
You must be signed in to change notification settings - Fork 6.8k
[Core worker] Serialize ActorHandle in core worker. Make ActorHandle thread safe. #5034
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core worker] Serialize ActorHandle in core worker. Make ActorHandle thread safe. #5034
Conversation
Test FAILed. |
src/ray/core_worker/task_interface.h
Outdated
ObjectID actor_cursor_; | ||
/// Counter for tasks from this handle. | ||
/// The number of tasks that have been invoked on this actor. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment that task_counter, num_forks, new_actor_handles are guarded by the mutex?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/core_worker/task_interface.h
Outdated
actor_id_, ComputeNextActorHandleId(actor_handle_id_, ++num_forks_), | ||
actor_language_, actor_definition_descriptor_)); | ||
new_handle->actor_cursor_ = actor_cursor_; | ||
new_actor_handles_.push_back(new_handle->actor_handle_id_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit confused about the lifecycle of the new actor handles. Why are the forked handles kept in a list until the next actor task before they are cleared?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@raulchen Can you give a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used for garbage-collecting the dummy objects. Because we need to know wether the current dummy object is depended by others. @stephanie-wang implemented this in #3593
src/ray/protobuf/core_worker.proto
Outdated
bytes actor_cursor = 5; | ||
|
||
// The number of tasks that have been invoked on this actor. | ||
int32 task_counter = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int64?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx!
src/ray/protobuf/core_worker.proto
Outdated
|
||
// The number of times that this actor handle has been forked. | ||
// It's used to make sure ids of actor handles are unique. | ||
int32 num_forks = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int64?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx!
Test FAILed. |
break; | ||
case WorkerLanguage::JAVA: | ||
RAY_CHECK(function.function_descriptor.size() == 3); | ||
actor_definition_descriptor.push_back(function.function_descriptor[0]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's okay to just save actor creation task's function descriptor in the handle. Then we don't need to introduce this new concept ActorDefinitionDescriptor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to actor_creation_task_function_descriptor
src/ray/core_worker/task_interface.h
Outdated
temp.set_actor_cursor(actor_cursor_.Binary()); | ||
temp.set_task_counter(task_counter_); | ||
temp.set_num_forks(num_forks_); | ||
temp.SerializeToString(output); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
temp.SerializeToString(output);
doesn't need to be locked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new code wrapped ray::rpc::ActorHandle
in ray::ActorHandle
. The comment doesn't apply any more.
src/ray/core_worker/task_interface.h
Outdated
ray::ActorHandleID::FromBinary(temp.actor_handle_id()), | ||
(WorkerLanguage)temp.actor_language(), actor_definition_descriptor, | ||
ray::ObjectID::FromBinary(temp.actor_cursor()), temp.task_counter(), | ||
temp.num_forks())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's easier to just make ActorHandle
hold a rpc::ActorHandle
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Test PASSed. |
Test PASSed. |
src/ray/common/id.cc
Outdated
sha256_init(&ctx); | ||
sha256_update(&ctx, reinterpret_cast<const BYTE *>(actor_handle_id.Data()), | ||
actor_handle_id.Size()); | ||
sha256_update(&ctx, (const BYTE *)&num_forks, sizeof(num_forks)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use safety cast ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
@@ -129,6 +129,21 @@ const TaskID GenerateTaskId(const DriverID &driver_id, const TaskID &parent_task | |||
return TaskID::FromBinary(std::string(buff, buff + TaskID::Size())); | |||
} | |||
|
|||
const ActorHandleID ComputeNextActorHandleId(const ActorHandleID &actor_handle_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about putting this method to a separated file id_util.h
to keep id.h
clean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just followed GenerateTaskId
. Maybe move it later?
src/ray/core_worker/common.h
Outdated
return ::Language::CPP; | ||
break; | ||
default: | ||
RAY_LOG(FATAL) << "invalid language specified: " << static_cast<int>(language); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invalid -> Invalid
We should log the meaningful name instead of an int?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't get a meaningful name if the value is out of valid range of enum. e.g. static_cast<ray::rpc::Language>(100)
. And eventually, the old, flatbuffer generated definition of language will be replaced by protobuf generated one. So these two conversion functions will go away in the near future.
src/ray/core_worker/core_worker.h
Outdated
/// Type of this worker. | ||
const enum WorkerType worker_type_; | ||
|
||
/// Language of this worker. | ||
const enum WorkerLanguage language_; | ||
const enum ray::rpc::Language language_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the enum
keyword.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
ASSERT_NE(handle1.ActorHandleID(), forkedHandle1->ActorHandleID()); | ||
ASSERT_EQ(handle1.ActorLanguage(), forkedHandle1->ActorLanguage()); | ||
RAY_LOG(INFO) << handle1.ActorCreationTaskFunctionDescriptor().size(); | ||
RAY_LOG(INFO) << forkedHandle1->ActorCreationTaskFunctionDescriptor().size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these logs useful? I guess they could be removed, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad.
src/ray/core_worker/task_interface.h
Outdated
auto &original = inner_.actor_creation_task_function_descriptor(); | ||
*new_handle->inner_.mutable_actor_creation_task_function_descriptor() = { | ||
original.begin(), original.end()}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove some blank lines?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
Test PASSed. |
src/ray/core_worker/task_interface.h
Outdated
const int64_t NumForks() const { return inner_.num_forks(); }; | ||
|
||
std::unique_ptr<ActorHandle> Fork() { | ||
auto new_handle = std::unique_ptr<ActorHandle>(new ActorHandle()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use new_handle.inner_.CopyFrom(inner_)
, and then only modify the changed fields
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to new_handle.inner_ = inner_
.
src/ray/core_worker/task_interface.h
Outdated
/// It's used to make sure ids of actor handles are unique. | ||
const int64_t NumForks() const { return inner_.num_forks(); }; | ||
|
||
std::unique_ptr<ActorHandle> Fork() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this return a unique_ptr
, instead of just an object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/core_worker/task_interface.h
Outdated
/// It's used to make sure ids of actor handles are unique. | ||
const int64_t NumForks() const { return inner_.num_forks(); }; | ||
|
||
std::unique_ptr<ActorHandle> Fork() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just curious about the reason for not putting the definition of this method to .cc
file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/core_worker/common.h
Outdated
/// | ||
/// \param[in] language Language for a task. | ||
/// \return Translated task language. | ||
inline ::Language ToTaskLanguage(ray::rpc::Language language) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having the same name for both of these functions might make it harder to read. For clarity, it would be helpful to specify the "direction" of the translation in the function name, like:
::Language ToCoreTaskLanguage(ray::rpc::Language language)
and for the one below:
ray::rpc::Language ToRPCTaskLanguage(::Language language)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Test FAILed. |
Test PASSed. |
Test FAILed. |
Test FAILed. |
@@ -29,7 +29,7 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { | |||
const auto &spec = task.GetTaskSpecification(); | |||
core_worker_.worker_context_.SetCurrentTask(spec); | |||
|
|||
ray::rpc::Language language = ToTaskLanguage(spec.GetLanguage()); | |||
ray::rpc::Language language = ToRpcTaskLanguage(spec.GetLanguage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: RPC
not Rpc
Unless there's a convention of doing pure camel case even for initialisms in Ray - I'm not sure!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think both naming conventions are being used widely. I googled this a while ago, a more usual practice is to use all upper cases if the the acronym is less than 2 letters (e.g., IO
), and use camel case otherwise (e.g., Rpc
).
Note, currently we use TaskID
in c++ and Python, and TaskId
in Java. That is because all Java linters prefer camel case regardless of word length.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just took a look. Google's C++ style guide suggest Rpc
.
For some symbols, this style guide recommends names to start with a capital letter and to have a capital letter for each new word (a.k.a. "Camel Case" or "Pascal case"). When abbreviations or acronyms appear in such names, prefer to capitalize the abbreviations or acronyms as single words (i.e StartRpc(), not StartRPC()).
Test PASSed. |
@kfstorm Could you resolve the conflicts? |
Test FAILed. |
Test PASSed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I think this can be merged after addressing these minor comments.
Test FAILed. |
Linting failed. |
Test PASSed. |
Test FAILed. |
Test FAILed. |
What do these changes do?
Previously actor handle serialization is implemented in both Java and Python. Now we want to do it in core worker. And in the future when submitting a task, we can pass an actor handle of any language as a task argument to any worker, regardless the language of the worker.
This PR also added a mutex for ActorHandle to ensure critical paths thread safe.
Related issue number
#4850
Linter
scripts/format.sh
to lint the changes in this PR.