[Core] Update Error Message and Anti-Pattern for the Case of Forking New Processes in Worker Processes #50705

Merged: 15 commits, Feb 22, 2025
Changes from 9 commits
2 changes: 1 addition & 1 deletion .vale.ini
@@ -1,6 +1,6 @@
StylesPath = .vale/styles

Vocab = General, Data, RLlib, Train
Vocab = General, Core, Data, RLlib, Train

MinAlertLevel = suggestion

1 change: 1 addition & 0 deletions .vale/styles/config/vocabularies/Core/accept.txt
@@ -0,0 +1 @@
Raylet
37 changes: 37 additions & 0 deletions doc/source/ray-core/doc_code/anti_pattern_create_new_processes.py
@@ -0,0 +1,37 @@
import ray
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing


@ray.remote
def generate_response(request):
return "Response to " + request
Collaborator (review comment): return big numpy array



def process_response(response):
    print(response)
    return "Processed " + response


def main():
    ray.init()
    responses = ray.get([generate_response.remote(f"request {i}") for i in range(4)])

    # Better approach: Set the start method to "spawn"
    multiprocessing.set_start_method("spawn", force=True)

    with ProcessPoolExecutor(max_workers=4) as executor:
        future_to_task = {}
        for idx, response in enumerate(responses):
            future_to_task[executor.submit(process_response, response)] = idx

        for future in as_completed(future_to_task):
            idx = future_to_task[future]
            response_entry = future.result()
            print(f"Response {idx} processed: {response_entry}")

    ray.shutdown()


if __name__ == "__main__":
    main()
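The review comment above asks for generate_response to return a big numpy array. A minimal sketch of that variant, assuming numpy is available and that a result of this size is large enough for Ray to place it in the Plasma object store; the array shape is illustrative and not part of this PR:

import numpy as np

import ray


@ray.remote
def generate_response(request):
    # A multi-megabyte result is promoted to the Plasma object store rather
    # than being inlined in the task reply, which exercises the store sockets
    # this PR's error message is about.
    print("Generating response to", request)
    return np.zeros((1000, 1000), dtype=np.float64)  # roughly 8 MB


if __name__ == "__main__":
    ray.init()
    arrays = ray.get([generate_response.remote(f"request {i}") for i in range(4)])
    print([a.shape for a in arrays])
    ray.shutdown()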
24 changes: 24 additions & 0 deletions doc/source/ray-core/patterns/create-new-processes.rst
@@ -0,0 +1,24 @@
Anti-pattern: Forking new Processes in Application Code
Collaborator suggested change:
- Anti-pattern: Forking new Processes in Application Code
+ Anti-pattern: Forking new processes in tasks or actors

Collaborator (author): Discussed offline. "Application Code" can include drivers, tasks, and actors inside the Ray context, so the subject keeps "Application Code" and the doc clarifies what it covers.

========================================================

**Summary:** Don't fork new processes in application code. Instead, use "spawn" method
Collaborator suggested change:
- **Summary:** Don't fork new processes in application code. Instead, use "spawn" method
+ **Summary:** Don't fork new processes in tasks or actors. Instead, use "spawn" method

to start new processes or use Ray tasks and actors to parallelize your workload

Ray manages the lifecycle of processes for you. Ray Objects, Tasks, and
Actors manage sockets to communicate with the Raylet and the GCS. If you fork new
processes in your application code, the processes could share the same sockets without
any synchronization. This can lead to corrupted messages and unexpected
behavior.

The solution is to either:

1. use the "spawn" method to start new processes so that the parent process's
   memory space isn't copied to the child processes, or
2. use Ray tasks and actors to parallelize your workload and let Ray manage
   the lifecycle of the processes for you.

Code example
------------
.. literalinclude:: ../doc_code/anti_pattern_create_new_processes.py
    :language: python
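The doc_code example above shows option 1 (the "spawn" start method). As a rough sketch of option 2, letting Ray itself manage the extra processes, the ProcessPoolExecutor can be replaced with a second remote function; the names mirror the example above, but this exact structure is illustrative and not part of this PR:

import ray


@ray.remote
def generate_response(request):
    return "Response to " + request


@ray.remote
def process_response(response):
    # Ray starts and manages the worker process that runs this task, so the
    # application code never forks processes itself.
    return "Processed " + response


if __name__ == "__main__":
    ray.init()
    responses = [generate_response.remote(f"request {i}") for i in range(4)]
    # ObjectRefs can be passed straight into downstream tasks; Ray resolves
    # them to their values before process_response runs.
    processed = ray.get([process_response.remote(r) for r in responses])
    print(processed)
    ray.shutdown()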

1 change: 1 addition & 0 deletions doc/source/ray-core/patterns/index.rst
@@ -27,3 +27,4 @@ This section is a collection of common design patterns and anti-patterns for wri
closure-capture-large-objects
global-variables
out-of-band-object-ref-serialization
create-new-processes
Collaborator suggested change:
- create-new-processes
+ fork-new-processes

8 changes: 8 additions & 0 deletions src/ray/object_manager/plasma/common.h
@@ -46,6 +46,14 @@ enum class ObjectState : int {
PLASMA_SEALED = 2,
};

inline constexpr std::string_view kCorruptedRequestErrorMessage =
"This could be due to "
"process forking in core worker or driver code which results in multiple processes "
"sharing the same Plasma store socket. Please ensure that there are no "
"process forking in any of the application core worker or driver code. Follow the "
"link here to learn more about the issue and how to fix it: "
"https://docs.ray.io/en/latest/ray-core/patterns/create-new-processes.html";
Collaborator suggested change:
- "https://docs.ray.io/en/latest/ray-core/patterns/create-new-processes.html";
+ "https://docs.ray.io/en/latest/ray-core/patterns/fork-new-processes.html";


// Represents a chunk of allocated memory.
struct Allocation {
/// Pointer to the allocated memory.
26 changes: 26 additions & 0 deletions src/ray/object_manager/plasma/protocol.cc
@@ -34,6 +34,13 @@ using fb::PlasmaObjectSpec;

using flatbuffers::uoffset_t;

inline constexpr std::string_view kDebugString = "debug_string";
inline constexpr std::string_view kObjectId = "object_id";
inline constexpr std::string_view kObjectIds = "object_ids";
inline constexpr std::string_view kOwnerRayletId = "owner_raylet_id";
inline constexpr std::string_view kOwnerIpAddress = "owner_ip_address";
inline constexpr std::string_view kOwnerWorkerId = "owner_worker_id";

namespace internal {

static uint8_t non_null_filler;
@@ -182,6 +189,8 @@ Status ReadGetDebugStringReply(uint8_t *data, size_t size, std::string *debug_st
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetDebugStringReply>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
VerifyNotNullPtr(
message->debug_string(), kDebugString, MessageType::PlasmaGetDebugStringReply);
*debug_string = message->debug_string()->str();
return Status::OK();
}
@@ -234,10 +243,17 @@ void ReadCreateRequest(uint8_t *data,
object_info->is_mutable = message->is_mutable();
object_info->data_size = message->data_size();
object_info->metadata_size = message->metadata_size();
VerifyNotNullPtr(message->object_id(), kObjectId, MessageType::PlasmaCreateRequest);
object_info->object_id = ObjectID::FromBinary(message->object_id()->str());
VerifyNotNullPtr(
message->owner_raylet_id(), kOwnerRayletId, MessageType::PlasmaCreateRequest);
object_info->owner_raylet_id = NodeID::FromBinary(message->owner_raylet_id()->str());
VerifyNotNullPtr(
message->owner_ip_address(), kOwnerIpAddress, MessageType::PlasmaCreateRequest);
object_info->owner_ip_address = message->owner_ip_address()->str();
object_info->owner_port = message->owner_port();
VerifyNotNullPtr(
message->owner_worker_id(), kOwnerWorkerId, MessageType::PlasmaCreateRequest);
object_info->owner_worker_id = WorkerID::FromBinary(message->owner_worker_id()->str());
*source = message->source();
*device_num = message->device_num();
@@ -336,6 +352,7 @@ Status ReadAbortRequest(uint8_t *data, size_t size, ObjectID *object_id) {
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaAbortRequest>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
VerifyNotNullPtr(message->object_id(), kObjectId, MessageType::PlasmaAbortRequest);
*object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK();
}
@@ -366,6 +383,7 @@ Status ReadSealRequest(uint8_t *data, size_t size, ObjectID *object_id) {
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
VerifyNotNullPtr(message->object_id(), kObjectId, MessageType::PlasmaSealRequest);
*object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK();
}
@@ -405,6 +423,7 @@ Status ReadReleaseRequest(uint8_t *data,
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
VerifyNotNullPtr(message->object_id(), kObjectId, MessageType::PlasmaReleaseRequest);
*object_id = ObjectID::FromBinary(message->object_id()->str());
*may_unmap = message->may_unmap();
return Status::OK();
@@ -451,7 +470,10 @@ Status ReadDeleteRequest(uint8_t *data, size_t size, std::vector<ObjectID> *obje
RAY_DCHECK(object_ids);
auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
VerifyNotNullPtr(message->object_ids(), kObjectIds, MessageType::PlasmaDeleteRequest);
ToVector(*message, object_ids, [](const PlasmaDeleteRequest &request, int i) {
VerifyNotNullPtr(
request.object_ids()->Get(i), kObjectId, MessageType::PlasmaDeleteRequest);
return ObjectID::FromBinary(request.object_ids()->Get(i)->str());
});
return Status::OK();
@@ -505,6 +527,7 @@ Status ReadContainsRequest(uint8_t *data, size_t size, ObjectID *object_id) {
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaContainsRequest>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
VerifyNotNullPtr(message->object_id(), kObjectId, MessageType::PlasmaContainsRequest);
*object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK();
}
@@ -605,7 +628,10 @@ Status ReadGetRequest(uint8_t *data,
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetRequest>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
VerifyNotNullPtr(message->object_ids(), kObjectIds, MessageType::PlasmaGetRequest);
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
VerifyNotNullPtr(
message->object_ids()->Get(i), kObjectId, MessageType::PlasmaGetRequest);
auto object_id = message->object_ids()->Get(i)->str();
object_ids.push_back(ObjectID::FromBinary(object_id));
}
7 changes: 7 additions & 0 deletions src/ray/object_manager/plasma/protocol.h
@@ -49,6 +49,13 @@ bool VerifyFlatbuffer(T *object, uint8_t *data, size_t size) {
return object->Verify(verifier);
}

template <class T>
void VerifyNotNullPtr(T *object, std::string_view obj_name, MessageType msg_type) {
RAY_CHECK(object != nullptr) << "Corrupted " << EnumNameMessageType(msg_type)
<< " message: " << obj_name << " is null. "
<< kCorruptedRequestErrorMessage;
}

flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
ToFlatbuffer(flatbuffers::FlatBufferBuilder *fbb,
const ObjectID *object_ids,
2 changes: 2 additions & 0 deletions src/ray/object_manager/plasma/store.cc
@@ -484,6 +484,8 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
} break;
default:
// This code should be unreachable.
RAY_LOG(FATAL) << "Invalid Plasma message type. type=" << static_cast<long>(type)
<< ". " << kCorruptedRequestErrorMessage;
RAY_CHECK(0);
}
return Status::OK();