-
-
Notifications
You must be signed in to change notification settings - Fork 5.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core][Distributed] add shm broadcast #5399
Merged
Merged
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit
Hold shift + click to select a range
c82dc08
add shm broadcast
youkaichao aa15dc2
fix name
youkaichao 7d4f81e
Merge branch 'main' into shm_broadcast
youkaichao b4e5d47
enable shm broadcast
youkaichao aa606d1
use HIGHEST_PROTOCOL
youkaichao 943147c
add modulus
youkaichao 2f2e7b8
error on large object
youkaichao 399f8c1
name written flag
youkaichao a7db4d5
rename to data and metadata
youkaichao a105688
add sleep if all blocks are empty
youkaichao d09c8b6
bump up slots
youkaichao ba6839d
only memset for metadata section
youkaichao a298ae9
add comments
youkaichao 2c775d0
remove initialization in world size 1
youkaichao b8105bb
add comments
youkaichao 681919a
add warning if waiting for too long
youkaichao 468bf93
add shm broadcast tests
youkaichao c5e47b3
lint
youkaichao 98188ce
add tests
youkaichao 8e755f2
Update vllm/distributed/device_communicators/shm_broadcast.py
youkaichao 5197920
Merge branch 'main' into shm_broadcast
youkaichao 398c6e2
Merge branch 'main' into shm_broadcast
youkaichao 57a1839
Merge branch 'main' into shm_broadcast
youkaichao 95b8a87
use underscore for private attributes
youkaichao c0cc37f
rename
youkaichao fc49f86
add mem layout docstring
youkaichao 34475a0
stash
youkaichao 82792f1
Merge branch 'main' into shm_broadcast
youkaichao 4b70d6f
refactor
youkaichao bb851d4
use queue
youkaichao f7680f4
fix lint
youkaichao 9af386c
add single process test
youkaichao 729a592
fix warning
youkaichao 0a61a69
add barrier
youkaichao d8d9a0f
add test for complicated cases
youkaichao 608d57f
fix tests
youkaichao e5137cb
fix tests
youkaichao d0aa190
add comments
youkaichao d0522b0
fix race condition
youkaichao cd39b81
add random delay in test
youkaichao d0f77e9
add comments
youkaichao 5fd104e
use env var
youkaichao 0e3a810
fix env
youkaichao File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
add shm broadcast tests
- Loading branch information
commit 468bf93b6fc073fc41082220ce567e4e77047d70
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from vllm.distributed.device_communicators.shm_broadcast import ShmRingBuffer | ||
import torch.distributed as dist | ||
import multiprocessing | ||
|
||
def distributed_run(fn, world_size): | ||
number_of_processes = world_size | ||
processes = [] | ||
for i in range(number_of_processes): | ||
env = {} | ||
env['RANK'] = str(i) | ||
env['LOCAL_RANK'] = str(i) | ||
env['WORLD_SIZE'] = str(number_of_processes) | ||
env['LOCAL_WORLD_SIZE'] = str(number_of_processes) | ||
env['MASTER_ADDR'] = 'localhost' | ||
env['MASTER_PORT'] = '12345' | ||
p = multiprocessing.Process(target=fn, args=(env, )) | ||
processes.append(p) | ||
p.start() | ||
|
||
for p in processes: | ||
p.join() | ||
|
||
for p in processes: | ||
assert p.exitcode == 0 | ||
|
||
def worker_fn_wrapper(fn): | ||
# `multiprocessing.Process` cannot accept environment variables directly | ||
# so we need to pass the environment variables as arguments | ||
# and update the environment variables in the function | ||
def wrapped_fn(env): | ||
update_environment_variables(env) | ||
dist.init_process_group(backend="gloo") | ||
fn() | ||
|
||
return wrapped_fn | ||
|
||
@worker_fn_wrapper | ||
def worker_fn(): | ||
broadcaster = ShmRingBuffer(dist.group.WORLD, 1024, 1) | ||
if dist.get_rank() == 0: | ||
broadcaster.broadcast_object(0) | ||
broadcaster.broadcast_object(1) | ||
else: | ||
a = broadcaster.broadcast_object(None) | ||
b = broadcaster.broadcast_object(None) | ||
assert a == 0 | ||
assert b == 1 | ||
|
||
def test_shm_broadcast(): | ||
distributed_run(worker_fn, 4) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this create side effects to rest of the tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it is local to the process created in this test.