Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][Bugfix][Perf] Introduce MQLLMEngine to avoid asyncio OH #8157

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
130 commits
Select commit Hold shift + click to select a range
a7a6e43
[Benchmark] Add async throughput benchmark
njhill Aug 28, 2024
ce7d159
wip
njhill Aug 29, 2024
569cd43
Merge remote-tracking branch 'njhill/async-llm-eng-bench' into reduce…
robertgshaw2-redhat Aug 29, 2024
d99ce6f
stash
robertgshaw2-redhat Aug 31, 2024
8d6b2e9
remove proxy
robertgshaw2-redhat Sep 2, 2024
14f3637
stash
robertgshaw2-redhat Sep 2, 2024
3b8311b
added mp_llm_engine
robertgshaw2-redhat Sep 2, 2024
5e2eb74
fixed
robertgshaw2-redhat Sep 2, 2024
aa62f2e
format
robertgshaw2-redhat Sep 2, 2024
863081b
cleanup
robertgshaw2-redhat Sep 2, 2024
965b97a
revert asyncllmengine
robertgshaw2-redhat Sep 2, 2024
8fd72f6
fix nit
robertgshaw2-redhat Sep 2, 2024
ddeb7c6
format
robertgshaw2-redhat Sep 2, 2024
6539e10
Merge branch 'main' into reduce-asyncio-oh
robertgshaw2-redhat Sep 2, 2024
4b111e4
clean
robertgshaw2-redhat Sep 2, 2024
a5ffd2c
fix
robertgshaw2-redhat Sep 2, 2024
1395872
stash
robertgshaw2-redhat Sep 2, 2024
938cf85
move files
robertgshaw2-redhat Sep 2, 2024
72d1d42
cleanup code
robertgshaw2-redhat Sep 3, 2024
fcdcfc9
refactor, cleanup
robertgshaw2-redhat Sep 3, 2024
659169e
updated
robertgshaw2-redhat Sep 3, 2024
9886f3d
make health check work
robertgshaw2-redhat Sep 3, 2024
5b2f057
format
robertgshaw2-redhat Sep 3, 2024
ae4564c
awk -> ack
robertgshaw2-redhat Sep 3, 2024
f9ccecc
add better shutdown
robertgshaw2-redhat Sep 3, 2024
89b730b
cleanup comment
robertgshaw2-redhat Sep 3, 2024
f3dc82b
more awk --> ack
robertgshaw2-redhat Sep 3, 2024
ac97a9e
use constant
robertgshaw2-redhat Sep 3, 2024
becd7ab
format
robertgshaw2-redhat Sep 3, 2024
b7f49ed
remove set to None
robertgshaw2-redhat Sep 3, 2024
58ae3b0
Merge remote-tracking branch 'origin/main' into reduce-asyncio-oh
njhill Sep 4, 2024
d0f9641
Remove redundant pass
njhill Sep 4, 2024
aa64042
Merge branch 'main' into reduce-asyncio-oh
robertgshaw2-redhat Sep 4, 2024
5c6e5ef
review comments
alexm-redhat Sep 4, 2024
25174a5
format
alexm-redhat Sep 4, 2024
db55c1a
add async socket reads and socket writes
alexm-redhat Sep 4, 2024
f97e1f2
Some error handling
njhill Sep 4, 2024
dd96d3e
remove async benchmark
robertgshaw2-redhat Sep 7, 2024
14d4afe
stash
robertgshaw2-redhat Sep 7, 2024
bc386ea
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 7, 2024
c0d0d60
adding error handling
robertgshaw2-redhat Sep 7, 2024
b7c1fcc
error handling
robertgshaw2-redhat Sep 7, 2024
a661b76
added
robertgshaw2-redhat Sep 7, 2024
5d00f3a
formatting in place
robertgshaw2-redhat Sep 7, 2024
5598494
added error handling
robertgshaw2-redhat Sep 8, 2024
98aaa7d
change name
robertgshaw2-redhat Sep 8, 2024
ba5ef38
change name
robertgshaw2-redhat Sep 8, 2024
18b5a94
added dead_error to asyncengine
robertgshaw2-redhat Sep 8, 2024
b048961
moved tests under openai
robertgshaw2-redhat Sep 8, 2024
6b2e18b
updated tests
robertgshaw2-redhat Sep 8, 2024
7a7ff5b
revert executor change
robertgshaw2-redhat Sep 8, 2024
b7e1fe9
revert
robertgshaw2-redhat Sep 8, 2024
48068d5
executor class
robertgshaw2-redhat Sep 8, 2024
e3daa28
cleanup format
robertgshaw2-redhat Sep 8, 2024
7880b75
format
robertgshaw2-redhat Sep 8, 2024
29fe3c8
shorten
robertgshaw2-redhat Sep 8, 2024
a720947
Revert change
robertgshaw2-redhat Sep 8, 2024
5b8cee6
enable shutdown for tp>1
robertgshaw2-redhat Sep 8, 2024
97a241d
format
robertgshaw2-redhat Sep 8, 2024
6d0570e
added error handling
robertgshaw2-redhat Sep 8, 2024
eb26791
format
robertgshaw2-redhat Sep 8, 2024
e256050
try out hwm
robertgshaw2-redhat Sep 9, 2024
59c5aca
Add stop_remote_worker_execution_loop for TP case
njhill Sep 9, 2024
62f654a
Revert unnecessary stop_remote_worker_execution_loop
njhill Sep 10, 2024
75c6157
fixed magicmock errored
robertgshaw2-redhat Sep 10, 2024
6f1cced
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 10, 2024
370c104
fall back to asyncllmengine if pp
robertgshaw2-redhat Sep 10, 2024
0cf9551
formatting
robertgshaw2-redhat Sep 10, 2024
72f72fd
stash
robertgshaw2-redhat Sep 10, 2024
ded4540
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 10, 2024
364ed7f
remove DO_LOG_STATS RPC call
robertgshaw2-redhat Sep 10, 2024
f7fdf69
cleanup health check
robertgshaw2-redhat Sep 10, 2024
7e61cdb
Use pickle for requests too
njhill Sep 10, 2024
3e84c8c
Remove hwm
robertgshaw2-redhat Sep 10, 2024
2559813
Simplify configs setup
njhill Sep 10, 2024
d0a0f8b
stash
robertgshaw2-redhat Sep 10, 2024
70e4916
Merge branch 'reduce-asyncio-oh-alex' of https://github.com/neuralmag…
robertgshaw2-redhat Sep 10, 2024
021fed3
added tests
robertgshaw2-redhat Sep 10, 2024
fd6ee43
added failed health check
robertgshaw2-redhat Sep 11, 2024
ccb43a3
rename
robertgshaw2-redhat Sep 11, 2024
1aa0823
added failed abort test
robertgshaw2-redhat Sep 11, 2024
fe22fe2
formatting
robertgshaw2-redhat Sep 11, 2024
3ce8702
Some more startup RPC simplification
njhill Sep 11, 2024
1f3fc24
fix yapf conflict
njhill Sep 11, 2024
ead62dd
fix entrypoints tests
alexm-redhat Sep 11, 2024
672fb81
stash
robertgshaw2-redhat Sep 11, 2024
86312e4
fix Intel/TPU tests
alexm-redhat Sep 11, 2024
c4f6898
Merge branch 'reduce-asyncio-oh-alex' of https://github.com/neuralmag…
robertgshaw2-redhat Sep 11, 2024
678e8e5
Merge branch 'reduce-asyncio-oh-alex' of https://github.com/neuralmag…
robertgshaw2-redhat Sep 11, 2024
78b9e21
fix
robertgshaw2-redhat Sep 11, 2024
66c6961
formatting
robertgshaw2-redhat Sep 11, 2024
6e1e2bb
cleanup
robertgshaw2-redhat Sep 11, 2024
610b349
cleanup
robertgshaw2-redhat Sep 11, 2024
28bb8a4
format
robertgshaw2-redhat Sep 11, 2024
b266249
fix poller
robertgshaw2-redhat Sep 11, 2024
f8036a5
add graceful shutdown on abort after client closed
robertgshaw2-redhat Sep 11, 2024
a649f75
cleanup formatting
robertgshaw2-redhat Sep 11, 2024
5b3535d
added test abort
robertgshaw2-redhat Sep 11, 2024
7097e05
fix up tests
robertgshaw2-redhat Sep 11, 2024
ad3d0f8
added abort tests
robertgshaw2-redhat Sep 12, 2024
6e9c6c9
added another accurayc test
robertgshaw2-redhat Sep 12, 2024
fb8e2f9
add multistep test for accuracy of mq llm engine
robertgshaw2-redhat Sep 12, 2024
75523b2
added test genertion
robertgshaw2-redhat Sep 12, 2024
5546d2e
fixed accuracy test launch
robertgshaw2-redhat Sep 12, 2024
6403f49
added load test
robertgshaw2-redhat Sep 12, 2024
bc68b51
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 12, 2024
3bb5e52
remove file
robertgshaw2-redhat Sep 12, 2024
2ac814f
format
robertgshaw2-redhat Sep 12, 2024
179a667
added load test
robertgshaw2-redhat Sep 12, 2024
97d6c09
format
robertgshaw2-redhat Sep 12, 2024
78badc1
added load test
robertgshaw2-redhat Sep 12, 2024
a499733
format
alexm-redhat Sep 12, 2024
6a5d8d8
stash
robertgshaw2-redhat Sep 12, 2024
dfab5eb
Merge branch 'reduce-asyncio-oh-alex' of https://github.com/neuralmag…
robertgshaw2-redhat Sep 12, 2024
96f84fe
format
robertgshaw2-redhat Sep 12, 2024
ae14670
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 14, 2024
117c024
format
robertgshaw2-redhat Sep 14, 2024
c059713
remove debug print
robertgshaw2-redhat Sep 14, 2024
1af3297
removed stray
robertgshaw2-redhat Sep 14, 2024
97ae38d
updated
robertgshaw2-redhat Sep 14, 2024
d0fab11
switch model to avoid OOM in TPU test
robertgshaw2-redhat Sep 14, 2024
bb4d839
Merge remote-tracking branch 'origin/main' into reduce-asyncio-oh-alex
njhill Sep 16, 2024
1967f6a
Adjust timeouts
njhill Sep 16, 2024
a911323
stahs
robertgshaw2-redhat Sep 17, 2024
95ff4f3
make timeout 10000 ms
robertgshaw2-redhat Sep 17, 2024
302868e
format
robertgshaw2-redhat Sep 17, 2024
add68ee
Update examples/openai_chat_completion_client.py
robertgshaw2-redhat Sep 17, 2024
242b952
adjust RPC timeout on TPU
robertgshaw2-redhat Sep 17, 2024
3dafa26
add longer delay for check ehalth
robertgshaw2-redhat Sep 17, 2024
836a9d2
Update client.py
robertgshaw2-redhat Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
formatting
  • Loading branch information
robertgshaw2-redhat committed Sep 11, 2024
commit 66c696157b5400a50f2b00510ead6a254b9900f3
31 changes: 15 additions & 16 deletions tests/mq_llm_engine/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.multiprocessing import MQEngineDeadError
from vllm.engine.multiprocessing.engine import MQLLMEngine
from vllm.lora.request import LoRARequest
from vllm.entrypoints.openai.api_server import build_async_engine_client
from vllm.entrypoints.openai.cli_args import make_arg_parser
from vllm.lora.request import LoRARequest
from vllm.usage.usage_lib import UsageContext
from vllm.utils import FlexibleArgumentParser

Expand Down Expand Up @@ -79,7 +79,6 @@ async def test_evil_forward(tmp_socket):
assert client.errored, "Client should be dead."
assert isinstance(e, MQEngineDeadError), (
"Engine should be dead and raise ENGINE_DEAD_ERROR")


await asyncio.sleep(2.0)
try:
Expand Down Expand Up @@ -203,40 +202,40 @@ async def bad_abort_after_2s():

@pytest.mark.asyncio
async def test_bad_request(tmp_socket):
with RemoteMQLLMEngine(
engine_args=ENGINE_ARGS,
ipc_path=tmp_socket) as engine:
with RemoteMQLLMEngine(engine_args=ENGINE_ARGS,
ipc_path=tmp_socket) as engine:

client = await engine.make_client()

# This should fail, but not crash the server.
try:
print("calling first generate")
async for _ in client.generate(
inputs="Hello my name is",
sampling_params=SamplingParams(),
request_id="abcd-1",
lora_request=LoRARequest("invalid-lora", 1, "invalid-path")):
async for _ in client.generate(inputs="Hello my name is",
sampling_params=SamplingParams(),
request_id="abcd-1",
lora_request=LoRARequest(
"invalid-lora", 1,
"invalid-path")):
pass
except Exception as e:
print("got exception")
assert isinstance(e, ValueError), (
"Expected ValueError when a LoRARequest in llm_engine")

# This request should be okay.
async for _ in client.generate(
inputs="Hello my name is",
sampling_params=SamplingParams(),
request_id="abcd-2"):
async for _ in client.generate(inputs="Hello my name is",
sampling_params=SamplingParams(),
request_id="abcd-2"):
pass

# Confirm server is still running.
await asyncio.sleep(10.)
await client.check_health()

# Shutdown.
client.close()


@pytest.mark.asyncio
async def test_mp_crash_detection():

Expand Down
9 changes: 6 additions & 3 deletions tests/mq_llm_engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from typing import Callable

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.multiprocessing.engine import MQLLMEngine
from vllm.engine.multiprocessing.client import MQLLMEngineClient
from vllm.engine.multiprocessing.engine import MQLLMEngine
from vllm.usage.usage_lib import UsageContext


Expand All @@ -17,10 +17,13 @@ def run_normal(engine_args: AsyncEngineArgs, ipc_path: str):
# Run engine.
engine.start()


class RemoteMQLLMEngine:

def __init__(self, engine_args: AsyncEngineArgs,
ipc_path: str, run_fn: Callable = run_normal) -> None:
def __init__(self,
engine_args: AsyncEngineArgs,
ipc_path: str,
run_fn: Callable = run_normal) -> None:

self.engine_args = engine_args
self.ipc_path = ipc_path
Expand Down
5 changes: 3 additions & 2 deletions vllm/engine/multiprocessing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class RPCStartupResponse:

REQUEST_OUTPUTS_T = Union[List[RequestOutput], RPCError]

def ENGINE_DEAD_ERROR(original_error: str) -> MQEngineDeadError:

def ENGINE_DEAD_ERROR(error: BaseException) -> MQEngineDeadError:
return MQEngineDeadError(
"Engine loop is not running. Inspect the stacktrace to "
f"find the original error: {original_error}.")
f"find the original error {repr(error)}.")
12 changes: 5 additions & 7 deletions vllm/engine/multiprocessing/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class MQLLMEngineClient:
def __init__(self, ipc_path: str, engine_config: EngineConfig):
self.context = zmq.asyncio.Context()
self._errored_with: Optional[BaseException] = None
self.dead_error = ENGINE_DEAD_ERROR

# Get the configs.
self.model_config = engine_config.model_config
Expand Down Expand Up @@ -179,8 +178,8 @@ async def run_output_handler_loop(self):

# If errored, alert all running requests.
if self.errored:
for queue in tuple(self.output_queues.values()):
queue.put_nowait(ENGINE_DEAD_ERROR)
for queue_j in tuple(self.output_queues.values()):
queue_j.put_nowait(ENGINE_DEAD_ERROR)
return

message: Frame = await self.output_socket.recv(copy=False)
Expand Down Expand Up @@ -241,8 +240,7 @@ async def setup(self):

# Start health_loop.
self.health_loop = asyncio.create_task(
self.run_check_health_loop(
timeout=VLLM_RPC_TIMEOUT))
self.run_check_health_loop(timeout=VLLM_RPC_TIMEOUT))

# Notify MQLLMEngine client is ready to start sending requests.
await self._notify_ready(socket)
Expand Down Expand Up @@ -399,8 +397,8 @@ async def generate(
"""Send an RPCGenerateRequest to the RPCServer and stream responses."""

# If already dead, error out.
if self.errored:
raise ENGINE_DEAD_ERROR
if self._errored_with is not None:
raise ENGINE_DEAD_ERROR(self._errored_with)

# 1) Create output queue for this requests.
queue: asyncio.Queue[Union[RequestOutput,
Expand Down
13 changes: 5 additions & 8 deletions vllm/engine/multiprocessing/engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pickle
from contextlib import contextmanager
from typing import Any, Optional, Iterator, List, Union
from typing import Any, Iterator, List, Optional, Union

import cloudpickle
import zmq
Expand Down Expand Up @@ -241,7 +241,7 @@ def _handle_generate_request(self, request: RPCGenerateRequest):
"""Handle RPCGenerateRequest by adding it to the LLMEngine."""
request_id = request.request_id

if self._is_errored():
if self._errored_with is not None:
rpc_err = RPCError(request_id=request_id,
is_engine_errored=True,
exception=ENGINE_DEAD_ERROR(self._errored_with))
Expand All @@ -263,8 +263,9 @@ def _handle_generate_request(self, request: RPCGenerateRequest):
# We do not set self._errored = True here, since the error
# is due to an issue adding this request to the engine,
# rather than an issue with the engine itself.
is_errored = self._errored_with is not None
rpc_err = RPCError(request_id=request_id,
is_engine_errored=self._errored,
is_engine_errored=is_errored,
exception=e)
self._send_outputs(rpc_err)

Expand All @@ -277,7 +278,7 @@ def _handle_abort_request(self, request: RPCAbortRequest):
logger.info("Aborted request %s.", request.request_id)

def _handle_health_request(self):
if self._is_errored():
if self._errored_with is not None:
self._send_unhealthy(self._errored_with)

# Raises error if unhealthy.
Expand Down Expand Up @@ -311,10 +312,6 @@ def _set_errored(self, e: BaseException):
if self._errored_with is None:
self._errored_with = e

def _is_errored(self) -> bool:
"""Check _errored status."""
return self._errored_with is not None


def run_mp_engine(engine_args: AsyncEngineArgs, usage_context: UsageContext,
ipc_path: str):
Expand Down
Loading