Skip to content

ENH: Avoid loading result from file when writing reports #3024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from .utils import (
_parameterization_dir, save_hashfile as _save_hashfile, load_resultfile as
_load_resultfile, save_resultfile as _save_resultfile, nodelist_runner as
_node_runner, strip_temp as _strip_temp, write_report,
_node_runner, strip_temp as _strip_temp, write_node_report,
clean_working_directory, merge_dict, evaluate_connect_function)
from .base import EngineBase

Expand Down Expand Up @@ -464,8 +464,7 @@ def run(self, updatehash=False):

# Store runtime-hashfile, pre-execution report, the node and the inputs set.
_save_hashfile(hashfile_unfinished, self._hashed_inputs)
write_report(
self, report_type='preexec', is_mapnode=isinstance(self, MapNode))
write_node_report(self, is_mapnode=isinstance(self, MapNode))
savepkl(op.join(outdir, '_node.pklz'), self)
savepkl(op.join(outdir, '_inputs.pklz'), self.inputs.get_traitsfree())

Expand All @@ -484,8 +483,7 @@ def run(self, updatehash=False):
# Tear-up after success
shutil.move(hashfile_unfinished,
hashfile_unfinished.replace('_unfinished', ''))
write_report(
self, report_type='postexec', is_mapnode=isinstance(self, MapNode))
write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode))
logger.info('[Node] Finished "%s".', self.fullname)
return result

Expand Down Expand Up @@ -1204,7 +1202,7 @@ def get_subnodes(self):
"""Generate subnodes of a mapnode and write pre-execution report"""
self._get_inputs()
self._check_iterfield()
write_report(self, report_type='preexec', is_mapnode=True)
write_node_report(self, result=None, is_mapnode=True)
return [node for _, node in self._make_nodes()]

def num_subnodes(self):
Expand Down
90 changes: 42 additions & 48 deletions nipype/pipeline/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,68 +116,60 @@ def nodelist_runner(nodes, updatehash=False, stop_first=False):
yield i, result, err


def write_report(node, report_type=None, is_mapnode=False):
"""Write a report file for a node"""
def write_node_report(node, result=None, is_mapnode=False):
"""Write a report file for a node."""
if not str2bool(node.config['execution']['create_report']):
return

if report_type not in ['preexec', 'postexec']:
logger.warning('[Node] Unknown report type "%s".', report_type)
return

cwd = node.output_dir()
report_dir = os.path.join(cwd, '_report')
report_file = os.path.join(report_dir, 'report.rst')
makedirs(report_dir, exist_ok=True)

logger.debug('[Node] Writing %s-exec report to "%s"', report_type[:-4],
report_file)
if report_type.startswith('pre'):
lines = [
write_rst_header('Node: %s' % get_print_name(node), level=0),
write_rst_list(
['Hierarchy : %s' % node.fullname,
'Exec ID : %s' % node._id]),
write_rst_header('Original Inputs', level=1),
write_rst_dict(node.inputs.trait_get()),
]
with open(report_file, 'wt') as fp:
fp.write('\n'.join(lines))
return
report_file = Path(cwd) / '_report' / 'report.rst'
report_file.parent.mkdir(exist_ok=True, parents=True)

lines = [
write_rst_header('Node: %s' % get_print_name(node), level=0),
write_rst_list(
['Hierarchy : %s' % node.fullname,
'Exec ID : %s' % node._id]),
write_rst_header('Original Inputs', level=1),
write_rst_dict(node.inputs.trait_get()),
]

if result is None:
logger.debug('[Node] Writing pre-exec report to "%s"', report_file)
report_file.write_text('\n'.join(lines))
return

logger.debug('[Node] Writing post-exec report to "%s"', report_file)
lines += [
write_rst_header('Execution Inputs', level=1),
write_rst_dict(node.inputs.trait_get()),
write_rst_header('Execution Outputs', level=1)
]

result = node.result # Locally cache result
outputs = result.outputs

if outputs is None:
with open(report_file, 'at') as fp:
fp.write('\n'.join(lines))
lines += ['None']
report_file.write_text('\n'.join(lines))
return

lines.append(write_rst_header('Execution Outputs', level=1))

if isinstance(outputs, Bunch):
lines.append(write_rst_dict(outputs.dictcopy()))
elif outputs:
lines.append(write_rst_dict(outputs.trait_get()))
else:
lines += ['Outputs object was empty.']

if is_mapnode:
lines.append(write_rst_header('Subnode reports', level=1))
nitems = len(ensure_list(getattr(node.inputs, node.iterfield[0])))
subnode_report_files = []
for i in range(nitems):
nodecwd = os.path.join(cwd, 'mapflow', '_%s%d' % (node.name, i),
'_report', 'report.rst')
subnode_report_files.append('subnode %d : %s' % (i, nodecwd))
subnode_file = Path(cwd) / 'mapflow' / (
'_%s%d' % (node.name, i)) / '_report' / 'report.rst'
subnode_report_files.append('subnode %d : %s' % (i, subnode_file))

lines.append(write_rst_list(subnode_report_files))

with open(report_file, 'at') as fp:
fp.write('\n'.join(lines))
report_file.write_text('\n'.join(lines))
return

lines.append(write_rst_header('Runtime info', level=1))
Expand All @@ -189,15 +181,9 @@ def write_report(node, report_type=None, is_mapnode=False):
'prev_wd': getattr(result.runtime, 'prevcwd', '<not-set>'),
}

if hasattr(result.runtime, 'cmdline'):
rst_dict['command'] = result.runtime.cmdline

# Try and insert memory/threads usage if available
if hasattr(result.runtime, 'mem_peak_gb'):
rst_dict['mem_peak_gb'] = result.runtime.mem_peak_gb

if hasattr(result.runtime, 'cpu_percent'):
rst_dict['cpu_percent'] = result.runtime.cpu_percent
for prop in ('cmdline', 'mem_peak_gb', 'cpu_percent'):
if hasattr(result.runtime, prop):
rst_dict[prop] = getattr(result.runtime, prop)

lines.append(write_rst_dict(rst_dict))

Expand Down Expand Up @@ -225,9 +211,17 @@ def write_report(node, report_type=None, is_mapnode=False):
write_rst_dict(result.runtime.environ),
]

with open(report_file, 'at') as fp:
fp.write('\n'.join(lines))
return
report_file.write_text('\n'.join(lines))


def write_report(node, report_type=None, is_mapnode=False):
"""Write a report file for a node - DEPRECATED"""
if report_type not in ('preexec', 'postexec'):
logger.warning('[Node] Unknown report type "%s".', report_type)
return

write_node_report(node, is_mapnode=is_mapnode,
result=node.result if report_type == 'postexec' else None)


def save_resultfile(result, cwd, name, rebase=None):
Expand Down
24 changes: 23 additions & 1 deletion nipype/utils/filemanip.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self, path):
from pathlib2 import Path
USING_PATHLIB2 = True

try:
try: # PY35 - strict mode was added in 3.6
Path('/invented/file/path').resolve(strict=True)
except TypeError:
def _patch_resolve(self, strict=False):
Expand All @@ -73,6 +73,7 @@ def _patch_resolve(self, strict=False):
except FileNotFoundError:
pass
except OSError:
# PY2
def _patch_resolve(self, strict=False):
"""Raise FileNotFoundError instead of OSError with pathlib2."""
try:
Expand All @@ -85,6 +86,27 @@ def _patch_resolve(self, strict=False):
Path.old_resolve = Path.resolve
Path.resolve = _patch_resolve

if not hasattr(Path, 'write_text'):
# PY34 - Path does not have write_text
def _write_text(self, text):
with open(str(self), 'w') as f:
f.write(text)
Path.write_text = _write_text

if PY3:
try: # PY34 - mkdir does not have exist_ok
from tempfile import TemporaryDirectory
with TemporaryDirectory() as tmpdir:
(Path(tmpdir) / 'exist_ok_test').mkdir(exist_ok=True)
except TypeError:
def _mkdir(self, mode=0o777, parents=False, exist_ok=False):
if parents:
os.makedirs(str(self), mode=mode, exist_ok=exist_ok)
elif not exist_ok or not self.exists():
os.mkdir(str(self), mode=mode)

Path.mkdir = _mkdir


def split_filename(fname):
"""Split a filename into parts: path, base filename and extension.
Expand Down
16 changes: 16 additions & 0 deletions nipype/utils/tests/test_filemanip.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,3 +596,19 @@ def test_pickle(tmp_path, save_versioning):
savepkl(pickle_fname, testobj, versioning=save_versioning)
outobj = loadpkl(pickle_fname)
assert outobj == testobj


def test_Path(tmpdir):
tmp_path = Path(tmpdir.strpath)

(tmp_path / 'textfile').write_text('some text')

with pytest.raises(OSError):
(tmp_path / 'no' / 'parents').mkdir(parents=False)

(tmp_path / 'no' / 'parents').mkdir(parents=True)

with pytest.raises(OSError):
(tmp_path / 'no' / 'parents').mkdir(parents=False)

(tmp_path / 'no' / 'parents').mkdir(parents=True, exist_ok=True)