 import jax
 from vllm.config import VllmConfig
 from vllm.logger import init_logger
+from vllm.tasks import POOLING_TASKS
 from vllm.v1.engine import (EngineCoreOutputs, EngineCoreRequest,
                             EngineCoreRequestType, UtilityOutput,
                             UtilityResult)
@@ -229,40 +230,33 @@ def _create_engine_cores(
 
         return engine_cores
 
-    def _add_request(self, request: EngineCoreRequest) -> Request:
-        if request.mm_hashes is not None:
-            # Here, if hash exists for a multimodal input, then it will be
-            # fetched from the cache, else it will be added to the cache.
-            # Note that the cache here is mirrored with the client cache, so
-            # anything that has a hash must have a HIT cache entry here
-            # as well.
-            assert request.mm_inputs is not None
-            request.mm_inputs = self._prefill_engines[
-                0].mm_input_cache_server.get_and_update_p1(
-                    request.mm_inputs, request.mm_hashes)
+    def add_request(self, request: EngineCoreRequest, request_wave: int = 0):
 
-        req = Request.from_engine_core_request(request)
-
-        if req.use_structured_output:
-            # Start grammar compilation asynchronously
-            self._prefill_engines[0].structured_output_manager.grammar_init(
-                req)
+        # TODO(fhzhang): support multiple prefill engines.
+        if not isinstance(request.request_id, str):
+            raise TypeError(
+                f"request_id must be a string, got {type(request.request_id)}")
 
-        return req
+        if pooling_params := request.pooling_params:
+            supported_pooling_tasks = [
+                task for task in self.get_supported_tasks()
+                if task in POOLING_TASKS
+            ]
 
-    def add_request(self, request: EngineCoreRequest):
-        vllm_request = self._add_request(request)
+            if pooling_params.task not in supported_pooling_tasks:
+                raise ValueError(f"Unsupported task: {pooling_params.task!r} "
+                                 f"Supported tasks: {supported_pooling_tasks}")
 
-        # TODO(fhzhang): support multiple prefill engines.
-        self._prefill_engines[0].scheduler.add_request(vllm_request)
-        self._requests[request.request_id] = vllm_request
+        self._prefill_engines[0].scheduler.add_request(request)
+        self._requests[request.request_id] = request
 
     def _handle_client_request(self, request_type: EngineCoreRequestType,
                                request: Any) -> None:
         """Dispatch request from client."""
 
         if request_type == EngineCoreRequestType.ADD:
-            self.add_request(request)
+            req, request_wave = request
+            self.add_request(req, request_wave)
         elif request_type == EngineCoreRequestType.ABORT:
             # TODO(fhzhang): we need to keep track of which engine is processing
             # the request and finish it there.
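
For reviewers: the new validation in `add_request` boils down to intersecting the engine's supported tasks with the pooling-only task set and rejecting any other `pooling_params.task` before the request ever reaches the scheduler. Below is a minimal standalone sketch of that pattern; the task names, the `PoolingParams` stand-in, and the `get_supported_tasks` stub are illustrative assumptions, not vLLM's actual values.

```python
# Sketch of the pooling-task validation pattern used in add_request above.
# POOLING_TASKS and get_supported_tasks() are illustrative stand-ins.
from dataclasses import dataclass
from typing import Optional

POOLING_TASKS = ("embed", "classify", "score")  # assumed pooling-only task set


@dataclass
class PoolingParams:
    task: str


def get_supported_tasks() -> tuple[str, ...]:
    # Stand-in for the engine's model-dependent task list.
    return ("generate", "embed")


def validate_pooling_task(pooling_params: Optional[PoolingParams]) -> None:
    if pooling_params is None:
        return  # not a pooling request, nothing to validate
    supported_pooling_tasks = [
        task for task in get_supported_tasks() if task in POOLING_TASKS
    ]
    if pooling_params.task not in supported_pooling_tasks:
        raise ValueError(f"Unsupported task: {pooling_params.task!r} "
                         f"Supported tasks: {supported_pooling_tasks}")


validate_pooling_task(PoolingParams(task="embed"))  # passes silently
# validate_pooling_task(PoolingParams(task="score"))  # raises ValueError
```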
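Relatedly, the ADD branch in `_handle_client_request` now assumes the client encodes each add as a `(request, request_wave)` tuple, mirroring the two-argument `add_request` signature. A toy dispatcher sketch of that contract follows; the enum values and the string request type are simplified assumptions, not the engine's real types.

```python
# Toy model of the ADD dispatch contract: the payload arrives as a
# (request, request_wave) tuple and is unpacked before calling add_request.
from enum import Enum
from typing import Any


class EngineCoreRequestType(Enum):
    ADD = "add"
    ABORT = "abort"


class ToyEngineCore:
    def __init__(self) -> None:
        self.added: list[tuple[str, int]] = []

    def add_request(self, request: str, request_wave: int = 0) -> None:
        self.added.append((request, request_wave))

    def _handle_client_request(self, request_type: EngineCoreRequestType,
                               request: Any) -> None:
        if request_type == EngineCoreRequestType.ADD:
            req, request_wave = request  # payload is a 2-tuple
            self.add_request(req, request_wave)


core = ToyEngineCore()
core._handle_client_request(EngineCoreRequestType.ADD, ("req-1", 0))
assert core.added == [("req-1", 0)]
```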