[Core][Bugfix][Perf] Introduce MQLLMEngine to avoid asyncio OH #8157

Merged
Changes from 1 commit
130 commits
a7a6e43
[Benchmark] Add async throughput benchmark
njhill Aug 28, 2024
ce7d159
wip
njhill Aug 29, 2024
569cd43
Merge remote-tracking branch 'njhill/async-llm-eng-bench' into reduce…
robertgshaw2-redhat Aug 29, 2024
d99ce6f
stash
robertgshaw2-redhat Aug 31, 2024
8d6b2e9
remove proxy
robertgshaw2-redhat Sep 2, 2024
14f3637
stash
robertgshaw2-redhat Sep 2, 2024
3b8311b
added mp_llm_engine
robertgshaw2-redhat Sep 2, 2024
5e2eb74
fixed
robertgshaw2-redhat Sep 2, 2024
aa62f2e
format
robertgshaw2-redhat Sep 2, 2024
863081b
cleanup
robertgshaw2-redhat Sep 2, 2024
965b97a
revert asyncllmengine
robertgshaw2-redhat Sep 2, 2024
8fd72f6
fix nit
robertgshaw2-redhat Sep 2, 2024
ddeb7c6
format
robertgshaw2-redhat Sep 2, 2024
6539e10
Merge branch 'main' into reduce-asyncio-oh
robertgshaw2-redhat Sep 2, 2024
4b111e4
clean
robertgshaw2-redhat Sep 2, 2024
a5ffd2c
fix
robertgshaw2-redhat Sep 2, 2024
1395872
stash
robertgshaw2-redhat Sep 2, 2024
938cf85
move files
robertgshaw2-redhat Sep 2, 2024
72d1d42
cleanup code
robertgshaw2-redhat Sep 3, 2024
fcdcfc9
refactor, cleanup
robertgshaw2-redhat Sep 3, 2024
659169e
updated
robertgshaw2-redhat Sep 3, 2024
9886f3d
make health check work
robertgshaw2-redhat Sep 3, 2024
5b2f057
format
robertgshaw2-redhat Sep 3, 2024
ae4564c
awk -> ack
robertgshaw2-redhat Sep 3, 2024
f9ccecc
add better shutdown
robertgshaw2-redhat Sep 3, 2024
89b730b
cleanup comment
robertgshaw2-redhat Sep 3, 2024
f3dc82b
more awk --> ack
robertgshaw2-redhat Sep 3, 2024
ac97a9e
use constant
robertgshaw2-redhat Sep 3, 2024
becd7ab
format
robertgshaw2-redhat Sep 3, 2024
b7f49ed
remove set to None
robertgshaw2-redhat Sep 3, 2024
58ae3b0
Merge remote-tracking branch 'origin/main' into reduce-asyncio-oh
njhill Sep 4, 2024
d0f9641
Remove redundant pass
njhill Sep 4, 2024
aa64042
Merge branch 'main' into reduce-asyncio-oh
robertgshaw2-redhat Sep 4, 2024
5c6e5ef
review comments
alexm-redhat Sep 4, 2024
25174a5
format
alexm-redhat Sep 4, 2024
db55c1a
add async socket reads and socket writes
alexm-redhat Sep 4, 2024
f97e1f2
Some error handling
njhill Sep 4, 2024
dd96d3e
remove async benchmark
robertgshaw2-redhat Sep 7, 2024
14d4afe
stash
robertgshaw2-redhat Sep 7, 2024
bc386ea
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 7, 2024
c0d0d60
adding error handling
robertgshaw2-redhat Sep 7, 2024
b7c1fcc
error handling
robertgshaw2-redhat Sep 7, 2024
a661b76
added
robertgshaw2-redhat Sep 7, 2024
5d00f3a
formatting in place
robertgshaw2-redhat Sep 7, 2024
5598494
added error handling
robertgshaw2-redhat Sep 8, 2024
98aaa7d
change name
robertgshaw2-redhat Sep 8, 2024
ba5ef38
change name
robertgshaw2-redhat Sep 8, 2024
18b5a94
added dead_error to asyncengine
robertgshaw2-redhat Sep 8, 2024
b048961
moved tests under openai
robertgshaw2-redhat Sep 8, 2024
6b2e18b
updated tests
robertgshaw2-redhat Sep 8, 2024
7a7ff5b
revert executor change
robertgshaw2-redhat Sep 8, 2024
b7e1fe9
revert
robertgshaw2-redhat Sep 8, 2024
48068d5
executor class
robertgshaw2-redhat Sep 8, 2024
e3daa28
cleanup format
robertgshaw2-redhat Sep 8, 2024
7880b75
format
robertgshaw2-redhat Sep 8, 2024
29fe3c8
shorten
robertgshaw2-redhat Sep 8, 2024
a720947
Revert change
robertgshaw2-redhat Sep 8, 2024
5b8cee6
enable shutdown for tp>1
robertgshaw2-redhat Sep 8, 2024
97a241d
format
robertgshaw2-redhat Sep 8, 2024
6d0570e
added error handling
robertgshaw2-redhat Sep 8, 2024
eb26791
format
robertgshaw2-redhat Sep 8, 2024
e256050
try out hwm
robertgshaw2-redhat Sep 9, 2024
59c5aca
Add stop_remote_worker_execution_loop for TP case
njhill Sep 9, 2024
62f654a
Revert unnecessary stop_remote_worker_execution_loop
njhill Sep 10, 2024
75c6157
fixed magicmock errored
robertgshaw2-redhat Sep 10, 2024
6f1cced
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 10, 2024
370c104
fall back to asyncllmengine if pp
robertgshaw2-redhat Sep 10, 2024
0cf9551
formatting
robertgshaw2-redhat Sep 10, 2024
72f72fd
stash
robertgshaw2-redhat Sep 10, 2024
ded4540
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 10, 2024
364ed7f
remove DO_LOG_STATS RPC call
robertgshaw2-redhat Sep 10, 2024
f7fdf69
cleanup health check
robertgshaw2-redhat Sep 10, 2024
7e61cdb
Use pickle for requests too
njhill Sep 10, 2024
3e84c8c
Remove hwm
robertgshaw2-redhat Sep 10, 2024
2559813
Simplify configs setup
njhill Sep 10, 2024
d0a0f8b
stash
robertgshaw2-redhat Sep 10, 2024
70e4916
Merge branch 'reduce-asyncio-oh-alex' of https://github.com/neuralmag…
robertgshaw2-redhat Sep 10, 2024
021fed3
added tests
robertgshaw2-redhat Sep 10, 2024
fd6ee43
added failed health check
robertgshaw2-redhat Sep 11, 2024
ccb43a3
rename
robertgshaw2-redhat Sep 11, 2024
1aa0823
added failed abort test
robertgshaw2-redhat Sep 11, 2024
fe22fe2
formatting
robertgshaw2-redhat Sep 11, 2024
3ce8702
Some more startup RPC simplification
njhill Sep 11, 2024
1f3fc24
fix yapf conflict
njhill Sep 11, 2024
ead62dd
fix entrypoints tests
alexm-redhat Sep 11, 2024
672fb81
stash
robertgshaw2-redhat Sep 11, 2024
86312e4
fix Intel/TPU tests
alexm-redhat Sep 11, 2024
c4f6898
Merge branch 'reduce-asyncio-oh-alex' of https://github.com/neuralmag…
robertgshaw2-redhat Sep 11, 2024
678e8e5
Merge branch 'reduce-asyncio-oh-alex' of https://github.com/neuralmag…
robertgshaw2-redhat Sep 11, 2024
78b9e21
fix
robertgshaw2-redhat Sep 11, 2024
66c6961
formatting
robertgshaw2-redhat Sep 11, 2024
6e1e2bb
cleanup
robertgshaw2-redhat Sep 11, 2024
610b349
cleanup
robertgshaw2-redhat Sep 11, 2024
28bb8a4
format
robertgshaw2-redhat Sep 11, 2024
b266249
fix poller
robertgshaw2-redhat Sep 11, 2024
f8036a5
add graceful shutdown on abort after client closed
robertgshaw2-redhat Sep 11, 2024
a649f75
cleanup formatting
robertgshaw2-redhat Sep 11, 2024
5b3535d
added test abort
robertgshaw2-redhat Sep 11, 2024
7097e05
fix up tests
robertgshaw2-redhat Sep 11, 2024
ad3d0f8
added abort tests
robertgshaw2-redhat Sep 12, 2024
6e9c6c9
added another accurayc test
robertgshaw2-redhat Sep 12, 2024
fb8e2f9
add multistep test for accuracy of mq llm engine
robertgshaw2-redhat Sep 12, 2024
75523b2
added test genertion
robertgshaw2-redhat Sep 12, 2024
5546d2e
fixed accuracy test launch
robertgshaw2-redhat Sep 12, 2024
6403f49
added load test
robertgshaw2-redhat Sep 12, 2024
bc68b51
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 12, 2024
3bb5e52
remove file
robertgshaw2-redhat Sep 12, 2024
2ac814f
format
robertgshaw2-redhat Sep 12, 2024
179a667
added load test
robertgshaw2-redhat Sep 12, 2024
97d6c09
format
robertgshaw2-redhat Sep 12, 2024
78badc1
added load test
robertgshaw2-redhat Sep 12, 2024
a499733
format
alexm-redhat Sep 12, 2024
6a5d8d8
stash
robertgshaw2-redhat Sep 12, 2024
dfab5eb
Merge branch 'reduce-asyncio-oh-alex' of https://github.com/neuralmag…
robertgshaw2-redhat Sep 12, 2024
96f84fe
format
robertgshaw2-redhat Sep 12, 2024
ae14670
Merge branch 'main' into reduce-asyncio-oh-alex
robertgshaw2-redhat Sep 14, 2024
117c024
format
robertgshaw2-redhat Sep 14, 2024
c059713
remove debug print
robertgshaw2-redhat Sep 14, 2024
1af3297
removed stray
robertgshaw2-redhat Sep 14, 2024
97ae38d
updated
robertgshaw2-redhat Sep 14, 2024
d0fab11
switch model to avoid OOM in TPU test
robertgshaw2-redhat Sep 14, 2024
bb4d839
Merge remote-tracking branch 'origin/main' into reduce-asyncio-oh-alex
njhill Sep 16, 2024
1967f6a
Adjust timeouts
njhill Sep 16, 2024
a911323
stahs
robertgshaw2-redhat Sep 17, 2024
95ff4f3
make timeout 10000 ms
robertgshaw2-redhat Sep 17, 2024
302868e
format
robertgshaw2-redhat Sep 17, 2024
add68ee
Update examples/openai_chat_completion_client.py
robertgshaw2-redhat Sep 17, 2024
242b952
adjust RPC timeout on TPU
robertgshaw2-redhat Sep 17, 2024
3dafa26
add longer delay for check ehalth
robertgshaw2-redhat Sep 17, 2024
836a9d2
Update client.py
robertgshaw2-redhat Sep 18, 2024
added test abort
robertgshaw2-redhat committed Sep 11, 2024
commit 5b3535d1188a7b2f6f59dbeb15096326e7b6c1b2
Empty file.
93 changes: 28 additions & 65 deletions tests/mq_llm_engine/test_error_handling.py
@@ -19,7 +19,8 @@

 MODEL = "Qwen/Qwen2-0.5B-Instruct"
 ENGINE_ARGS = AsyncEngineArgs(model=MODEL)
-RAISED_ERROR = KeyError("foo")
+RAISED_ERROR = KeyError
+RAISED_VALUE = "foo"


 @pytest.fixture(scope="function")
@@ -36,7 +37,8 @@ def run_with_evil_forward(engine_args: AsyncEngineArgs, ipc_path: str):
         ipc_path=ipc_path)

     # Raise error during first forward pass.
-    engine.engine.model_executor.execute_model = Mock(side_effect=RAISED_ERROR)
+    engine.engine.model_executor.execute_model = Mock(
+        side_effect=RAISED_ERROR(RAISED_VALUE))

     # Run engine.
     engine.start()
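
Note on the RAISED_ERROR / RAISED_VALUE split above: pytest.raises expects an exception type, while Mock(side_effect=...) is given an instance here, so the test file now keeps the class and its argument separately. A minimal standalone sketch using only unittest.mock; fake_execute_model is a hypothetical stand-in for the patched executor method, not a vLLM object:

```python
from unittest.mock import Mock

RAISED_ERROR = KeyError  # exception type, suitable for pytest.raises(...)
RAISED_VALUE = "foo"     # argument used to build the raised instance

# Hypothetical stand-in for engine.engine.model_executor.execute_model.
fake_execute_model = Mock(side_effect=RAISED_ERROR(RAISED_VALUE))

try:
    fake_execute_model()
    caught = None
except RAISED_ERROR as e:
    # Calling the mock raises the instance passed as side_effect.
    caught = e

print(type(caught).__name__, caught.args)
```

Matching on the type avoids comparing repr(e) strings, which is what the removed assertions in this file did.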
@@ -50,46 +52,32 @@ async def test_evil_forward(tmp_socket):

     client = await engine.make_client()

-    # Fast health probe.
-    fast_health_probe_task = asyncio.create_task(
-        client.run_check_health_loop(timeout=1.0))
-
     # Server should be healthy after initial probe.
     await asyncio.sleep(2.0)
Review comment (Collaborator): Is the sleep needed? I thought the await engine.make_client() would only return once the client is connected to a healthy engine.

     await client.check_health()

     # Throws an error in first forward pass.
-    try:
+    with pytest.raises(RAISED_ERROR):
         async for _ in client.generate(inputs="Hello my name is",
                                        sampling_params=SamplingParams(),
                                        request_id=uuid.uuid4()):
             pass
-    except Exception as e:
-        # First exception should be a RAISED_ERROR
-        assert repr(e) == repr(RAISED_ERROR)
-        assert client.errored
+    assert client.errored

     # Engine is errored, should get ENGINE_DEAD_ERROR.
-    try:
+    with pytest.raises(MQEngineDeadError):
         async for _ in client.generate(inputs="Hello my name is",
                                        sampling_params=SamplingParams(),
                                        request_id=uuid.uuid4()):
             pass
-    except Exception as e:
-        # Next exception should be an ENGINE_DEAD_ERROR
-        assert client.errored, "Client should be dead."
-        assert isinstance(e, MQEngineDeadError), (
-            "Engine should be dead and raise ENGINE_DEAD_ERROR")
+    assert client.errored

-    await asyncio.sleep(2.0)
-    try:
+    await asyncio.sleep(1.0)
+    with pytest.raises(RAISED_ERROR):
         await client.check_health()
-    except Exception as e:
-        assert repr(e) == repr(RAISED_ERROR), (
-            "Health check raise the original error.")
+    assert client.errored

-    # Cleanup
-    await fast_health_probe_task
+    # Shutdown.
     client.close()
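
Note on the try/except to pytest.raises change in this hunk: with the old pattern, the assertions sit inside the except block, so a call that raises nothing skips them and the test still passes. A minimal sketch of that pitfall in plain Python; old_style_check and strict_check are hypothetical helpers, and strict_check only approximates what pytest.raises enforces:

```python
def old_style_check(fn, expected_type):
    """Old pattern: the assertion only runs if fn() raises."""
    try:
        fn()
    except Exception as e:
        assert isinstance(e, expected_type)
    return "passed"


def strict_check(fn, expected_type):
    """Rough approximation of pytest.raises: fail when nothing is raised."""
    try:
        fn()
    except expected_type:
        return "passed"
    raise AssertionError(f"DID NOT RAISE {expected_type.__name__}")


def healthy():
    """Stand-in for a call that unexpectedly succeeds."""
    return None


# The old pattern reports success even though no error occurred.
print(old_style_check(healthy, KeyError))

# The strict variant surfaces the missing exception.
try:
    strict_check(healthy, KeyError)
except AssertionError as err:
    print("strict check failed:", err)
```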


@@ -120,25 +108,18 @@ async def test_failed_health_check(tmp_socket):

     # Health probe should throw RAISED_ERROR.
     await asyncio.sleep(10)
-    try:
+
+    with pytest.raises(RAISED_ERROR):
         await client.check_health()
-    except Exception as e:
-        assert client.errored, "Client should be dead."
-        assert repr(e) == repr(RAISED_ERROR), (
-            "Health check raise the original error.")
+    assert client.errored

     # Generate call should throw ENGINE_DEAD_ERROR
-    try:
+    with pytest.raises(MQEngineDeadError):
         async for _ in client.generate(inputs="Hello my name is",
                                        sampling_params=SamplingParams(),
                                        request_id=uuid.uuid4()):
             pass
-    except Exception as e:
-        assert client.errored, "Client should be dead."
-        assert isinstance(e, MQEngineDeadError), (
-            "Engine should be dead and raise ENGINE_DEAD_ERROR")

-    # Cleanup
     client.close()


@@ -173,34 +154,26 @@ async def bad_abort_after_2s():
         await asyncio.sleep(2.0)
         await client.abort(request_id="foo")

-    # Immediately should trigger error.
-    try:
-        await client.check_health()
-    except Exception as e:
-        assert client.errored, "Client should be dead."
-        assert repr(e) == repr(RAISED_ERROR), (
-            "Health check raise the original error.")
-
     # Trigger an abort in 2s from now.
     abort_task = asyncio.create_task(bad_abort_after_2s())

     # Exception in abort() will happen during this generation.
-    # This will kill the engine and should return ENGINE_DEAD_ERROR.
-    try:
+    # This will kill the engine and should return ENGINE_DEAD_ERROR
+    # with reference to the original KeyError("foo")
+    with pytest.raises(MQEngineDeadError) as execinfo:
Review comment (Collaborator): Would this behavior cause a race condition where the internal LLMEngine could finish a request, but before the response gets to the HTTP server, the connection times out and the HTTP server tries to abort the request, murdering the engine in the process?

Reply (Collaborator): I don't follow.

         async for _ in client.generate(
                 inputs="Hello my name is",
                 sampling_params=SamplingParams(max_tokens=2000),
                 request_id=uuid.uuid4()):
             pass
Review comment (Collaborator): nit: Instead of triggering the bad abort task before generation starts and relying on the 2s timing interval, can we instead start the abort task once we get to the first iteration of this loop body? That should ensure that the abort happens after generation has started and make this test a lot faster.

Reply (Collaborator): No, the generation iterator is separate from the engine process.

-    except Exception as e:
-        print(f"error is: {e}")
-        # Next exception should be an ENGINE_DEAD_ERROR
-        assert isinstance(e, MQEngineDeadError), (
-            "Engine should be dead and raise ENGINE_DEAD_ERROR")
-        assert client.errored
-
+    assert "KeyError" in repr(execinfo.value)
+    assert client.errored
     await abort_task

+    # This should raise the original error.
+    with pytest.raises(RAISED_ERROR):
+        await client.check_health()
+
     client.close()


@@ -211,31 +184,21 @@ async def test_bad_request(tmp_socket):

     client = await engine.make_client()

-    # This should fail, but not crash the server.
-    try:
-        print("calling first generate")
+    # Invalid request should fail, but not crash the server.
+    with pytest.raises(ValueError):
         async for _ in client.generate(inputs="Hello my name is",
                                        sampling_params=SamplingParams(),
                                        request_id="abcd-1",
                                        lora_request=LoRARequest(
-                                           "invalid-lora", 1,
-                                           "invalid-path")):
+                                           "invalid-lora", 1, "invalid-path")):
             pass
-    except Exception as e:
-        print("got exception")
-        assert isinstance(e, ValueError), (
-            "Expected ValueError when a LoRARequest in llm_engine")

     # This request should be okay.
     async for _ in client.generate(inputs="Hello my name is",
                                    sampling_params=SamplingParams(),
                                    request_id="abcd-2"):
         pass

-    # Confirm server is still running.
-    await asyncio.sleep(10.)
-    await client.check_health()
-
     # Shutdown.
     client.close()

2 changes: 1 addition & 1 deletion vllm/engine/multiprocessing/__init__.py
@@ -71,4 +71,4 @@ def ENGINE_DEAD_ERROR(

     return MQEngineDeadError(
         "Engine loop is not running. Inspect the stacktrace to "
-        f"find the original error {repr(error)}.")
+        f"find the original error: {repr(error)}.")
43 changes: 16 additions & 27 deletions vllm/engine/multiprocessing/client.py
@@ -165,10 +165,10 @@ async def run_check_health_loop(self, timeout: int):
             logger.debug("Shutting down MQLLMEngineClient check health loop.")

         except Exception as e:
-            self.raise_exception(e)
+            self._set_errored(e)

     async def run_output_handler_loop(self):
-        """Get RequestOutputs from Engine and stream to request Queues"""
+        """Get RequestOutputs from Engine and stream to Request Queues"""

         try:
             while True:
@@ -249,19 +249,15 @@ async def setup(self):

     def close(self):
         """Destroy the ZeroMQ Context."""
-        # Close all sockets associated with this context and
-        # then terminate the context.
-        self.output_socket.close()
-        self.input_socket.close()
-        self.health_socket.close()
+        # Close all sockets and terminate the context.
         self.context.destroy(linger=0)

         # Cancel background tasks.
         if self.health_loop is not None:
             self.health_loop.cancel()
         self.output_loop.cancel()

-    def raise_exception(self, e: BaseException):
+    def _set_errored(self, e: BaseException):
         logger.exception(repr(e))
         if self._errored_with is None:
             self._errored_with = e
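
Note on the renamed _set_errored above: the visible lines record an error only when none has been stored yet, which would explain why the tests expect check_health() to keep raising the original KeyError even after later calls fail with MQEngineDeadError. A minimal sketch of that first-error-wins behavior, assuming nothing beyond the three lines shown; ErrorLatch, first_error, and the errored property are hypothetical names, not the client's actual implementation:

```python
from typing import Optional


class ErrorLatch:
    """Hypothetical helper that remembers only the first error it is given."""

    def __init__(self) -> None:
        self._errored_with: Optional[BaseException] = None

    def set_errored(self, e: BaseException) -> None:
        # Later errors do not overwrite the original cause.
        if self._errored_with is None:
            self._errored_with = e

    @property
    def errored(self) -> bool:
        return self._errored_with is not None

    @property
    def first_error(self) -> Optional[BaseException]:
        return self._errored_with


latch = ErrorLatch()
latch.set_errored(KeyError("foo"))              # original failure
latch.set_errored(RuntimeError("engine dead"))  # follow-on failure, ignored
print(latch.errored, repr(latch.first_error))
```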
@@ -285,35 +281,26 @@ async def _send_get_data_rpc_request(request: RPCStartupRequest,
         frame = await socket.recv(copy=False)
         data = pickle.loads(frame.buffer)

-        if isinstance(data, Exception):
-            # Re-raise exceptions returned by the server
+        if isinstance(data, BaseException):
             raise data
-
-        if not isinstance(data, expected_type):
-            # LoRAConfig can be None.
-            if expected_type == LoRAConfig and data is None:
-                pass
-            elif isinstance(data, Exception):
-                logger.error(error_message)
-                raise data
-            else:
-                raise ValueError(error_message)
+        elif not isinstance(data, expected_type):
+            raise ValueError(error_message)

         return data

     @staticmethod
     async def _send_one_way_rpc_request(request: RPC_REQUEST_T,
                                         socket: Socket):
         """Send one-way RPC request to trigger an action."""
-        # Raise handlable error for graceful shutdown.
+
         if socket.closed:
             raise MQClientClosedError()

         await socket.send_multipart((pickle.dumps(request), ))

     async def _await_ack(self, error_message: str, socket: Socket):
         """Await acknowledgement that a request succeeded."""
-        # Raise handlable error for graceful shutdown.
+
         if socket.closed:
             raise MQClientClosedError()

@@ -325,17 +312,19 @@ async def _await_ack(self, error_message: str, socket: Socket):

     @staticmethod
     async def _check_success(error_message: str, socket: Socket):
-        # Raise handlable error for graceful shutdown.
+        """Confirm that socket has a VLLM_RPC_SUCCESS_STR message"""
+
         if socket.closed:
             raise MQClientClosedError()

         frame = await socket.recv(copy=False)
         response = pickle.loads(frame.buffer)

-        if not isinstance(response, str) or response != VLLM_RPC_SUCCESS_STR:
-            if isinstance(response, BaseException):
-                logger.error(error_message)
-                raise response
+        # Raise error if unsuccessful
+        if isinstance(response, BaseException):
+            raise response
+        elif (not isinstance(response, str) or
+              response != VLLM_RPC_SUCCESS_STR):
             raise ValueError(error_message)

     async def get_tokenizer(self, lora_request: LoRARequest):
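
Note on the flattened _check_success branch above: the response is unpickled, re-raised if it is an exception, and otherwise rejected with ValueError unless it equals the success string. A minimal synchronous sketch of that ordering with the socket removed; check_response is a hypothetical helper and SUCCESS is a placeholder, since the real value of VLLM_RPC_SUCCESS_STR is not shown in this diff:

```python
import pickle

SUCCESS = "SUCCESS"  # placeholder; not the actual VLLM_RPC_SUCCESS_STR value


def check_response(payload: bytes, error_message: str) -> None:
    """Hypothetical stand-in for the post-recv logic in _check_success."""
    response = pickle.loads(payload)
    if isinstance(response, BaseException):
        # Exceptions sent by the server are re-raised unchanged.
        raise response
    elif not isinstance(response, str) or response != SUCCESS:
        raise ValueError(error_message)


# A success message passes silently.
check_response(pickle.dumps(SUCCESS), "RPC failed")

# Anything else raises: the original exception, or ValueError.
outcomes = []
for bad in (KeyError("foo"), "FAILURE", 42):
    try:
        check_response(pickle.dumps(bad), "RPC failed")
    except Exception as e:
        outcomes.append(type(e).__name__)

print(outcomes)
```

Checking for BaseException first means a pickled server-side error is never masked by the generic ValueError.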
4 changes: 1 addition & 3 deletions vllm/engine/multiprocessing/engine.py
@@ -141,9 +141,7 @@ def start(self):

     def cleanup(self):
         """Cleanup zeromq state on shutdown."""
-        self.input_socket.close()
-        self.output_socket.close()
-        self.health_socket.close()
+        # Closes all sockets and destroys context.
         self.ctx.destroy(linger=0)
         del self.engine
