Revert "[V1][Core] Add worker_base for v1 worker (#12816)" #13440

Closed
43 changes: 0 additions & 43 deletions vllm/utils.py
@@ -2225,46 +2225,3 @@ def import_pynvml():
"""
import vllm.third_party.pynvml as pynvml
return pynvml


def warn_for_unimplemented_methods(cls: Type[T]) -> Type[T]:
"""
A replacement for `abc.ABC`.
When we use `abc.ABC`, subclasses will fail to instantiate
if they do not implement all abstract methods.
Here, we only require `raise NotImplementedError` in the
base class, and log a warning if the method is not implemented
in the subclass.
"""

original_init = cls.__init__

def find_unimplemented_methods(self: object):
unimplemented_methods = []
for attr_name in dir(self):
# bypass inner method
if attr_name.startswith('_'):
continue

try:
attr = getattr(self, attr_name)
# get the func of callable method
if callable(attr):
attr_func = attr.__func__
except AttributeError:
continue
src = inspect.getsource(attr_func)
if "NotImplementedError" in src:
unimplemented_methods.append(attr_name)
if unimplemented_methods:
method_names = ','.join(unimplemented_methods)
msg = (f"Methods {method_names} not implemented in {self}")
logger.warning(msg)

@wraps(original_init)
def wrapped_init(self, *args, **kwargs) -> None:
original_init(self, *args, **kwargs)
find_unimplemented_methods(self)

type.__setattr__(cls, '__init__', wrapped_init)
return cls
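
For context, here is a minimal sketch of the behavior this hunk removes. It assumes a checkout from before this revert, where `warn_for_unimplemented_methods` was still importable from `vllm.utils`; unlike `abc.ABC`, which makes instantiation of an incomplete subclass fail, the decorator only logged a warning.

```python
# Sketch only, not part of this diff. Assumes a pre-revert vLLM checkout
# where the decorator deleted above is still available.
from vllm.utils import warn_for_unimplemented_methods

@warn_for_unimplemented_methods
class LenientBase:
    def init_device(self) -> None:
        raise NotImplementedError

class IncompleteWorker(LenientBase):
    pass  # init_device is never overridden

# Unlike abc.ABC, instantiation succeeds; the decorator scans the instance
# for public callables whose source contains "NotImplementedError" and logs:
#   "Methods init_device not implemented in <IncompleteWorker ...>"
worker = IncompleteWorker()
```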
28 changes: 19 additions & 9 deletions vllm/v1/worker/gpu_worker.py
@@ -22,15 +22,14 @@
from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.gpu_model_runner import GPUModelRunner
from vllm.v1.worker.worker_base import WorkerBase

logger = init_logger(__name__)

if TYPE_CHECKING:
from vllm.v1.core.scheduler_output import SchedulerOutput


class Worker(WorkerBase):
class Worker:

def __init__(
self,
@@ -41,11 +40,23 @@ def __init__(
is_driver_worker: bool = False,
):

super().__init__(vllm_config=vllm_config,
local_rank=local_rank,
rank=rank,
distributed_init_method=distributed_init_method,
is_driver_worker=is_driver_worker)
# TODO: use WorkerBase.__init__(self, vllm_config=vllm_config)
self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
self.cache_config = vllm_config.cache_config
self.lora_config = vllm_config.lora_config
self.load_config = vllm_config.load_config
self.parallel_config = vllm_config.parallel_config
self.scheduler_config = vllm_config.scheduler_config
self.device_config = vllm_config.device_config
self.speculative_config = vllm_config.speculative_config
self.prompt_adapter_config = vllm_config.prompt_adapter_config
self.observability_config = vllm_config.observability_config

self.parallel_config.rank = rank
self.local_rank = local_rank
self.rank = rank
self.distributed_init_method = distributed_init_method

if self.model_config.trust_remote_code:
# note: lazy import to avoid importing torch before initializing
@@ -116,8 +127,7 @@ def init_device(self):
set_random_seed(self.model_config.seed)

# Construct the model runner
self.model_runner: GPUModelRunner = GPUModelRunner(
self.vllm_config, self.device)
self.model_runner = GPUModelRunner(self.vllm_config, self.device)

def load_model(self) -> None:
if self.vllm_config.model_config.enable_sleep_mode:
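The `# TODO` in the hunk above suggests the repeated per-field assignments could eventually move back into a shared initializer. As a hypothetical sketch (the helper name and loop are assumptions for illustration, not vLLM API), the same unpacking could be written once; the attribute names mirror the assignments shown in the diff:

```python
# Hypothetical helper, not vLLM API: collapses the per-field config
# assignments from the diff above into a single loop.
_SUB_CONFIGS = (
    "model_config", "cache_config", "lora_config", "load_config",
    "parallel_config", "scheduler_config", "device_config",
    "speculative_config", "prompt_adapter_config", "observability_config",
)

def unpack_vllm_config(worker: object, vllm_config: object) -> None:
    """Copy vllm_config and each of its sub-configs onto a worker instance."""
    worker.vllm_config = vllm_config
    for name in _SUB_CONFIGS:
        setattr(worker, name, getattr(vllm_config, name))
```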
63 changes: 0 additions & 63 deletions vllm/v1/worker/worker_base.py

This file was deleted.

71 changes: 33 additions & 38 deletions vllm/worker/worker_base.py
@@ -3,7 +3,7 @@
import dataclasses
import os
import time
from abc import abstractmethod
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union

import cloudpickle
@@ -19,17 +19,15 @@
from vllm.sequence import ExecuteModelRequest, IntermediateTensors
from vllm.utils import (enable_trace_function_call_for_thread,
resolve_obj_by_qualname, run_method,
update_environment_variables,
warn_for_unimplemented_methods)
update_environment_variables)
from vllm.worker.model_runner_base import (BroadcastableModelInput,
ModelRunnerBase,
ModelRunnerInputBase)

logger = init_logger(__name__)


@warn_for_unimplemented_methods
class WorkerBase:
class WorkerBase(ABC):
"""Worker interface that allows vLLM to cleanly separate implementations for
different hardware. Also abstracts control plane communication, e.g., to
communicate request metadata to other workers.
@@ -55,29 +53,33 @@ def __init__(
from vllm.platforms import current_platform
self.current_platform = current_platform

@abstractmethod
def init_device(self) -> None:
"""Initialize device state, such as loading the model or other on-device
memory allocations.
"""
raise NotImplementedError

def initialize_cache(self, num_gpu_blocks: int,
num_cpu_blocks: int) -> None:
"""Initialize the KV cache with the given size in blocks.
"""
raise NotImplementedError
@abstractmethod
def determine_num_available_blocks(self) -> Tuple[int, int]:
"""Determine the number of available blocks for the GPU KV cache and
swappable CPU KV cache.

def get_model(self) -> nn.Module:
raise NotImplementedError
The implementation may run profiling or other heuristics to determine
the size of caches.

def load_model(self) -> None:
"""Load model onto target device."""
Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
are blocks that are "active" on the device and can be appended to.
num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
appended to.
"""
raise NotImplementedError

def execute_model(
self,
execute_model_req: Optional[ExecuteModelRequest] = None
) -> Optional[List[SamplerOutput]]:
@abstractmethod
def initialize_cache(self, num_gpu_blocks: int,
num_cpu_blocks: int) -> None:
"""Initialize the KV cache with the given size in blocks.
"""
raise NotImplementedError

def start_worker_execution_loop(self) -> None:
@@ -92,43 +94,40 @@ def start_worker_execution_loop(self) -> None:
if output is None:
return None

def determine_num_available_blocks(self) -> Tuple[int, int]:
"""Determine the number of available blocks for the GPU KV cache and
swappable CPU KV cache.

The implementation may run profiling or other heuristics to determine
the size of caches.
@abstractmethod
def get_model(self) -> nn.Module:
raise NotImplementedError

Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
are blocks that are "active" on the device and can be appended to.
num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
appended to.
"""
@abstractmethod
def execute_model(
self,
execute_model_req: Optional[ExecuteModelRequest] = None
) -> Optional[List[SamplerOutput]]:
raise NotImplementedError

@abstractmethod
def get_cache_block_size_bytes(self) -> int:
"""Return the size of a single cache block, in bytes. Used in
speculative decoding.
"""
raise NotImplementedError

@abstractmethod
def add_lora(self, lora_request: LoRARequest) -> bool:
raise NotImplementedError

@abstractmethod
def remove_lora(self, lora_id: int) -> bool:
raise NotImplementedError

@abstractmethod
def pin_lora(self, lora_id: int) -> bool:
raise NotImplementedError

@abstractmethod
def list_loras(self) -> Set[int]:
raise NotImplementedError

@property
def vocab_size(self) -> int:
"""Get vocabulary size from model configuration."""
return self.model_config.get_vocab_size()


class DelegateWorkerBase(WorkerBase):
"""
@@ -157,10 +156,6 @@ def initialize_cache(self, num_gpu_blocks: int,
num_cpu_blocks: int) -> None:
self.worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)

def load_model(self) -> None:
"""Load model onto target device."""
self.worker.load_model()

def get_model(self) -> nn.Module:
return self.worker.get_model()

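
With the revert applied, `WorkerBase` is an `abc.ABC` again, so a subclass that leaves any `@abstractmethod` unimplemented fails at instantiation time rather than merely logging a warning. A minimal, self-contained sketch (class names here are illustrative, not from the diff):

```python
# Sketch of the enforcement restored by this revert: abc.ABC rejects
# incomplete subclasses at instantiation time.
from abc import ABC, abstractmethod

class WorkerIface(ABC):  # stand-in for WorkerBase
    @abstractmethod
    def init_device(self) -> None:
        raise NotImplementedError

    @abstractmethod
    def get_model(self):
        raise NotImplementedError

class PartialWorker(WorkerIface):
    def init_device(self) -> None:
        print("device ready")
    # get_model is intentionally missing

try:
    PartialWorker()
except TypeError as e:
    # e.g. "Can't instantiate abstract class PartialWorker ..."
    print(e)
```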