Skip to content

Commit

Permalink
Fix hang when Flag_dataloader_use_file_descriptor=True (#9080)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepllz authored Sep 5, 2024
1 parent 8ee99a4 commit 951e3d4
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
20 changes: 10 additions & 10 deletions llm/predict/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,16 +984,16 @@ def predict(self, input_texts: list[str], return_tokens=False):

result_queue = mp.Queue()
tensor_queue = mp.Queue()

output_tensor = paddle.full(shape=[MAX_BSZ + 2, 1], fill_value=2, dtype="int64").cpu()
tensor_queue.put(output_tensor)

done_event = mp.Event()
read_res_process = mp.Process(
target=llm_utils.read_res, args=[self.model_name_or_path, tensor_queue, result_queue]
target=llm_utils.read_res, args=[self.model_name_or_path, tensor_queue, result_queue, done_event]
)
if self.tensor_parallel_rank == 0:
read_res_process.start()

output_tensor = paddle.full(shape=[MAX_BSZ + 2, 1], fill_value=2, dtype="int64").cpu()
tensor_queue.put(output_tensor)
done_event.wait()
s_time = time.time()
while self.model_inputs["not_need_stop"]:
self._infer(self.model_inputs)
Expand Down Expand Up @@ -1109,17 +1109,17 @@ def predict(self, input_texts: list[str], return_tokens=False):

result_queue = mp.Queue()
tensor_queue = mp.Queue()

output_tensor = paddle.full(shape=[MAX_BSZ + 2, 1], fill_value=2, dtype="int64").cpu()
tensor_queue.put(output_tensor)
done_event = mp.Event()

read_res_process = mp.Process(
target=llm_utils.read_res, args=[self.model_name_or_path, tensor_queue, result_queue]
target=llm_utils.read_res, args=[self.model_name_or_path, tensor_queue, result_queue, done_event]
)

if self.tensor_parallel_rank == 0:
read_res_process.start()

output_tensor = paddle.full(shape=[MAX_BSZ + 2, 1], fill_value=2, dtype="int64").cpu()
tensor_queue.put(output_tensor)
done_event.wait()
s_time = time.time()
while self.model_inputs["not_need_stop"]:
self.predictor.run(list(self.model_inputs.values()))
Expand Down
4 changes: 2 additions & 2 deletions paddlenlp/utils/llm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,14 +750,14 @@ def get_model_max_position_embeddings(config: PretrainedConfig) -> Optional[int]
return None


def read_res(model_name_or_path: str, tensor_queue: mp.Queue, result_queue: mp.Queue):
def read_res(model_name_or_path: str, tensor_queue: mp.Queue, result_queue: mp.Queue, done_event: mp.Event):
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)

paddle.device.set_device("cpu")
paddle.disable_static()
outputs = []
output_tensor = tensor_queue.get(timeout=1)

done_event.set()
logger.info("Start read result message")
logger.info(f"Current path is {os.getcwd()}")

Expand Down

0 comments on commit 951e3d4

Please sign in to comment.