Skip to content

[BugFix] Fix DP Coordinator incorrect debug log message #19624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions vllm/v1/engine/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,12 @@ def process_input_socket(self, front_publish_address: str,
# engines are paused, so that we can wake the other
# engines.
engine_to_exclude, wave = msgspec.msgpack.decode(buffer)
if wave < self.current_wave:
# If the wave number is stale, ensure the message is
# handled by all the engines.
engine_to_exclude = None
if not self.engines_running:
if wave < self.current_wave:
# If the wave number is stale, ensure the message
# is handled by all the engines.
engine_to_exclude = None
Comment on lines +187 to +190
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated simplification, this check only applies to the case inside the if.


self.engines_running = True
self.stats_changed = True
self._send_start_wave(publish_back, self.current_wave,
Expand All @@ -203,22 +204,24 @@ def process_input_socket(self, front_publish_address: str,
assert outputs.utility_output is None

eng_index = outputs.engine_index
if outputs.scheduler_stats:
scheduler_stats = outputs.scheduler_stats
if scheduler_stats:
# 1. Updated request load stats - update our local
# state with these.
stats = self.engines[eng_index].request_counts
stats[0] = outputs.scheduler_stats.num_waiting_reqs
stats[1] = outputs.scheduler_stats.num_running_reqs
stats[0] = scheduler_stats.num_waiting_reqs
stats[1] = scheduler_stats.num_running_reqs
self.stats_changed = True

if (wave := outputs.wave_complete) is not None:
# 2. Notification from rank 0 engine that we've
# moved into the global paused state
# (engines_running==False)
# (engines_running==False).
if self.current_wave <= wave:
new_wave = wave + 1
logger.debug("Moving DP wave from %d to %d.",
self.current_wave, wave)
self.current_wave = wave + 1
self.current_wave, new_wave)
self.current_wave = new_wave
Comment on lines +221 to +224
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual bug fix

self.engines_running = False
self.stats_changed = True
elif (wave := outputs.start_wave) is not None and (
Expand Down