-
Notifications
You must be signed in to change notification settings - Fork 6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core] Update Error Message and Anti-Pattern for the Case of Forking New Processes in Worker Processes #50705
Changes from 9 commits
d9db3be
34e33b8
741f6b4
3ba379d
19487f7
697dcae
7cb0dab
3843902
7c50d6d
d6005f5
f7162b5
c592821
1d6352c
562ed14
d338f31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Raylet |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import ray | ||
from concurrent.futures import ProcessPoolExecutor, as_completed | ||
import multiprocessing | ||
|
||
|
||
@ray.remote | ||
def generate_response(request): | ||
return "Response to " + request | ||
|
||
|
||
def process_response(response): | ||
print(response) | ||
return "Processed " + response | ||
|
||
|
||
def main(): | ||
ray.init() | ||
responses = ray.get([generate_response.remote(f"request {i}") for i in range(4)]) | ||
|
||
# Better approach: Set the start method to "spawn" | ||
multiprocessing.set_start_method("spawn", force=True) | ||
|
||
with ProcessPoolExecutor(max_workers=4) as executor: | ||
future_to_task = {} | ||
for idx, response in enumerate(responses): | ||
future_to_task[executor.submit(process_response, response)] = idx | ||
|
||
for future in as_completed(future_to_task): | ||
idx = future_to_task[future] | ||
response_entry = future.result() | ||
print(f"Response {idx} processed: {response_entry}") | ||
|
||
ray.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,24 @@ | ||||||
Anti-pattern: Forking new Processes in Application Code | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed offline. "Application Code" can include drivers, tasks, actors that inside the ray context. So will keep the "Application Code" in the subject and clarify it in the doc. |
||||||
======================================================== | ||||||
|
||||||
**Summary:** Don't fork new processes in application code. Instead, use "spawn" method | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
to start new processes or use Ray tasks and actors to parallelize your workload | ||||||
|
||||||
Ray manages the lifecycle of processes for you. Ray Objects, Tasks, and | ||||||
Actors manages sockets to communicate with the Raylet and the GCS. If you fork new | ||||||
processes in your application code, the processes could share the same sockets without | ||||||
any synchronization. This can lead to corrupted message and unexpected | ||||||
behavior. | ||||||
|
||||||
The solution is to: | ||||||
1. use "spawn" method to start new processes so that parent process's | ||||||
memory space isn't copied to the child processes or | ||||||
2. use Ray tasks and | ||||||
actors to parallelize your workload and let Ray to manage the lifecycle of the | ||||||
processes for you. | ||||||
|
||||||
Code example | ||||||
------------ | ||||||
.. literalinclude:: ../doc_code/anti_pattern_create_new_processes.py | ||||||
:language: python | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -27,3 +27,4 @@ This section is a collection of common design patterns and anti-patterns for wri | |||||
closure-capture-large-objects | ||||||
global-variables | ||||||
out-of-band-object-ref-serialization | ||||||
create-new-processes | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -46,6 +46,14 @@ enum class ObjectState : int { | |||||
PLASMA_SEALED = 2, | ||||||
}; | ||||||
|
||||||
inline constexpr std::string_view kCorruptedRequestErrorMessage = | ||||||
"This could be due to " | ||||||
"process forking in core worker or driver code which results in multiple processes " | ||||||
"sharing the same Plasma store socket. Please ensure that there are no " | ||||||
"process forking in any of the application core worker or driver code. Follow the " | ||||||
"link here to learn more about the issue and how to fix it: " | ||||||
"https://docs.ray.io/en/latest/ray-core/patterns/create-new-processes.html"; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
// Represents a chunk of allocated memory. | ||||||
struct Allocation { | ||||||
/// Pointer to the allocated memory. | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return big numpy array