|
1 | 1 | import asyncio |
2 | 2 | import multiprocessing |
3 | 3 | import os |
| 4 | +import sys |
4 | 5 | import threading |
5 | 6 | import traceback |
6 | 7 | import uuid |
7 | 8 | from dataclasses import dataclass |
| 9 | +from io import TextIOBase |
8 | 10 | from multiprocessing.connection import wait |
9 | 11 | from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union |
10 | 12 |
|
|
16 | 18 |
|
17 | 19 | _TERMINATE = "TERMINATE" # sentinel |
18 | 20 |
|
| 21 | +# ANSI color codes |
| 22 | +CYAN = '\033[1;36m' |
| 23 | +RESET = '\033[0;0m' |
| 24 | + |
19 | 25 | # Use dedicated multiprocess context for workers. |
20 | 26 | # Both spawn and fork work |
21 | 27 | mp_method = os.getenv("MULTIPROC_METHOD", "fork") |
@@ -172,9 +178,11 @@ def kill_worker(self): |
172 | 178 | self.kill() |
173 | 179 |
|
174 | 180 | def run(self) -> None: |
175 | | - # Re-init logger in forked process, to include worker-specific prefix |
176 | | - global logger |
177 | | - logger = init_logger(__name__) |
| 181 | + # Add process-specific prefix to stdout and stderr |
| 182 | + process_name = mp.current_process().name |
| 183 | + pid = os.getpid() |
| 184 | + _add_prefix(sys.stdout, process_name, pid) |
| 185 | + _add_prefix(sys.stderr, process_name, pid) |
178 | 186 |
|
179 | 187 | del self.tasks # Not used in forked process |
180 | 188 | self.worker = self.worker_factory() |
@@ -205,3 +213,30 @@ def run(self) -> None: |
205 | 213 | logger.exception("Worker failed") |
206 | 214 |
|
207 | 215 | logger.info("Worker exiting") |
| 216 | + |
| 217 | + |
| 218 | +def _add_prefix(file: TextIOBase, worker_name: str, pid: int) -> None: |
| 219 | + """Prepend output with process-specific prefix""" |
| 220 | + |
| 221 | + prefix = f"{CYAN}({worker_name} pid={pid}){RESET} " |
| 222 | + file_write = file.write |
| 223 | + |
| 224 | + def write_with_prefix(s: str): |
| 225 | + if not s: |
| 226 | + return |
| 227 | + if file.start_new_line: |
| 228 | + file_write(prefix) |
| 229 | + idx = 0 |
| 230 | + while (next_idx := s.find('\n', idx)) != -1: |
| 231 | + next_idx += 1 |
| 232 | + file_write(s[idx:next_idx]) |
| 233 | + if next_idx == len(s): |
| 234 | + file.start_new_line = True |
| 235 | + return |
| 236 | + file_write(prefix) |
| 237 | + idx = next_idx |
| 238 | + file_write(s[idx:]) |
| 239 | + file.start_new_line = False |
| 240 | + |
| 241 | + file.start_new_line = True |
| 242 | + file.write = write_with_prefix |
0 commit comments