diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index 90d052bb8499b..2145795bde653 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -62,12 +62,12 @@ class StreamLoader: def __init__( self, file: LoadFile, - num_thread: int = 32, + num_threads: int = 32, use_pinmem: bool = False, use_direct_io: bool = False, ) -> None: self.file = file - self.num_thread = num_thread + self.num_threads = num_threads self.use_pinmem = use_pinmem self.use_direct_io = use_direct_io # TODO assert file type is safetensors @@ -86,7 +86,7 @@ def _tensors_reader( ) -> None: device = torch.device(device) is_cuda = device.type == "cuda" - # TODO use stream nonblocking IO + # TODO barrier could work in using stream nonblocking IO for tensor_meta in tensor_metas: tensor_buffer = self.file.load_to_buffer( offset=tensor_meta.real_offset, count=tensor_meta.count) @@ -104,7 +104,7 @@ def get_weights_iterator( tensors_per_reader: List[Tuple[TensorMeta, ...]] = (split_continue_tensors( self.tensors_metas, - self.num_thread)) + self.num_threads)) effective_num_readers = len(tensors_per_reader) self._reader_pool = concurrent.futures.ThreadPoolExecutor( @@ -116,7 +116,6 @@ def get_weights_iterator( futures: List[concurrent.futures.Future] = [] barrier = threading.Barrier(effective_num_readers) - for thread_idx, tensor_metas in enumerate(tensors_per_reader): future = self._reader_pool.submit( self._tensors_reader, @@ -249,7 +248,7 @@ def get_weights_iterator(self, device: Union[torch.device, str] = "cpu"): ) safetensors_loader = StreamLoader( file=safetensors_s3, - num_thread=self.num_threads, + num_threads=self.num_threads, use_pinmem=self.use_pinmem, use_direct_io=self.use_direct_io, )