Skip to content

Commit a62480a

Browse files
committed
mapmaker: separately pass the process log queue to MapMaker; improve log reading, esp. at mapmaker termination.
1 parent a27c72d commit a62480a

File tree

3 files changed

+54
-29
lines changed

3 files changed

+54
-29
lines changed

mapserver/maker/__init__.py

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -86,34 +86,41 @@ def log_file(pid):
8686

8787
#===============================================================================
8888

89-
def _run_in_loop(func, args):
89+
def _run_in_loop(func, *args):
90+
#=============================
9091
loop = uvloop.new_event_loop()
91-
loop.run_until_complete(func(args))
92+
loop.run_until_complete(func(*args))
9293

93-
async def _make_map(params):
94-
#===========================
94+
async def _make_map(params, process_log_queue: multiprocessing.Queue):
95+
#=====================================================================
9596
try:
96-
mapmaker = MapMaker(params)
97+
mapmaker = MapMaker(params, process_log_queue=process_log_queue)
9798
mapmaker.make()
9899
except Exception as e:
99100
utils.log.exception(e, exc_info=True)
101+
## And now we need to send a CRITICAL failed message onto the msg_queue...
102+
## as any raised exception will end up here
103+
## e.g. ???
104+
## {"exc_info": true, "level": "error", "timestamp": "2025-08-18T08:01:06.842287Z", "msg": "GitCommandError(['git', 'checkout', 'staging'], 1, b\"error: pathspec 'staging' did not match any file(s) known to git\", b'')"}
100105
sys.exit(1)
101106

102107
#===============================================================================
103108

104109
class MakerProcess(multiprocessing.Process):
105-
def __init__(self, params: dict[str, Any]):
110+
def __init__(self, params: dict[str, Any], msg_queue: multiprocessing.Queue):
106111
id = str(uuid.uuid4())
107112
self.__process_log_queue = multiprocessing.Queue()
108-
params['logQueue'] = self.__process_log_queue
109-
super().__init__(target=_run_in_loop, args=(_make_map, params), name=id)
113+
super().__init__(target=_run_in_loop, args=(_make_map, params, self.__process_log_queue), name=id)
110114
self.__id = id
111115
self.__process_id = None
112116
self.__log_file = None
113-
self.__msg_queue = multiprocessing.Queue()
117+
self.__msg_queue = msg_queue
114118
self.__status = 'queued'
115119
self.__result = {}
116120

121+
def __str__(self):
122+
return f'MakerProcess {self.__id}: {self.__status}, {self.is_alive()} ({self.pid})'
123+
117124
@property
118125
def completed(self):
119126
return self.__status in ['terminated', 'aborted']
@@ -126,24 +133,17 @@ def log_file(self):
126133
def id(self):
127134
return self.__id
128135

129-
@property
130-
def msg_queue(self):
131-
return self.__msg_queue
132-
133136
@property
134137
def process_id(self):
135138
return self.__process_id
136139

137140
@property
138-
def result(self):
141+
def result(self) -> dict:
139142
return self.__result
140143

141144
@property
142145
def status(self) -> str:
143146
return self.__status
144-
@status.setter
145-
def status(self, value: str):
146-
self.__status = value
147147

148148
def close(self):
149149
#===============
@@ -205,6 +205,9 @@ def __init__(self):
205205
os.makedirs(settings['MAPMAKER_LOGS'])
206206
self.__map_dir = settings['FLATMAP_ROOT']
207207

208+
self.__last_running_process_id: Optional[str] = None
209+
self.__last_running_process_status: str = 'terminated'
210+
self.__process_msg_queue = multiprocessing.Queue()
208211
self.__running_process: Optional[MakerProcess] = None
209212
self.__terminate_event = asyncio.Event()
210213
self.__process_lock = asyncio.Lock()
@@ -227,16 +230,25 @@ async def get_log(self, id, start_line=1):
227230
return log_lines
228231
return ''
229232

233+
def __flush_process_log(self):
234+
#=============================
235+
while True:
236+
try:
237+
self.__process_msg_queue.get(block=False)
238+
except queue.Empty:
239+
return
240+
230241
async def get_process_log(self, id):
231242
#===================================
232243
if self.__running_process is not None and id == self.__running_process.id:
233244
while self.__running_process is not None and not self.__running_process.completed:
234245
try:
235-
msg = self.__running_process.msg_queue.get(block=False)
246+
msg = self.__process_msg_queue.get(block=False)
236247
yield msg
237248
except queue.Empty:
238249
await asyncio.sleep(0.01)
239250

251+
240252
async def make(self, data: MakerData) -> MakerStatus:
241253
#====================================================
242254
params = {key: value for (key, value) in dataclasses.asdict(data).items()
@@ -249,7 +261,8 @@ async def make(self, data: MakerData) -> MakerStatus:
249261
'logPath': settings['MAPMAKER_LOGS'] # Logfile name is `PROCESS_ID.json.log`
250262
})
251263
if self.__running_process is None:
252-
process = MakerProcess(params)
264+
self.__flush_process_log()
265+
process = MakerProcess(params, self.__process_msg_queue)
253266
await self.__start_process(process)
254267
return await self.status(process.id)
255268
else:
@@ -267,15 +280,23 @@ async def _run(self):
267280
process = self.__running_process
268281
process.read_process_log_queue()
269282
if not process.is_alive():
270-
process.close()
271-
self.__running_process = None
272-
maker_result = process.result
273-
if len(maker_result):
283+
if not self.__process_msg_queue.empty():
284+
self.__log.error(f'Process log queue is not empty: {process.status} {process.result}')
285+
while True:
286+
try:
287+
self.__log.warn(f'Queued: {self.__process_msg_queue.get(block=False)}')
288+
except queue.Empty:
289+
break
290+
process.close() # This updates status
291+
self.__last_running_process_id = process.id
292+
self.__last_running_process_status = process.status
293+
if len(process.result):
274294
info = ', '.join([ f'{key}: {value}' for key in MAKER_RESULT_KEYS
275-
if (value := maker_result.get(key)) is not None ])
295+
if (value := process.result.get(key)) is not None ])
276296
self.__log.info(f'Mapmaker succeeded: {process.name}, Map {info}')
277297
else:
278298
self.__log.error(f'Mapmaker FAILED: {process.name}')
299+
self.__running_process = None
279300
await asyncio.sleep(0.01)
280301

281302
def terminate(self):
@@ -288,6 +309,9 @@ async def status(self, id) -> MakerStatus:
288309
if self.__running_process is not None and id == self.__running_process.id:
289310
status = self.__running_process.status
290311
pid = self.__running_process.process_id
312+
elif id == self.__last_running_process_id:
313+
status = self.__last_running_process_status
314+
self.__last_running_process_id = None
291315
else:
292316
status = 'unknown'
293317
return MakerStatus(status, id, pid)
@@ -297,6 +321,7 @@ async def __start_process(self, process: MakerProcess):
297321
process.start()
298322
async with self.__process_lock:
299323
self.__running_process = process
324+
self.__last_running_process_id = None
300325
self.__log.info(f'Started mapmaker process: {process.name}, PID: {process.process_id}')
301326

302327
#===============================================================================

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ dependencies = [
2121
"asyncpg>=0.30.0",
2222
"flatmapknowledge @ https://github.com/AnatomicMaps/flatmap-knowledge/releases/download/v2.5.3/flatmapknowledge-2.5.3-py3-none-any.whl",
2323
"mapknowledge @ https://github.com/AnatomicMaps/map-knowledge/releases/download/v1.3.3/mapknowledge-1.3.3-py3-none-any.whl",
24-
"mapmaker @ https://github.com/AnatomicMaps/flatmap-maker/releases/download/v1.21.1/mapmaker-1.21.1-py3-none-any.whl",
24+
"mapmaker @ https://github.com/AnatomicMaps/flatmap-maker/releases/download/v1.21.2/mapmaker-1.21.2-py3-none-any.whl",
2525
"granian>=2.2.5",
2626
"semver>=3.0.4",
2727
"anyio>=4.9.0",

uv.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)