Skip to content

[V1][P/D] An native implementation of xPyD based on P2P NCCL #18242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 173 commits into from
Jun 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
173 commits
Select commit Hold shift + click to select a range
0a60364
runnable
Abatom Mar 31, 2025
016d004
format
Abatom Mar 31, 2025
448bed9
format
Abatom Mar 31, 2025
825fe06
format
Abatom Mar 31, 2025
477fe2b
format
Abatom Mar 31, 2025
dd6dcf9
pass
Abatom Mar 31, 2025
7eb1575
format
Abatom Mar 31, 2025
a0d37bb
format
Abatom Mar 31, 2025
da335ea
move some args to kv_connector_extra_config
Abatom Apr 1, 2025
178ca2f
format
Abatom Apr 1, 2025
f03ac47
remove some code comments
Abatom Apr 6, 2025
603a355
Replace pickle with msgpack
Abatom Apr 6, 2025
2acb321
fix bug
Abatom Apr 7, 2025
b957dd7
Out Of Memory
Abatom Apr 7, 2025
f6407b3
_send_sync
Abatom Apr 7, 2025
5c165d9
add p2p_nccl_connector.py based on V1
Abatom Apr 9, 2025
1e9fab6
add shape and size log for recv_tensor
Abatom Apr 9, 2025
33601e2
fix hang & oom
Abatom Apr 11, 2025
5a888fc
format
Abatom Apr 11, 2025
d3e7194
Merge branch 'main' into xpyd
Abatom Apr 11, 2025
bddc4e1
ping thread
Abatom Apr 12, 2025
e516a70
add code comments.
Abatom Apr 12, 2025
8dadfb4
GET
Abatom Apr 14, 2025
659ead7
format
Abatom Apr 14, 2025
fd596dc
send_queue
Abatom Apr 14, 2025
fe81aae
modify log
Abatom Apr 15, 2025
bb5d23f
Merge branch 'main' into xpyd
Abatom Apr 15, 2025
21818fe
fix bug for PUT_ASYNC
Abatom Apr 15, 2025
dcb637b
modify log
Abatom Apr 15, 2025
5cbb299
format
Abatom Apr 15, 2025
976f51b
format
Abatom Apr 15, 2025
a2470ac
fix bug
Abatom Apr 15, 2025
b0facb9
format
Abatom Apr 15, 2025
20f2e7a
fix bug
Abatom Apr 16, 2025
8fd3eca
Merge branch 'main' into xpyd
Abatom Apr 17, 2025
f1a5183
fix bug
Abatom Apr 17, 2025
1c74857
format
Abatom Apr 17, 2025
41b0ae6
rm popitem
Abatom Apr 17, 2025
ec364e5
rm disagg_prefill_xpyd.sh
Abatom Apr 17, 2025
06281db
Merge branch 'main' into xpyd
Abatom Apr 18, 2025
ed2fbb6
V1
Abatom Apr 18, 2025
616ce48
bugfix and format
Abatom Apr 18, 2025
49336a2
bugfix
Abatom Apr 18, 2025
3d8d7b6
bugfix
Abatom Apr 18, 2025
0b9a2ac
format
Abatom Apr 18, 2025
e3f858f
add rank and local_rank
Abatom Apr 19, 2025
6e088e6
Merge branch 'main' into xpyd
Abatom Apr 22, 2025
e8b8f36
bugfix
Abatom Apr 22, 2025
8623e3c
runnable for V1
Abatom Apr 22, 2025
d11388a
format
Abatom Apr 22, 2025
17e9905
rm valid_num_tokens
Abatom Apr 23, 2025
e13094b
wait_for_save
Abatom Apr 23, 2025
d96ecc3
inject_kv_into_layer
Abatom Apr 23, 2025
eaaf50c
get_num_new_matched_tokens
Abatom Apr 24, 2025
6a2af6c
make_meta
Abatom Apr 24, 2025
7d0f562
format
Abatom Apr 24, 2025
ca9724a
add send_stream and recv_stream
Abatom Apr 27, 2025
738c14c
Merge branch 'main' into xpyd
Abatom Apr 27, 2025
8d41359
Each NCCL connects to a stream.
Abatom Apr 28, 2025
1ad9579
bugfix for GET and revert Each NCCL connects to a stream.
Abatom Apr 28, 2025
13fa8b6
add mem pool
Abatom Apr 30, 2025
d715d6b
improve mempool
Abatom May 6, 2025
3259540
bugfix
Abatom May 7, 2025
f525001
torch.cuda.Event
Abatom May 7, 2025
1c98a49
Merge branch 'main' into xpyd
Abatom May 8, 2025
ac810f3
Merge branch 'xpyd' into xpyd-mempool
Abatom May 8, 2025
28ef7cc
load_stream.synchronize and store_stream.synchronize
Abatom May 10, 2025
fab1d33
stream.synchronize
Abatom May 10, 2025
2400d0b
build_connector_meta
Abatom May 11, 2025
6eab9df
bugfix
Abatom May 12, 2025
acfa2ac
bugfix
Abatom May 12, 2025
bb596f7
proxy add round robin
Abatom May 12, 2025
7b710f4
mem_pool_size
Abatom May 14, 2025
942e1d5
add tensor_mem_pool
Abatom May 14, 2025
49387ff
rm v0
Abatom May 16, 2025
ac944be
Merge branch 'main' into xpyd-v1
Abatom May 16, 2025
80e26b9
rename
Abatom May 16, 2025
a9c5674
format
Abatom May 16, 2025
3c67c9e
bugfix
Abatom May 16, 2025
b6b52d2
format
Abatom May 16, 2025
540ab4e
format
Abatom May 16, 2025
a0b23a6
format
Abatom May 16, 2025
92c2ab8
format
Abatom May 16, 2025
6ffde23
Merge branch 'main' into xpyd-v1
Abatom May 19, 2025
25d0557
Merge branch 'main' into xpyd-v1
Abatom May 20, 2025
ccba1d6
get_num_new_matched_tokens
Abatom May 20, 2025
2840981
bugfix for undefined symbol
Abatom May 21, 2025
db544da
store_tensor and load_tensor
Abatom May 21, 2025
bb3b617
add !
Abatom May 21, 2025
9665cd2
add tensor_store_load_mem.cu to CMakeLists.txt
Abatom May 21, 2025
c792a55
bugfix for CMakeLists.txt
Abatom May 21, 2025
d02ecab
bugfix for cmake
Abatom May 21, 2025
d33a404
#include <Python.h>
Abatom May 21, 2025
2c49965
fix make error
Abatom May 21, 2025
ed94529
fix make error
Abatom May 21, 2025
b9ac677
format
Abatom May 21, 2025
684d070
bugfix & format
Abatom May 21, 2025
152b140
float(self.config.kv_buffer_size)
Abatom May 21, 2025
1ba5228
add log
Abatom May 21, 2025
65e5f43
format
Abatom May 21, 2025
28aa693
change - to #
Abatom May 22, 2025
4661dc5
Merge branch 'main' into xpyd-v1
Abatom May 22, 2025
8512639
add get_finished & request_finished
Abatom May 22, 2025
b7d7375
format
Abatom May 22, 2025
ad815ac
bugfix for chunked prefill
Abatom May 23, 2025
1db3729
format
Abatom May 23, 2025
1caf937
format & debug log
Abatom May 23, 2025
0faff05
Merge branch 'main' into xpyd-v1
Abatom May 24, 2025
b1937e9
bugfix for
Abatom May 24, 2025
fa2f130
1/N preemption
Abatom May 24, 2025
4f79476
Clear the buffer upon request completion(1/N)
Abatom May 24, 2025
e41d815
Clear the buffer upon request completion(2/N)
Abatom May 24, 2025
eefd267
get_finished
Abatom May 26, 2025
1a0bcba
bugfix for get_finished
Abatom May 26, 2025
f5876c8
bugfix for get_finished and format
Abatom May 26, 2025
1d37166
format
Abatom May 26, 2025
1b29352
format
Abatom May 26, 2025
43ed2ad
Support release KV Cache after sending is completed
Abatom May 26, 2025
38ae6c3
Clear the buffer upon request completion(3/N)
Abatom May 26, 2025
bcd3dd9
Fix the issue of inaccurate results caused by preemption
Abatom May 28, 2025
4b15e27
format
Abatom May 28, 2025
bcc608f
format
Abatom May 28, 2025
5c96490
format
Abatom May 28, 2025
af150dd
format
Abatom May 28, 2025
a7ccdca
bugfix for KVCacheManager.get_block_ids
Abatom May 28, 2025
4707612
get_block_ids
Abatom May 29, 2025
066b347
use copy_
Abatom May 29, 2025
e384e7d
format
Abatom May 29, 2025
5cc9686
remove tensor_store_load_mem.cu
Abatom May 29, 2025
6825ed7
remove tensor_store_load_mem.cu
Abatom May 29, 2025
a805c42
format
Abatom May 29, 2025
afa7552
Misc
Abatom May 29, 2025
47ee59a
Misc
Abatom May 29, 2025
135e906
format
Abatom May 29, 2025
dcc36fb
Merge branch 'main' into xpyd-v1
Abatom May 30, 2025
8af556a
get_finished
Abatom May 31, 2025
5c1c552
get_finished
Abatom May 31, 2025
18d40b1
use get_world_group().local_rank
Abatom Jun 1, 2025
3003a10
modify log
Abatom Jun 1, 2025
c71b97f
bugfix
Abatom Jun 1, 2025
5e20f3e
Merge pull request #8 from Abatom/xpyd-v1-rank
Abatom Jun 1, 2025
f6460c3
SharedStorageConnector.__init__
Abatom Jun 1, 2025
bc5872b
get_forward_context()
Abatom Jun 2, 2025
4e36203
forward_context
Abatom Jun 2, 2025
631445a
Merge branch 'main' into xpyd-v1
Abatom Jun 3, 2025
92cffaf
Merge branch 'vllm-project:main' into xpyd-v1
Abatom Jun 3, 2025
bbf257c
clean code
Abatom Jun 4, 2025
6b5547d
format
Abatom Jun 4, 2025
37c5374
set_p2p_nccl_context
Abatom Jun 4, 2025
1a2ffe5
mem_pool_size_gb(32)
Abatom Jun 7, 2025
b060014
Merge branch 'main' into xpyd-v1
Abatom Jun 7, 2025
17a83f9
nccl_num_chennels=8
Abatom Jun 13, 2025
b4416a4
MemoryError->ValueError
Abatom Jun 13, 2025
a3e7337
log level
Abatom Jun 13, 2025
ad7f14a
TensorMemoryPool
Abatom Jun 13, 2025
3c30c02
format
Abatom Jun 13, 2025
babdaa2
format
Abatom Jun 13, 2025
fea91ab
add the docstring for TensorMemoryPool
Abatom Jun 13, 2025
a33e149
format
Abatom Jun 13, 2025
c8558d3
num_chennels->num_channels
Abatom Jun 13, 2025
58eeac7
format
Abatom Jun 13, 2025
d063d89
Only symmetric TP is supported
Abatom Jun 13, 2025
5118c06
format
Abatom Jun 13, 2025
fd38521
add doc
Abatom Jun 13, 2025
4fd68a9
doc
Abatom Jun 13, 2025
95bd8c8
clean up lots of logs
Abatom Jun 16, 2025
b96369f
log level
Abatom Jun 16, 2025
938dbdb
update md
Abatom Jun 16, 2025
2dea3d0
doc
Abatom Jun 17, 2025
c7210ea
Add Detailed Design
Abatom Jun 18, 2025
cad61b8
Add Test data
Abatom Jun 18, 2025
4fd955a
format
Abatom Jun 18, 2025
9d52eea
format
Abatom Jun 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
337 changes: 337 additions & 0 deletions docs/design/v1/p2p_nccl_connector.md

Large diffs are not rendered by default.

154 changes: 154 additions & 0 deletions examples/online_serving/disagg_xpyd/disagg_prefill_proxy_xpyd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# SPDX-License-Identifier: Apache-2.0

import os
import socket
import threading
import uuid

import aiohttp
import msgpack
import zmq
from quart import Quart, make_response, request

count = 0
prefill_instances: dict[str, str] = {} # http_address: zmq_address
decode_instances: dict[str, str] = {} # http_address: zmq_address

prefill_cv = threading.Condition()
decode_cv = threading.Condition()


def _listen_for_register(poller, router_socket):
while True:
socks = dict(poller.poll())
if router_socket in socks:
remote_address, message = router_socket.recv_multipart()
# data: {"type": "P", "http_address": "ip:port",
# "zmq_address": "ip:port"}
data = msgpack.loads(message)
if data["type"] == "P":
global prefill_instances
global prefill_cv
with prefill_cv:
prefill_instances[data["http_address"]] = data["zmq_address"]
elif data["type"] == "D":
global decode_instances
global decode_cv
with decode_cv:
decode_instances[data["http_address"]] = data["zmq_address"]
else:
print(
"Unexpected, Received message from %s, data: %s",
remote_address,
data,
)


def start_service_discovery(hostname, port):
if not hostname:
hostname = socket.gethostname()
if port == 0:
raise ValueError("Port cannot be 0")

context = zmq.Context()
router_socket = context.socket(zmq.ROUTER)
router_socket.bind(f"tcp://{hostname}:{port}")

poller = zmq.Poller()
poller.register(router_socket, zmq.POLLIN)

_listener_thread = threading.Thread(
target=_listen_for_register, args=[poller, router_socket], daemon=True
)
_listener_thread.start()
return _listener_thread


AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)

app = Quart(__name__)


def random_uuid() -> str:
return str(uuid.uuid4().hex)


async def forward_request(url, data, request_id):
async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
headers = {
"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
"X-Request-Id": request_id,
}
async with session.post(url=url, json=data, headers=headers) as response:
if response.status == 200:
if True:
async for chunk_bytes in response.content.iter_chunked(1024):
yield chunk_bytes
else:
content = await response.read()
yield content


@app.route("/v1/completions", methods=["POST"])
async def handle_request():
try:
original_request_data = await request.get_json()

prefill_request = original_request_data.copy()
# change max_tokens = 1 to let it only do prefill
prefill_request["max_tokens"] = 1

global count
global prefill_instances
global prefill_cv
with prefill_cv:
prefill_list = list(prefill_instances.items())
prefill_addr, prefill_zmq_addr = prefill_list[count % len(prefill_list)]

global decode_instances
global decode_cv
with decode_cv:
decode_list = list(decode_instances.items())
decode_addr, decode_zmq_addr = decode_list[count % len(decode_list)]

print(
f"handle_request count: {count}, [HTTP:{prefill_addr}, "
f"ZMQ:{prefill_zmq_addr}] 👉 [HTTP:{decode_addr}, "
f"ZMQ:{decode_zmq_addr}]"
)
count += 1

request_id = (
f"___prefill_addr_{prefill_zmq_addr}___decode_addr_"
f"{decode_zmq_addr}_{random_uuid()}"
)

# finish prefill
async for _ in forward_request(
f"http://{prefill_addr}/v1/completions", prefill_request, request_id
):
continue

# return decode
generator = forward_request(
f"http://{decode_addr}/v1/completions", original_request_data, request_id
)
response = await make_response(generator)
response.timeout = None

return response

except Exception as e:
import sys
import traceback

exc_info = sys.exc_info()
print("Error occurred in disagg prefill proxy server")
print(e)
print("".join(traceback.format_exception(*exc_info)))


if __name__ == "__main__":
t = start_service_discovery("0.0.0.0", 30001)
app.run(host="0.0.0.0", port=10001)
t.join()
8 changes: 8 additions & 0 deletions vllm/distributed/device_communicators/pynccl_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ def ncclGetUniqueId(self) -> ncclUniqueId:
ctypes.byref(unique_id)))
return unique_id

def unique_id_from_bytes(self, data: bytes) -> ncclUniqueId:
if len(data) != 128:
raise ValueError(
f"Expected 128 bytes for ncclUniqueId, got {len(data)} bytes")
unique_id = ncclUniqueId()
ctypes.memmove(ctypes.addressof(unique_id.internal), data, 128)
return unique_id

def ncclCommInitRank(self, world_size: int, unique_id: ncclUniqueId,
rank: int) -> ncclComm_t:
comm = ncclComm_t()
Expand Down
5 changes: 5 additions & 0 deletions vllm/distributed/kv_transfer/kv_connector/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ def create_connector_v1(
"vllm.distributed.kv_transfer.kv_connector.v1.shared_storage_connector",
"SharedStorageConnector")

KVConnectorFactory.register_connector(
"P2pNcclConnector",
"vllm.distributed.kv_transfer.kv_connector.v1.p2p.p2p_nccl_connector",
"P2pNcclConnector")

KVConnectorFactory.register_connector(
"LMCacheConnectorV1",
"vllm.distributed.kv_transfer.kv_connector.v1.lmcache_connector",
Expand Down
Empty file.
Loading