5 changes: 4 additions & 1 deletion examples/offline_inference/rlhf.py
@@ -126,7 +126,10 @@ def __init__(self, *args, **kwargs):

# Synchronize the updated weights to the inference engine.
for name, p in train_model.named_parameters():
handle = llm.collective_rpc.remote("update_weight", args=(name, p.dtype, p.shape))
dtype_name = str(p.dtype).split(".")[-1]
handle = llm.collective_rpc.remote(
"update_weight", args=(name, dtype_name, p.shape)
)
model_update_group.broadcast(p, src=0, stream=torch.cuda.current_stream())
ray.get(handle)

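The trainer side now passes the dtype by attribute name instead of as a torch.dtype object. A quick standalone check of that round trip (plain Python, not part of the diff; the bfloat16 parameter is just an illustrative stand-in for one of train_model's weights):

import torch

# "torch.bfloat16" -> "bfloat16"; getattr(torch, name) recovers the same dtype object.
p = torch.nn.Parameter(torch.zeros(2, 2, dtype=torch.bfloat16))
dtype_name = str(p.dtype).split(".")[-1]
assert dtype_name == "bfloat16"
assert getattr(torch, dtype_name) is p.dtype
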
3 changes: 2 additions & 1 deletion examples/offline_inference/rlhf_utils.py
@@ -45,7 +45,8 @@ def init_weight_update_group(
self.device,
)

def update_weight(self, name, dtype, shape):
def update_weight(self, name, dtype_name, shape):
dtype = getattr(torch, dtype_name)
weight = torch.empty(shape, dtype=dtype, device="cuda")
self.model_update_group.broadcast(
weight, src=0, stream=torch.cuda.current_stream()
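On the worker side the pattern is: rebuild the dtype from its name, allocate an uninitialized buffer of the advertised shape, and let the broadcast fill it in place. A minimal sketch of just the allocation step (illustrative values; the CPU fallback only exists so the snippet runs without a GPU, and the group broadcast itself is the call shown in the diff above):

import torch

dtype_name, shape = "float16", (1024, 1024)  # illustrative values
device = "cuda" if torch.cuda.is_available() else "cpu"
# torch.empty allocates without initializing; the collective broadcast overwrites it.
weight = torch.empty(shape, dtype=getattr(torch, dtype_name), device=device)
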
6 changes: 6 additions & 0 deletions vllm/distributed/device_communicators/pynccl.py
@@ -47,6 +47,9 @@
else:
self.rank = group.rank
self.world_size = group.world_size
logger.warning(
    "PyNcclCommunicator init: rank %s world_size %s, group %s",
    self.rank, self.world_size, group)

self.group = group

@@ -265,6 +268,9 @@
self.comm, cudaStream_t(stream.cuda_stream))

def broadcast(self, tensor: torch.Tensor, src: int, stream=None):
logger.warning(
    "broadcast: rank %s/%s, group %s, src %s, shape %s",
    self.rank, self.world_size, self.group, src, tensor.shape)
if self.disabled:
return
assert tensor.device == self.device, (
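The logger calls above use lazy %-formatting, which is what Ruff's G004 rule asks for: an f-string builds the message even when the record is filtered out, while %-style arguments are only formatted if the record is actually emitted. A small standalone illustration (standard-library logging only, unrelated to NCCL):

import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("lazy-logging-demo")

class Expensive:
    def __str__(self) -> str:
        print("formatting happened")  # visible side effect when %s is evaluated
        return "<expensive repr>"

logger.debug("debug: %s", Expensive())      # filtered out, never formatted
logger.warning("warning: %s", Expensive())  # emitted, formatted exactly once
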
4 changes: 3 additions & 1 deletion vllm/distributed/utils.py
@@ -389,6 +389,8 @@ def create(
can call `StatelessProcessGroup.create` to form a group, and then process A, B,
C, and D can call `StatelessProcessGroup.create` to form another group.
""" # noqa
print("StatelessProcessGroup.create with rank ", rank, ", world_size ",
world_size)
launch_server = rank == 0
if launch_server:
# listen on the specified interface (instead of 0.0.0.0)
@@ -423,7 +425,7 @@ def init_gloo_process_group(backend: Backend, prefix_store: PrefixStore,
group_rank: int, group_size: int,
timeout: timedelta) -> ProcessGroup:
"""
Stateless init ProcessGroup with gloo backend compatible with
Stateless init ProcessGroup with gloo backend compatible with
different torch versions.
"""
if is_torch_equal_or_newer("2.6"):
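init_gloo_process_group gates on is_torch_equal_or_newer("2.6") so the gloo ProcessGroup is constructed with whichever API the installed torch expects. A minimal sketch of that version-gating idea (this is not vLLM's helper; it assumes the packaging library is installed):

import torch
from packaging import version

def torch_at_least(minimum: str) -> bool:
    # Compare against the base release so dev/nightly suffixes do not skew the check.
    installed = version.parse(version.parse(torch.__version__).base_version)
    return installed >= version.parse(minimum)

if torch_at_least("2.6"):
    pass  # build the ProcessGroup with the newer constructor
else:
    pass  # fall back to the older constructor signature
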
1 change: 0 additions & 1 deletion vllm/entrypoints/llm.py
@@ -197,7 +197,6 @@ def __init__(
**kwargs,
) -> None:
"""LLM constructor."""

if "disable_log_stats" not in kwargs:
kwargs["disable_log_stats"] = True

1 change: 1 addition & 0 deletions vllm/executor/ray_utils.py
@@ -40,6 +40,7 @@
lazily initialized after Ray sets CUDA_VISIBLE_DEVICES."""

def __init__(self, *args, **kwargs) -> None:
logger.warning(f"===quinnzhu Initializing RayWorkerWrapper with {kwargs["rpc_rank"]=}.")

Check failure on line 43 in vllm/executor/ray_utils.py

View workflow job for this annotation

GitHub Actions / pre-commit

Ruff (E501)

vllm/executor/ray_utils.py:43:81: E501 Line too long (100 > 80)

Check failure on line 43 in vllm/executor/ray_utils.py

View workflow job for this annotation

GitHub Actions / pre-commit

Ruff (G004)

vllm/executor/ray_utils.py:43:28: G004 Logging statement uses f-string
super().__init__(*args, **kwargs)
# Since the compiled DAG runs a main execution
# in a different thread that calls cuda.set_device.
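Quote reuse is the subtle point in this debug line: before Python 3.12 (PEP 701), an f-string such as f"{kwargs["rpc_rank"]}" that repeats the outer quote inside the replacement field is a SyntaxError, so portable code either switches the inner quotes or passes the value as a lazy logging argument as done above. A standalone illustration with a stand-in dict:

kwargs = {"rpc_rank": 3}  # stand-in for the wrapper's constructor kwargs

print(f"rpc_rank={kwargs['rpc_rank']}")  # portable: inner quotes differ from the outer ones
# print(f"{kwargs["rpc_rank"]=}")        # quote reuse: valid only on Python >= 3.12
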
26 changes: 16 additions & 10 deletions vllm/model_executor/layers/linear.py
@@ -90,8 +90,8 @@
def adjust_scalar_to_fused_array(param, loaded_weight, shard_id):
"""For fused modules (QKV and MLP) we have an array of length
N that holds 1 scale for each "logical" matrix. So the param
is an array of length N. The loaded_weight corresponds to
one of the shards on disk. Here, we slice the param based on
is an array of length N. The loaded_weight corresponds to
one of the shards on disk. Here, we slice the param based on
the shard_id for loading.
"""
qkv_idxs = {"q": 0, "k": 1, "v": 2}
@@ -118,13 +118,13 @@

For example, given bnb weight attributes as below:
{
'bnb_shard_offsets': array([0, 4, 8, 16]),
'bnb_shard_offsets': array([0, 4, 8, 16]),
'bnb_quant_state': {0: ..., 1: ..., 2: ...},
}

The function will return:
{
'bnb_shard_offsets': array([0, 4]),
'bnb_shard_offsets': array([0, 4]),
'bnb_quant_state': {0: ...},
}
and
@@ -156,13 +156,13 @@
output_partition_sizes: list[int], input_size: int,
output_size: int, params_dtype: torch.dtype,
**extra_weight_attrs):
"""Create weights for a linear layer.
"""Create weights for a linear layer.
The weights will be set as attributes of the layer.

Args:
layer: The layer that is using the LinearMethodBase factory.
input_size_per_partition: Size of the weight input dim on rank X.
output_partition_sizes: Sizes of the output dim of each logical
output_partition_sizes: Sizes of the output dim of each logical
weight on rank X. E.g., output_partition_sizes for QKVLinear
is a list contains the width of Wq, Wk, Wv on rank X.
input_size: Size of the input dim of the weight across all ranks.
@@ -464,7 +464,7 @@
output_sizes: list of output sizes packed into one output, like for QKV
the list would be size 3.
prefix: The name of the layer in the state dict, including all parents
(e.g. model.layers.0.qkv_proj)
(e.g. model.layers.0.qkv_proj)
"""

def __init__(
@@ -559,11 +559,13 @@
if output_dim is not None and not is_sharded_weight:
shard_size = param_data.shape[output_dim]
start_idx = self.tp_rank * shard_size
logger.info(f"ColumnParallel loaded_weight before narrow : {loaded_weight.shape}")
loaded_weight = loaded_weight.narrow(output_dim, start_idx,

Check failure on line 563 in vllm/model_executor/layers/linear.py

View workflow job for this annotation

GitHub Actions / pre-commit

Ruff (E501)

vllm/model_executor/layers/linear.py:563:81: E501 Line too long (85 > 80)

Check failure on line 563 in vllm/model_executor/layers/linear.py

View workflow job for this annotation

GitHub Actions / pre-commit

Ruff (G004)

vllm/model_executor/layers/linear.py:563:17: G004 Logging statement uses f-string
shard_size)
logger.info(f"ColumnParallel loaded_weight after narrow : {loaded_weight.shape}")

# Special case for loading scales off disk, which often do not
# have a shape (such as in the case of AutoFP8).

if len(loaded_weight.shape) == 0:
loaded_weight = loaded_weight.reshape(1)

@@ -946,7 +948,7 @@
self.output_sizes = [
self.num_heads * self.head_size * tp_size, # q_proj
self.num_kv_heads * self.head_size * tp_size, # k_proj
self.num_kv_heads * self.head_size * tp_size, # v_proj
self.num_kv_heads * self.head_size * tp_size, # v_proj
]

super().__init__(input_size=input_size,
@@ -979,7 +981,7 @@
def _load_fused_module_from_checkpoint(self, param: BasevLLMParameter,
loaded_weight: torch.Tensor):
"""
Handle special case for models where QKV layers are already
Handle special case for models where QKV layers are already
fused on disk. In this case, we have no shard id. This function
determines the shard id by splitting these layers and then calls
the weight loader using the shard id.
@@ -1203,8 +1205,10 @@
start_idx = shard_id * shard_size

if not is_sharded_weight:
logger.info(f"QKV loaded_weight before narrow : {loaded_weight.shape}")
loaded_weight = loaded_weight.narrow(output_dim, start_idx,
shard_size)
logger.info(f"QKV loaded_weight after narrow : {loaded_weight.shape}")

# Special case for AQLM codebooks.
elif is_metadata:
@@ -1344,8 +1348,10 @@
if input_dim is not None and not is_sharded_weight:
shard_size = param_data.shape[input_dim]
start_idx = self.tp_rank * shard_size
logger.info(f"RowParallel loaded_weight before narrow : {loaded_weight.shape}")
loaded_weight = loaded_weight.narrow(input_dim, start_idx,
shard_size)
logger.info(f"RowParallel loaded_weight after narrow : {loaded_weight.shape}")

# Special case for loading scales off disk, which often do not
# have a shape (such as in the case of AutoFP8).
@@ -1569,7 +1575,7 @@
param: nn.Parameter,
) -> nn.Parameter:
"""
Given the placeholder param,
Given the placeholder param,
return the corresponding param in the proj layers.
"""
target_param_list = [
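Every loader instrumented above shards the full checkpoint tensor with Tensor.narrow(dim, start, length), where start comes from the tensor-parallel rank. A toy standalone version of that slice (illustrative sizes, not vLLM code):

import torch

tp_size, tp_rank, output_dim = 4, 1, 0          # illustrative values
full_weight = torch.arange(32.0).reshape(8, 4)  # pretend checkpoint tensor

shard_size = full_weight.shape[output_dim] // tp_size
start_idx = tp_rank * shard_size
shard = full_weight.narrow(output_dim, start_idx, shard_size)  # rank 1 keeps rows [2, 4)
assert shard.shape == (2, 4)
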
8 changes: 5 additions & 3 deletions vllm/v1/engine/core_client.py
@@ -47,7 +47,7 @@

class EngineCoreClient(ABC):
"""
EngineCoreClient: subclasses handle different methods for pushing
EngineCoreClient: subclasses handle different methods for pushing
and pulling from the EngineCore for asyncio / multiprocessing.

Subclasses:
@@ -232,7 +232,7 @@ async def collective_rpc_async(

class InprocClient(EngineCoreClient):
"""
InprocClient: client for in-process EngineCore. Intended
InprocClient: client for in-process EngineCore. Intended
for use in LLMEngine for V0-style add_request() and step()
EngineCore setup in this process (no busy loop).

@@ -377,7 +377,7 @@ class MPClient(EngineCoreClient):

* pushes EngineCoreRequests via input_socket
* pulls EngineCoreOutputs via output_socket

* AsyncMPClient subclass for AsyncLLM usage
* SyncMPClient subclass for LLM usage
"""
@@ -563,6 +563,7 @@ class SyncMPClient(MPClient):

def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],
log_stats: bool):
logger.info("===quinnzhu Initializing SyncMPClient")
super().__init__(
asyncio_mode=False,
vllm_config=vllm_config,
@@ -710,6 +711,7 @@ def collective_rpc(self,
timeout: Optional[float] = None,
args: tuple = (),
kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
logger.info(f"===quinnzhu collective_rpc {method=}")
return self.call_utility("collective_rpc", method, timeout, args,
kwargs)

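The docstrings above describe the split between an in-process client and multiprocessing clients that push requests and pull outputs over sockets. A stripped-down sketch of that shape (the *Sketch classes are hypothetical, not vLLM's implementation, and the socket methods assume a pyzmq-style API):

from abc import ABC, abstractmethod
from typing import Any

class EngineCoreClientSketch(ABC):
    """Common interface; subclasses differ in where the engine core runs."""

    @abstractmethod
    def add_request(self, request: Any) -> None: ...

    @abstractmethod
    def get_output(self) -> Any: ...

class InprocClientSketch(EngineCoreClientSketch):
    """Engine core lives in this process: call it directly, no busy loop."""

    def __init__(self, core: Any) -> None:
        self.core = core

    def add_request(self, request: Any) -> None:
        self.core.add_request(request)

    def get_output(self) -> Any:
        return self.core.step()

class MPClientSketch(EngineCoreClientSketch):
    """Engine core in another process: push via the input socket, pull via the output socket."""

    def __init__(self, input_socket: Any, output_socket: Any) -> None:
        self.input_socket = input_socket
        self.output_socket = output_socket

    def add_request(self, request: Any) -> None:
        self.input_socket.send_pyobj(request)

    def get_output(self) -> Any:
        return self.output_socket.recv_pyobj()
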
4 changes: 3 additions & 1 deletion vllm/v1/worker/gpu_worker.py
@@ -50,7 +50,9 @@ def __init__(
distributed_init_method: str,
is_driver_worker: bool = False,
):

logger.warning(
    "===quinnzhu Worker init with local_rank=%s, rank=%s, "
    "distributed_init_method=%s", local_rank, rank, distributed_init_method)
super().__init__(vllm_config=vllm_config,
local_rank=local_rank,
rank=rank,