Skip to content

Conversation

kfstorm
Copy link
Member

@kfstorm kfstorm commented Jun 25, 2019

What do these changes do?

Previously actor handle serialization is implemented in both Java and Python. Now we want to do it in core worker. And in the future when submitting a task, we can pass an actor handle of any language as a task argument to any worker, regardless the language of the worker.

This PR also added a mutex for ActorHandle to ensure critical paths thread safe.

Related issue number

#4850

Linter

  • I've run scripts/format.sh to lint the changes in this PR.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14863/
Test FAILed.

@ericl ericl self-assigned this Jun 25, 2019
ObjectID actor_cursor_;
/// Counter for tasks from this handle.
/// The number of tasks that have been invoked on this actor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment that task_counter, num_forks, new_actor_handles are guarded by the mutex?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

actor_id_, ComputeNextActorHandleId(actor_handle_id_, ++num_forks_),
actor_language_, actor_definition_descriptor_));
new_handle->actor_cursor_ = actor_cursor_;
new_actor_handles_.push_back(new_handle->actor_handle_id_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused about the lifecycle of the new actor handles. Why are the forked handles kept in a list until the next actor task before they are cleared?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@raulchen Can you give a comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used for garbage-collecting the dummy objects. Because we need to know wether the current dummy object is depended by others. @stephanie-wang implemented this in #3593

bytes actor_cursor = 5;

// The number of tasks that have been invoked on this actor.
int32 task_counter = 6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int64?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx!


// The number of times that this actor handle has been forked.
// It's used to make sure ids of actor handles are unique.
int32 num_forks = 7;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int64?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx!

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14887/
Test FAILed.

break;
case WorkerLanguage::JAVA:
RAY_CHECK(function.function_descriptor.size() == 3);
actor_definition_descriptor.push_back(function.function_descriptor[0]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay to just save actor creation task's function descriptor in the handle. Then we don't need to introduce this new concept ActorDefinitionDescriptor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to actor_creation_task_function_descriptor

temp.set_actor_cursor(actor_cursor_.Binary());
temp.set_task_counter(task_counter_);
temp.set_num_forks(num_forks_);
temp.SerializeToString(output);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temp.SerializeToString(output); doesn't need to be locked.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new code wrapped ray::rpc::ActorHandle in ray::ActorHandle. The comment doesn't apply any more.

ray::ActorHandleID::FromBinary(temp.actor_handle_id()),
(WorkerLanguage)temp.actor_language(), actor_definition_descriptor,
ray::ObjectID::FromBinary(temp.actor_cursor()), temp.task_counter(),
temp.num_forks()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's easier to just make ActorHandle hold a rpc::ActorHandle?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14888/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14893/
Test PASSed.

sha256_init(&ctx);
sha256_update(&ctx, reinterpret_cast<const BYTE *>(actor_handle_id.Data()),
actor_handle_id.Size());
sha256_update(&ctx, (const BYTE *)&num_forks, sizeof(num_forks));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use safety cast ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

@@ -129,6 +129,21 @@ const TaskID GenerateTaskId(const DriverID &driver_id, const TaskID &parent_task
return TaskID::FromBinary(std::string(buff, buff + TaskID::Size()));
}

const ActorHandleID ComputeNextActorHandleId(const ActorHandleID &actor_handle_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about putting this method to a separated file id_util.h to keep id.h clean.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just followed GenerateTaskId. Maybe move it later?

return ::Language::CPP;
break;
default:
RAY_LOG(FATAL) << "invalid language specified: " << static_cast<int>(language);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalid -> Invalid

We should log the meaningful name instead of an int?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't get a meaningful name if the value is out of valid range of enum. e.g. static_cast<ray::rpc::Language>(100). And eventually, the old, flatbuffer generated definition of language will be replaced by protobuf generated one. So these two conversion functions will go away in the near future.

/// Type of this worker.
const enum WorkerType worker_type_;

/// Language of this worker.
const enum WorkerLanguage language_;
const enum ray::rpc::Language language_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the enum keyword.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ASSERT_NE(handle1.ActorHandleID(), forkedHandle1->ActorHandleID());
ASSERT_EQ(handle1.ActorLanguage(), forkedHandle1->ActorLanguage());
RAY_LOG(INFO) << handle1.ActorCreationTaskFunctionDescriptor().size();
RAY_LOG(INFO) << forkedHandle1->ActorCreationTaskFunctionDescriptor().size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these logs useful? I guess they could be removed, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad.

auto &original = inner_.actor_creation_task_function_descriptor();
*new_handle->inner_.mutable_actor_creation_task_function_descriptor() = {
original.begin(), original.end()};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove some blank lines?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14896/
Test PASSed.

const int64_t NumForks() const { return inner_.num_forks(); };

std::unique_ptr<ActorHandle> Fork() {
auto new_handle = std::unique_ptr<ActorHandle>(new ActorHandle());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use new_handle.inner_.CopyFrom(inner_), and then only modify the changed fields

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to new_handle.inner_ = inner_.

/// It's used to make sure ids of actor handles are unique.
const int64_t NumForks() const { return inner_.num_forks(); };

std::unique_ptr<ActorHandle> Fork() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this return a unique_ptr, instead of just an object?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/// It's used to make sure ids of actor handles are unique.
const int64_t NumForks() const { return inner_.num_forks(); };

std::unique_ptr<ActorHandle> Fork() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just curious about the reason for not putting the definition of this method to .cc file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

///
/// \param[in] language Language for a task.
/// \return Translated task language.
inline ::Language ToTaskLanguage(ray::rpc::Language language) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the same name for both of these functions might make it harder to read. For clarity, it would be helpful to specify the "direction" of the translation in the function name, like:
::Language ToCoreTaskLanguage(ray::rpc::Language language)
and for the one below:
ray::rpc::Language ToRPCTaskLanguage(::Language language)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14922/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14926/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14929/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14930/
Test FAILed.

@@ -29,7 +29,7 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
const auto &spec = task.GetTaskSpecification();
core_worker_.worker_context_.SetCurrentTask(spec);

ray::rpc::Language language = ToTaskLanguage(spec.GetLanguage());
ray::rpc::Language language = ToRpcTaskLanguage(spec.GetLanguage());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: RPC not Rpc
Unless there's a convention of doing pure camel case even for initialisms in Ray - I'm not sure!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both naming conventions are being used widely. I googled this a while ago, a more usual practice is to use all upper cases if the the acronym is less than 2 letters (e.g., IO), and use camel case otherwise (e.g., Rpc).
Note, currently we use TaskID in c++ and Python, and TaskId in Java. That is because all Java linters prefer camel case regardless of word length.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just took a look. Google's C++ style guide suggest Rpc.

For some symbols, this style guide recommends names to start with a capital letter and to have a capital letter for each new word (a.k.a. "Camel Case" or "Pascal case"). When abbreviations or acronyms appear in such names, prefer to capitalize the abbreviations or acronyms as single words (i.e StartRpc(), not StartRPC()).

https://google.github.io/styleguide/cppguide.html#Naming

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14935/
Test PASSed.

@raulchen
Copy link
Contributor

@kfstorm Could you resolve the conflicts?

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14949/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1391/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1394/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14964/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1395/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1396/
Test FAILed.

Copy link
Contributor

@jovany-wang jovany-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I think this can be merged after addressing these minor comments.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14992/
Test FAILed.

@jovany-wang
Copy link
Contributor

Linting failed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1417/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/15001/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1423/
Test FAILed.

@jovany-wang jovany-wang merged commit 1cf7728 into ray-project:master Jul 2, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants