Skip to content

Commit 4c414b8

Browse files
authored
Merge pull request #3024 from oesteban/enh/robuster-write-report
ENH: Avoid loading result from file when writing reports
2 parents 01de656 + c120ee5 commit 4c414b8

File tree

4 files changed

+85
-55
lines changed

4 files changed

+85
-55
lines changed

nipype/pipeline/engine/nodes.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from .utils import (
3838
_parameterization_dir, save_hashfile as _save_hashfile, load_resultfile as
3939
_load_resultfile, save_resultfile as _save_resultfile, nodelist_runner as
40-
_node_runner, strip_temp as _strip_temp, write_report,
40+
_node_runner, strip_temp as _strip_temp, write_node_report,
4141
clean_working_directory, merge_dict, evaluate_connect_function)
4242
from .base import EngineBase
4343

@@ -464,8 +464,7 @@ def run(self, updatehash=False):
464464

465465
# Store runtime-hashfile, pre-execution report, the node and the inputs set.
466466
_save_hashfile(hashfile_unfinished, self._hashed_inputs)
467-
write_report(
468-
self, report_type='preexec', is_mapnode=isinstance(self, MapNode))
467+
write_node_report(self, is_mapnode=isinstance(self, MapNode))
469468
savepkl(op.join(outdir, '_node.pklz'), self)
470469
savepkl(op.join(outdir, '_inputs.pklz'), self.inputs.get_traitsfree())
471470

@@ -484,8 +483,7 @@ def run(self, updatehash=False):
484483
# Tear-up after success
485484
shutil.move(hashfile_unfinished,
486485
hashfile_unfinished.replace('_unfinished', ''))
487-
write_report(
488-
self, report_type='postexec', is_mapnode=isinstance(self, MapNode))
486+
write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode))
489487
logger.info('[Node] Finished "%s".', self.fullname)
490488
return result
491489

@@ -1204,7 +1202,7 @@ def get_subnodes(self):
12041202
"""Generate subnodes of a mapnode and write pre-execution report"""
12051203
self._get_inputs()
12061204
self._check_iterfield()
1207-
write_report(self, report_type='preexec', is_mapnode=True)
1205+
write_node_report(self, result=None, is_mapnode=True)
12081206
return [node for _, node in self._make_nodes()]
12091207

12101208
def num_subnodes(self):

nipype/pipeline/engine/utils.py

Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -116,68 +116,60 @@ def nodelist_runner(nodes, updatehash=False, stop_first=False):
116116
yield i, result, err
117117

118118

119-
def write_report(node, report_type=None, is_mapnode=False):
120-
"""Write a report file for a node"""
119+
def write_node_report(node, result=None, is_mapnode=False):
120+
"""Write a report file for a node."""
121121
if not str2bool(node.config['execution']['create_report']):
122122
return
123123

124-
if report_type not in ['preexec', 'postexec']:
125-
logger.warning('[Node] Unknown report type "%s".', report_type)
126-
return
127-
128124
cwd = node.output_dir()
129-
report_dir = os.path.join(cwd, '_report')
130-
report_file = os.path.join(report_dir, 'report.rst')
131-
makedirs(report_dir, exist_ok=True)
132-
133-
logger.debug('[Node] Writing %s-exec report to "%s"', report_type[:-4],
134-
report_file)
135-
if report_type.startswith('pre'):
136-
lines = [
137-
write_rst_header('Node: %s' % get_print_name(node), level=0),
138-
write_rst_list(
139-
['Hierarchy : %s' % node.fullname,
140-
'Exec ID : %s' % node._id]),
141-
write_rst_header('Original Inputs', level=1),
142-
write_rst_dict(node.inputs.trait_get()),
143-
]
144-
with open(report_file, 'wt') as fp:
145-
fp.write('\n'.join(lines))
146-
return
125+
report_file = Path(cwd) / '_report' / 'report.rst'
126+
report_file.parent.mkdir(exist_ok=True, parents=True)
147127

148128
lines = [
129+
write_rst_header('Node: %s' % get_print_name(node), level=0),
130+
write_rst_list(
131+
['Hierarchy : %s' % node.fullname,
132+
'Exec ID : %s' % node._id]),
133+
write_rst_header('Original Inputs', level=1),
134+
write_rst_dict(node.inputs.trait_get()),
135+
]
136+
137+
if result is None:
138+
logger.debug('[Node] Writing pre-exec report to "%s"', report_file)
139+
report_file.write_text('\n'.join(lines))
140+
return
141+
142+
logger.debug('[Node] Writing post-exec report to "%s"', report_file)
143+
lines += [
149144
write_rst_header('Execution Inputs', level=1),
150145
write_rst_dict(node.inputs.trait_get()),
146+
write_rst_header('Execution Outputs', level=1)
151147
]
152148

153-
result = node.result # Locally cache result
154149
outputs = result.outputs
155-
156150
if outputs is None:
157-
with open(report_file, 'at') as fp:
158-
fp.write('\n'.join(lines))
151+
lines += ['None']
152+
report_file.write_text('\n'.join(lines))
159153
return
160154

161-
lines.append(write_rst_header('Execution Outputs', level=1))
162-
163155
if isinstance(outputs, Bunch):
164156
lines.append(write_rst_dict(outputs.dictcopy()))
165157
elif outputs:
166158
lines.append(write_rst_dict(outputs.trait_get()))
159+
else:
160+
lines += ['Outputs object was empty.']
167161

168162
if is_mapnode:
169163
lines.append(write_rst_header('Subnode reports', level=1))
170164
nitems = len(ensure_list(getattr(node.inputs, node.iterfield[0])))
171165
subnode_report_files = []
172166
for i in range(nitems):
173-
nodecwd = os.path.join(cwd, 'mapflow', '_%s%d' % (node.name, i),
174-
'_report', 'report.rst')
175-
subnode_report_files.append('subnode %d : %s' % (i, nodecwd))
167+
subnode_file = Path(cwd) / 'mapflow' / (
168+
'_%s%d' % (node.name, i)) / '_report' / 'report.rst'
169+
subnode_report_files.append('subnode %d : %s' % (i, subnode_file))
176170

177171
lines.append(write_rst_list(subnode_report_files))
178-
179-
with open(report_file, 'at') as fp:
180-
fp.write('\n'.join(lines))
172+
report_file.write_text('\n'.join(lines))
181173
return
182174

183175
lines.append(write_rst_header('Runtime info', level=1))
@@ -189,15 +181,9 @@ def write_report(node, report_type=None, is_mapnode=False):
189181
'prev_wd': getattr(result.runtime, 'prevcwd', '<not-set>'),
190182
}
191183

192-
if hasattr(result.runtime, 'cmdline'):
193-
rst_dict['command'] = result.runtime.cmdline
194-
195-
# Try and insert memory/threads usage if available
196-
if hasattr(result.runtime, 'mem_peak_gb'):
197-
rst_dict['mem_peak_gb'] = result.runtime.mem_peak_gb
198-
199-
if hasattr(result.runtime, 'cpu_percent'):
200-
rst_dict['cpu_percent'] = result.runtime.cpu_percent
184+
for prop in ('cmdline', 'mem_peak_gb', 'cpu_percent'):
185+
if hasattr(result.runtime, prop):
186+
rst_dict[prop] = getattr(result.runtime, prop)
201187

202188
lines.append(write_rst_dict(rst_dict))
203189

@@ -225,9 +211,17 @@ def write_report(node, report_type=None, is_mapnode=False):
225211
write_rst_dict(result.runtime.environ),
226212
]
227213

228-
with open(report_file, 'at') as fp:
229-
fp.write('\n'.join(lines))
230-
return
214+
report_file.write_text('\n'.join(lines))
215+
216+
217+
def write_report(node, report_type=None, is_mapnode=False):
218+
"""Write a report file for a node - DEPRECATED"""
219+
if report_type not in ('preexec', 'postexec'):
220+
logger.warning('[Node] Unknown report type "%s".', report_type)
221+
return
222+
223+
write_node_report(node, is_mapnode=is_mapnode,
224+
result=node.result if report_type == 'postexec' else None)
231225

232226

233227
def save_resultfile(result, cwd, name, rebase=None):

nipype/utils/filemanip.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def __init__(self, path):
5757
from pathlib2 import Path
5858
USING_PATHLIB2 = True
5959

60-
try:
60+
try: # PY35 - strict mode was added in 3.6
6161
Path('/invented/file/path').resolve(strict=True)
6262
except TypeError:
6363
def _patch_resolve(self, strict=False):
@@ -73,6 +73,7 @@ def _patch_resolve(self, strict=False):
7373
except FileNotFoundError:
7474
pass
7575
except OSError:
76+
# PY2
7677
def _patch_resolve(self, strict=False):
7778
"""Raise FileNotFoundError instead of OSError with pathlib2."""
7879
try:
@@ -85,6 +86,27 @@ def _patch_resolve(self, strict=False):
8586
Path.old_resolve = Path.resolve
8687
Path.resolve = _patch_resolve
8788

89+
if not hasattr(Path, 'write_text'):
90+
# PY34 - Path does not have write_text
91+
def _write_text(self, text):
92+
with open(str(self), 'w') as f:
93+
f.write(text)
94+
Path.write_text = _write_text
95+
96+
if PY3:
97+
try: # PY34 - mkdir does not have exist_ok
98+
from tempfile import TemporaryDirectory
99+
with TemporaryDirectory() as tmpdir:
100+
(Path(tmpdir) / 'exist_ok_test').mkdir(exist_ok=True)
101+
except TypeError:
102+
def _mkdir(self, mode=0o777, parents=False, exist_ok=False):
103+
if parents:
104+
os.makedirs(str(self), mode=mode, exist_ok=exist_ok)
105+
elif not exist_ok or not self.exists():
106+
os.mkdir(str(self), mode=mode)
107+
108+
Path.mkdir = _mkdir
109+
88110

89111
def split_filename(fname):
90112
"""Split a filename into parts: path, base filename and extension.

nipype/utils/tests/test_filemanip.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,3 +596,19 @@ def test_pickle(tmp_path, save_versioning):
596596
savepkl(pickle_fname, testobj, versioning=save_versioning)
597597
outobj = loadpkl(pickle_fname)
598598
assert outobj == testobj
599+
600+
601+
def test_Path(tmpdir):
602+
tmp_path = Path(tmpdir.strpath)
603+
604+
(tmp_path / 'textfile').write_text('some text')
605+
606+
with pytest.raises(OSError):
607+
(tmp_path / 'no' / 'parents').mkdir(parents=False)
608+
609+
(tmp_path / 'no' / 'parents').mkdir(parents=True)
610+
611+
with pytest.raises(OSError):
612+
(tmp_path / 'no' / 'parents').mkdir(parents=False)
613+
614+
(tmp_path / 'no' / 'parents').mkdir(parents=True, exist_ok=True)

0 commit comments

Comments
 (0)