[V1][P/D] An native implementation of xPyD based on P2P NCCL #18242
Merged
+1,780
−0
173 commits
0a60364 runnable (Abatom)
016d004 format (Abatom)
448bed9 format (Abatom)
825fe06 format (Abatom)
477fe2b format (Abatom)
dd6dcf9 pass (Abatom)
7eb1575 format (Abatom)
a0d37bb format (Abatom)
da335ea move some args to kv_connector_extra_config (Abatom)
178ca2f format (Abatom)
f03ac47 remove some code comments (Abatom)
603a355 Replace pickle with msgpack (Abatom)
2acb321 fix bug (Abatom)
b957dd7 Out Of Memory (Abatom)
f6407b3 _send_sync (Abatom)
5c165d9 add p2p_nccl_connector.py based on V1 (Abatom)
1e9fab6 add shape and size log for recv_tensor (Abatom)
33601e2 fix hang & oom (Abatom)
5a888fc format (Abatom)
d3e7194 Merge branch 'main' into xpyd (Abatom)
bddc4e1 ping thread (Abatom)
e516a70 add code comments. (Abatom)
8dadfb4 GET (Abatom)
659ead7 format (Abatom)
fd596dc send_queue (Abatom)
fe81aae modify log (Abatom)
bb5d23f Merge branch 'main' into xpyd (Abatom)
21818fe fix bug for PUT_ASYNC (Abatom)
dcb637b modify log (Abatom)
5cbb299 format (Abatom)
976f51b format (Abatom)
a2470ac fix bug (Abatom)
b0facb9 format (Abatom)
20f2e7a fix bug (Abatom)
8fd3eca Merge branch 'main' into xpyd (Abatom)
f1a5183 fix bug (Abatom)
1c74857 format (Abatom)
41b0ae6 rm popitem (Abatom)
ec364e5 rm disagg_prefill_xpyd.sh (Abatom)
06281db Merge branch 'main' into xpyd (Abatom)
ed2fbb6 V1 (Abatom)
616ce48 bugfix and format (Abatom)
49336a2 bugfix (Abatom)
3d8d7b6 bugfix (Abatom)
0b9a2ac format (Abatom)
e3f858f add rank and local_rank (Abatom)
6e088e6 Merge branch 'main' into xpyd (Abatom)
e8b8f36 bugfix (Abatom)
8623e3c runnable for V1 (Abatom)
d11388a format (Abatom)
17e9905 rm valid_num_tokens (Abatom)
e13094b wait_for_save (Abatom)
d96ecc3 inject_kv_into_layer (Abatom)
eaaf50c get_num_new_matched_tokens (Abatom)
6a2af6c make_meta (Abatom)
7d0f562 format (Abatom)
ca9724a add send_stream and recv_stream (Abatom)
738c14c Merge branch 'main' into xpyd (Abatom)
8d41359 Each NCCL connects to a stream. (Abatom)
1ad9579 bugfix for GET and revert Each NCCL connects to a stream. (Abatom)
13fa8b6 add mem pool (Abatom)
d715d6b improve mempool (Abatom)
3259540 bugfix (Abatom)
f525001 torch.cuda.Event (Abatom)
1c98a49 Merge branch 'main' into xpyd (Abatom)
ac810f3 Merge branch 'xpyd' into xpyd-mempool (Abatom)
28ef7cc load_stream.synchronize and store_stream.synchronize (Abatom)
fab1d33 stream.synchronize (Abatom)
2400d0b build_connector_meta (Abatom)
6eab9df bugfix (Abatom)
acfa2ac bugfix (Abatom)
bb596f7 proxy add round robin (Abatom)
7b710f4 mem_pool_size (Abatom)
942e1d5 add tensor_mem_pool (Abatom)
49387ff rm v0 (Abatom)
ac944be Merge branch 'main' into xpyd-v1 (Abatom)
80e26b9 rename (Abatom)
a9c5674 format (Abatom)
3c67c9e bugfix (Abatom)
b6b52d2 format (Abatom)
540ab4e format (Abatom)
a0b23a6 format (Abatom)
92c2ab8 format (Abatom)
6ffde23 Merge branch 'main' into xpyd-v1 (Abatom)
25d0557 Merge branch 'main' into xpyd-v1 (Abatom)
ccba1d6 get_num_new_matched_tokens (Abatom)
2840981 bugfix for undefined symbol (Abatom)
db544da store_tensor and load_tensor (Abatom)
bb3b617 add ! (Abatom)
9665cd2 add tensor_store_load_mem.cu to CMakeLists.txt (Abatom)
c792a55 bugfix for CMakeLists.txt (Abatom)
d02ecab bugfix for cmake (Abatom)
d33a404 #include <Python.h> (Abatom)
2c49965 fix make error (Abatom)
ed94529 fix make error (Abatom)
b9ac677 format (Abatom)
684d070 bugfix & format (Abatom)
152b140 float(self.config.kv_buffer_size) (Abatom)
1ba5228 add log (Abatom)
65e5f43 format (Abatom)
28aa693 change - to # (Abatom)
4661dc5 Merge branch 'main' into xpyd-v1 (Abatom)
8512639 add get_finished & request_finished (Abatom)
b7d7375 format (Abatom)
ad815ac bugfix for chunked prefill (Abatom)
1db3729 format (Abatom)
1caf937 format & debug log (Abatom)
0faff05 Merge branch 'main' into xpyd-v1 (Abatom)
b1937e9 bugfix for (Abatom)
fa2f130 1/N preemption (Abatom)
4f79476 Clear the buffer upon request completion(1/N) (Abatom)
e41d815 Clear the buffer upon request completion(2/N) (Abatom)
eefd267 get_finished (Abatom)
1a0bcba bugfix for get_finished (Abatom)
f5876c8 bugfix for get_finished and format (Abatom)
1d37166 format (Abatom)
1b29352 format (Abatom)
43ed2ad Support release KV Cache after sending is completed (Abatom)
38ae6c3 Clear the buffer upon request completion(3/N) (Abatom)
bcd3dd9 Fix the issue of inaccurate results caused by preemption (Abatom)
4b15e27 format (Abatom)
bcc608f format (Abatom)
5c96490 format (Abatom)
af150dd format (Abatom)
a7ccdca bugfix for KVCacheManager.get_block_ids (Abatom)
4707612 get_block_ids (Abatom)
066b347 use copy_ (Abatom)
e384e7d format (Abatom)
5cc9686 remove tensor_store_load_mem.cu (Abatom)
6825ed7 remove tensor_store_load_mem.cu (Abatom)
a805c42 format (Abatom)
afa7552 Misc (Abatom)
47ee59a Misc (Abatom)
135e906 format (Abatom)
dcc36fb Merge branch 'main' into xpyd-v1 (Abatom)
8af556a get_finished (Abatom)
5c1c552 get_finished (Abatom)
18d40b1 use get_world_group().local_rank (Abatom)
3003a10 modify log (Abatom)
c71b97f bugfix (Abatom)
5e20f3e Merge pull request #8 from Abatom/xpyd-v1-rank (Abatom)
f6460c3 SharedStorageConnector.__init__ (Abatom)
bc5872b get_forward_context() (Abatom)
4e36203 forward_context (Abatom)
631445a Merge branch 'main' into xpyd-v1 (Abatom)
92cffaf Merge branch 'vllm-project:main' into xpyd-v1 (Abatom)
bbf257c clean code (Abatom)
6b5547d format (Abatom)
37c5374 set_p2p_nccl_context (Abatom)
1a2ffe5 mem_pool_size_gb(32) (Abatom)
b060014 Merge branch 'main' into xpyd-v1 (Abatom)
17a83f9 nccl_num_chennels=8 (Abatom)
b4416a4 MemoryError->ValueError (Abatom)
a3e7337 log level (Abatom)
ad7f14a TensorMemoryPool (Abatom)
3c30c02 format (Abatom)
babdaa2 format (Abatom)
fea91ab add the docstring for TensorMemoryPool (Abatom)
a33e149 format (Abatom)
c8558d3 num_chennels->num_channels (Abatom)
58eeac7 format (Abatom)
d063d89 Only symmetric TP is supported (Abatom)
5118c06 format (Abatom)
fd38521 add doc (Abatom)
4fd68a9 doc (Abatom)
95bd8c8 clean up lots of logs (Abatom)
b96369f log level (Abatom)
938dbdb update md (Abatom)
2dea3d0 doc (Abatom)
c7210ea Add Detailed Design (Abatom)
cad61b8 Add Test data (Abatom)
4fd955a format (Abatom)
9d52eea format (Abatom)
154 changes: 154 additions & 0 deletions
examples/online_serving/disagg_xpyd/disagg_prefill_proxy_xpyd.py

    # SPDX-License-Identifier: Apache-2.0

    import os
    import socket
    import threading
    import uuid

    import aiohttp
    import msgpack
    import zmq
    from quart import Quart, make_response, request

    count = 0
    prefill_instances: dict[str, str] = {}  # http_address: zmq_address
    decode_instances: dict[str, str] = {}  # http_address: zmq_address

    prefill_cv = threading.Condition()
    decode_cv = threading.Condition()


    def _listen_for_register(poller, router_socket):
        while True:
            socks = dict(poller.poll())
            if router_socket in socks:
                remote_address, message = router_socket.recv_multipart()
                # data: {"type": "P", "http_address": "ip:port",
                #        "zmq_address": "ip:port"}
                data = msgpack.loads(message)
                if data["type"] == "P":
                    global prefill_instances
                    global prefill_cv
                    with prefill_cv:
                        prefill_instances[data["http_address"]] = data["zmq_address"]
                elif data["type"] == "D":
                    global decode_instances
                    global decode_cv
                    with decode_cv:
                        decode_instances[data["http_address"]] = data["zmq_address"]
                else:
                    print(
                        f"Unexpected, Received message from {remote_address}, "
                        f"data: {data}"
                    )


    def start_service_discovery(hostname, port):
        if not hostname:
            hostname = socket.gethostname()
        if port == 0:
            raise ValueError("Port cannot be 0")

        context = zmq.Context()
        router_socket = context.socket(zmq.ROUTER)
        router_socket.bind(f"tcp://{hostname}:{port}")

        poller = zmq.Poller()
        poller.register(router_socket, zmq.POLLIN)

        _listener_thread = threading.Thread(
            target=_listen_for_register, args=[poller, router_socket], daemon=True
        )
        _listener_thread.start()
        return _listener_thread


    AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)

    app = Quart(__name__)


    def random_uuid() -> str:
        return str(uuid.uuid4().hex)


    async def forward_request(url, data, request_id):
        async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
            headers = {
                "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
                "X-Request-Id": request_id,
            }
            async with session.post(url=url, json=data, headers=headers) as response:
                if response.status == 200:
                    if True:
                        async for chunk_bytes in response.content.iter_chunked(1024):
                            yield chunk_bytes
                    else:
                        content = await response.read()
                        yield content


    @app.route("/v1/completions", methods=["POST"])
    async def handle_request():
        try:
            original_request_data = await request.get_json()

            prefill_request = original_request_data.copy()
            # change max_tokens = 1 to let it only do prefill
            prefill_request["max_tokens"] = 1

            global count
            global prefill_instances
            global prefill_cv
            with prefill_cv:
                prefill_list = list(prefill_instances.items())
                prefill_addr, prefill_zmq_addr = prefill_list[count % len(prefill_list)]

            global decode_instances
            global decode_cv
            with decode_cv:
                decode_list = list(decode_instances.items())
                decode_addr, decode_zmq_addr = decode_list[count % len(decode_list)]

            print(
                f"handle_request count: {count}, [HTTP:{prefill_addr}, "
                f"ZMQ:{prefill_zmq_addr}] 👉 [HTTP:{decode_addr}, "
                f"ZMQ:{decode_zmq_addr}]"
            )
            count += 1

            request_id = (
                f"___prefill_addr_{prefill_zmq_addr}___decode_addr_"
                f"{decode_zmq_addr}_{random_uuid()}"
            )

            # finish prefill
            async for _ in forward_request(
                f"http://{prefill_addr}/v1/completions", prefill_request, request_id
            ):
                continue

            # return decode
            generator = forward_request(
                f"http://{decode_addr}/v1/completions", original_request_data, request_id
            )
            response = await make_response(generator)
            response.timeout = None

            return response

        except Exception as e:
            import sys
            import traceback

            exc_info = sys.exc_info()
            print("Error occurred in disagg prefill proxy server")
            print(e)
            print("".join(traceback.format_exception(*exc_info)))


    if __name__ == "__main__":
        t = start_service_discovery("0.0.0.0", 30001)
        app.run(host="0.0.0.0", port=10001)
        t.join()
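
The proxy above pairs one prefill instance with one decode instance by round robin and packs both ZMQ addresses into the `X-Request-Id` header. The sketch below isolates those two steps using only the standard library. The helper names (`pick_round_robin`, `build_request_id`, `parse_request_id`) and the sample addresses are hypothetical and do not appear in the PR; only the request-ID layout is taken from `handle_request`. How the receiving side actually decodes the ID is not shown in this file, so the regular expression is an assumption about one way it could be done.

```python
import re
import uuid

# Assumed pattern: mirrors the layout built in handle_request, i.e.
# ___prefill_addr_<zmq>___decode_addr_<zmq>_<32 hex chars from uuid4().hex>
_REQUEST_ID_RE = re.compile(
    r"^___prefill_addr_(?P<prefill>.+)___decode_addr_(?P<decode>.+)"
    r"_(?P<uid>[0-9a-f]{32})$"
)


def pick_round_robin(instances: dict[str, str], counter: int) -> tuple[str, str]:
    """Return the (http_address, zmq_address) pair selected by the counter."""
    if not instances:
        # The proxy computes `count % len(...)`, which raises ZeroDivisionError
        # when nothing has registered yet; this sketch fails with a clearer error.
        raise RuntimeError("no instances registered")
    items = list(instances.items())
    return items[counter % len(items)]


def build_request_id(prefill_zmq_addr: str, decode_zmq_addr: str) -> str:
    """Build a request ID with the same layout as the proxy."""
    return (
        f"___prefill_addr_{prefill_zmq_addr}___decode_addr_"
        f"{decode_zmq_addr}_{uuid.uuid4().hex}"
    )


def parse_request_id(request_id: str) -> tuple[str, str]:
    """Recover (prefill_zmq_addr, decode_zmq_addr) from a request ID."""
    match = _REQUEST_ID_RE.match(request_id)
    if match is None:
        raise ValueError(f"unrecognized request id: {request_id!r}")
    return match.group("prefill"), match.group("decode")


# Usage with made-up registry contents (http_address -> zmq_address).
prefill = {"10.0.0.1:20005": "10.0.0.1:21001"}
decode = {
    "10.0.0.2:20009": "10.0.0.2:22001",
    "10.0.0.3:20009": "10.0.0.3:22001",
}
_, p_zmq = pick_round_robin(prefill, 3)
_, d_zmq = pick_round_robin(decode, 3)
rid = build_request_id(p_zmq, d_zmq)
assert parse_request_id(rid) == (p_zmq, d_zmq)
```

Because a single counter indexes both registries, the prefill and decode choices advance together; with unequal pool sizes the pairing still cycles through every instance on each side.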