-
-
Notifications
You must be signed in to change notification settings - Fork 5.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ Frontend ] Multiprocessing for OpenAI Server with zeromq
#6883
Merged
simon-mo
merged 84 commits into
vllm-project:main
from
neuralmagic:isolate-oai-server-process
Aug 3, 2024
Merged
Changes from 48 commits
Commits
Show all changes
84 commits
Select commit
Hold shift + click to select a range
bed649a
:alembic: add backend proto file
joerunde 7de9d49
:recycle: move proto to grpc/pb
joerunde 9394a62
:sparkles: add proto compilation
joerunde dd8bf96
updated
robertgshaw2-redhat 5c7fbff
kinda working
robertgshaw2-redhat 952e8ef
:construction: more wip
joerunde e8eac95
fixed
robertgshaw2-redhat 938a843
:bug: fixup race condition
joerunde 2b8d7cd
:bug: remove timeout
joerunde ea02d39
format
robertgshaw2-redhat 4a2dc46
streaming
robertgshaw2-redhat 30f2bc9
removed breaks
robertgshaw2-redhat c718b68
pushing current state
robertgshaw2-redhat b3d25c6
:alembic: try unix sockets
joerunde 2765b17
:zap: no background loop
joerunde b219778
spurious change
robertgshaw2-redhat 932ea23
remove spurious change
robertgshaw2-redhat f029114
spurious changes
robertgshaw2-redhat 6854758
spurioous change
robertgshaw2-redhat 3b5ff66
:bug: whoops
joerunde 79247c3
:memo: log stuff
joerunde a39ebc0
stash
robertgshaw2-redhat ef257f1
pushing up
robertgshaw2-redhat a6c9bc5
stash
robertgshaw2-redhat d7490bc
actually working
robertgshaw2-redhat f68fd60
cleanup
robertgshaw2-redhat 38b5b9c
more cleanup
robertgshaw2-redhat bc54311
cleanup
robertgshaw2-redhat 3cccebb
stash
robertgshaw2-redhat 4b78e29
more cleanup
robertgshaw2-redhat 345bfdd
setup
robertgshaw2-redhat cfbb001
cleanup
robertgshaw2-redhat d811b42
format
robertgshaw2-redhat 852534e
cleaning up
robertgshaw2-redhat e42be96
zlib
robertgshaw2-redhat 5202a59
Revert "zlib"
robertgshaw2-redhat 71b1bf9
turn on chunked prefill
robertgshaw2-redhat a499079
move RPC code into oai server
robertgshaw2-redhat 88a1d08
format
robertgshaw2-redhat 13ce2f1
format
robertgshaw2-redhat bb8ac06
trying to flow it through
robertgshaw2-redhat 6ebdb3d
cleaning
robertgshaw2-redhat 24c8100
cleaning
robertgshaw2-redhat e707049
cleaning
robertgshaw2-redhat baaf6bc
add stubs
robertgshaw2-redhat 9d19d92
format
robertgshaw2-redhat f1be4b8
working with single launch...
robertgshaw2-redhat 8e417ad
working end to end - with some hacks
robertgshaw2-redhat 4c16c5e
:goal_net: handle shutdown and request errors
joerunde 6ddd4a7
:art: fmt and clean up shutdown handler
joerunde 6d7da74
:bug: fixup type hint for queue
joerunde 97ea04d
:sparkles: update chat endpoint
joerunde 6d753a4
:bug: fixup zmq constant types
joerunde 38e308e
:sparkles: hook up de/tokenize
joerunde ec19a7b
:recycle: add VLLMBackend protocol
joerunde 453939b
Frontend mp flag (#384)
joerunde 1f33286
Features / Cleanup for MP Frontend (#387)
robertgshaw2-redhat 5362952
Use random port for backend (#390)
joerunde 7214fb8
Await socket operations + some other minor cleanup (#391)
njhill 98a7dab
:sparkles: health check round 2 (#392)
joerunde f5f0b45
Add tokenizer (#394)
robertgshaw2-redhat 0b351c0
Socket context (#393)
joerunde 79fcc44
Logit bias (#395)
robertgshaw2-redhat 9da8c4a
Merge remote-tracking branch 'upstream/main' into isolate-oai-server-…
joerunde 4c65f74
:bug: messed up the revert in the merge commit :(
joerunde 9bc97f1
fix (#396)
robertgshaw2-redhat 68d8612
Merge remote-tracking branch 'upstream/main' into isolate-oai-server-…
joerunde 4337fe7
format
robertgshaw2-redhat 779d9bd
stash
robertgshaw2-redhat a6044a3
Fix failed tests (#398)
robertgshaw2-redhat 100189f
Merge branch 'main' into isolate-oai-server-process
robertgshaw2-redhat 0fc8545
fixed merge conflicts
robertgshaw2-redhat 6383091
updated
robertgshaw2-redhat a09f57f
cleaning
robertgshaw2-redhat 1bdbfcb
:white_check_mark: add test for multiprocessing flag (#399)
joerunde f3c0f1c
:sparkles: pipe tracing flag (#400)
joerunde 9c415ad
integration tests for old backend
robertgshaw2-redhat 62036ad
rename
robertgshaw2-redhat a177d87
cleaning
robertgshaw2-redhat 9ca3b93
ordering
robertgshaw2-redhat f8b5fb1
fix embedding model feedback
robertgshaw2-redhat fca5a71
Update vllm/entrypoints/openai/rpc/server.py
robertgshaw2-redhat 5f07f86
format
robertgshaw2-redhat bd0fd76
Merge branch 'main' into isolate-oai-server-process
robertgshaw2-redhat File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from dataclasses import dataclass | ||
from typing import Optional, Mapping | ||
from enum import Enum | ||
|
||
from vllm.inputs import PromptInputs | ||
from vllm.lora.request import LoRARequest | ||
from vllm.prompt_adapter.request import PromptAdapterRequest | ||
from vllm.sampling_params import SamplingParams | ||
|
||
VLLM_GENERATE_RPC_PATH = "tcp://localhost:5570" | ||
VLLM_GET_DATA_RPC_PATH = "tcp://localhost:5571" | ||
VLLM_IS_READY_RPC_PATH = "tcp://localhost:5572" | ||
|
||
|
||
@dataclass | ||
class GenerateRequest: | ||
inputs: PromptInputs | ||
sampling_params: SamplingParams | ||
request_id: str | ||
lora_request: Optional[LoRARequest] = None | ||
trace_headers: Optional[Mapping[str, str]] = None | ||
prompt_adapter_request: Optional[PromptAdapterRequest] = None | ||
|
||
|
||
class GetDataRequest(Enum): | ||
MODEL_CONFIG = 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
from typing import AsyncIterator, Optional, Mapping | ||
|
||
from vllm.config import ModelConfig, DecodingConfig | ||
from vllm.inputs import PromptInputs | ||
from vllm.lora.request import LoRARequest | ||
from vllm.outputs import RequestOutput | ||
from vllm.prompt_adapter.request import PromptAdapterRequest | ||
from vllm.sampling_params import SamplingParams | ||
from vllm.entrypoints.openai.rpc import (VLLM_GENERATE_RPC_PATH, | ||
VLLM_GET_DATA_RPC_PATH, | ||
VLLM_IS_READY_RPC_PATH, | ||
GenerateRequest, GetDataRequest) | ||
|
||
import zmq | ||
import zmq.asyncio | ||
import pickle | ||
|
||
|
||
class RPCClient: | ||
|
||
# TODO: check if opening all these sockets is an antipattern? | ||
def __init__(self, tokenizer): | ||
self.context = zmq.asyncio.Context() | ||
|
||
# TODO: do the tokenizer properly. | ||
self.tokenizer = tokenizer | ||
self.decoding_config = DecodingConfig() | ||
|
||
# Socket to check if the RPC server is ready. | ||
self.is_ready_socket = self.context.socket(zmq.REP) | ||
self.is_ready_socket.connect(VLLM_IS_READY_RPC_PATH) | ||
|
||
# Socket to query data (e.g. get_model_config) | ||
self.get_data_socket = self.context.socket(zmq.REQ) | ||
self.get_data_socket.connect(VLLM_GET_DATA_RPC_PATH) | ||
|
||
async def wait_for_server(self): | ||
await self.is_ready_socket.recv() | ||
|
||
async def get_model_config(self) -> ModelConfig: | ||
self.get_data_socket.send(pickle.dumps(GetDataRequest.MODEL_CONFIG)) | ||
model_config = await self.get_data_socket.recv() | ||
return pickle.loads(model_config) | ||
|
||
async def get_tokenizer(self, lora_request: LoRARequest): | ||
# TODO: handle this via get data? - or avoid doing via RPC | ||
return self.tokenizer | ||
|
||
async def get_decoding_config(self): | ||
# TODO: handle this via get data? - or avoid doing via RPC | ||
return self.decoding_config | ||
|
||
async def abort(self, request_id: str): | ||
# TODO: actually handle this with a new socket. | ||
pass | ||
|
||
async def is_tracing_enabled(self): | ||
return False | ||
|
||
async def generate( | ||
self, | ||
inputs: PromptInputs, | ||
sampling_params: SamplingParams, | ||
request_id: str, | ||
lora_request: Optional[LoRARequest] = None, | ||
trace_headers: Optional[Mapping[str, str]] = None, | ||
prompt_adapter_request: Optional[PromptAdapterRequest] = None | ||
) -> AsyncIterator[RequestOutput]: | ||
|
||
# Connect to RPC socket for Request-Reply pattern, | ||
# Note that we use DEALER to enable asynchronous communication | ||
# to enable streaming. | ||
socket = self.context.socket(zmq.DEALER) | ||
socket.connect(VLLM_GENERATE_RPC_PATH) | ||
|
||
# Send GenerateRequest to the RPC Server. | ||
await socket.send_multipart([ | ||
pickle.dumps( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As @robertgshaw2-neuralmagic suggested, let's use msgspec? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's separate the messaging protocol optimizations to a separate PR |
||
GenerateRequest(inputs=inputs, | ||
sampling_params=sampling_params, | ||
request_id=request_id, | ||
lora_request=lora_request, | ||
trace_headers=trace_headers, | ||
prompt_adapter_request=prompt_adapter_request), | ||
pickle.HIGHEST_PROTOCOL) | ||
]) | ||
|
||
# Stream back the results from the RPC Server. | ||
while True: | ||
message = await socket.recv() | ||
request_output = pickle.loads(message) | ||
|
||
if request_output.finished: | ||
break | ||
yield request_output | ||
|
||
socket.close() | ||
yield request_output |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the very least we should be choosing randomly available ports above the "user space" of the 1000s port range. See https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.bind_to_random_port
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mgoin yeah that'd be better. Any idea how we'd notify the clients what port to connect to in that case?
The server that would be calling
.bind_to_random_port()
is in a different process than the openai server that needs to connect clients to itThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the the openai server can listen first, pass the ports to the server process, and then the server just connects to it?