Skip to content

[ENH] Refactoring of nipype.interfaces.utility #1828

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Upcoming release 0.13
=====================

* ENH: Refactoring of nipype.interfaces.utility (https://github.com/nipy/nipype/pull/1828)
* FIX: CircleCI were failing silently. Some fixes to tests (https://github.com/nipy/nipype/pull/1833)
* FIX: Issues in Docker image permissions, and docker documentation (https://github.com/nipy/nipype/pull/1825)
* ENH: Revised all Dockerfiles and automated deployment to Docker Hub
Expand Down
5 changes: 2 additions & 3 deletions nipype/interfaces/freesurfer/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
# vi: set ft=python sts=4 ts=4 sw=4 et:
from __future__ import print_function, division, unicode_literals, absolute_import
from builtins import open
import os

import os, os.path as op
import pytest
from nipype.testing.fixtures import (create_files_in_directory_plus_dummy_file,
from nipype.testing.fixtures import (create_files_in_directory_plus_dummy_file,
create_surf_file_in_directory)

from nipype.interfaces.base import TraitError
Expand Down
13 changes: 13 additions & 0 deletions nipype/interfaces/utility/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Package contains interfaces for using existing functionality in other packages

Requires Packages to be installed
"""

from .base import (IdentityInterface, Rename, Select, Split, Merge,
AssertEqual)
from .csv import CSVReader
from .wrappers import Function
288 changes: 21 additions & 267 deletions nipype/interfaces/utility.py → nipype/interfaces/utility/base.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""Various utilities
"""
Various utilities

Change directory to provide relative paths for doctests
>>> import os
>>> filepath = os.path.dirname(os.path.realpath(__file__))
>>> datadir = os.path.realpath(os.path.join(filepath,
... '../../testing/data'))
>>> os.chdir(datadir)

Change directory to provide relative paths for doctests
>>> import os
>>> filepath = os.path.dirname( os.path.realpath( __file__ ) )
>>> datadir = os.path.realpath(os.path.join(filepath, '../testing/data'))
>>> os.chdir(datadir)
"""
from __future__ import print_function, division, unicode_literals, absolute_import
from builtins import zip, range, str, open
from builtins import range

from future import standard_library
standard_library.install_aliases()
Expand All @@ -20,22 +23,12 @@
import numpy as np
import nibabel as nb

from nipype import logging
from .base import (traits, TraitedSpec, DynamicTraitedSpec, File,
Undefined, isdefined, OutputMultiPath, runtime_profile,
InputMultiPath, BaseInterface, BaseInterfaceInputSpec)
from .io import IOBase, add_traits
from ..utils.filemanip import (filename_to_list, copyfile, split_filename)
from ..utils.misc import getsource, create_function_from_source

logger = logging.getLogger('interface')
if runtime_profile:
try:
import psutil
except ImportError as exc:
logger.info('Unable to import packages needed for runtime profiling. '\
'Turning off runtime profiler. Reason: %s' % exc)
runtime_profile = False
from ..base import (traits, TraitedSpec, DynamicTraitedSpec, File,
Undefined, isdefined, OutputMultiPath, InputMultiPath,
BaseInterface, BaseInterfaceInputSpec, Str)
from ..io import IOBase, add_traits
from ...utils.filemanip import filename_to_list, copyfile, split_filename


class IdentityInterface(IOBase):
"""Basic interface class generates identity mappings
Expand Down Expand Up @@ -163,11 +156,10 @@ class RenameInputSpec(DynamicTraitedSpec):
in_file = File(exists=True, mandatory=True, desc="file to rename")
keep_ext = traits.Bool(desc=("Keep in_file extension, replace "
"non-extension component of name"))
format_string = traits.String(mandatory=True,
desc=("Python formatting string for output "
"template"))
parse_string = traits.String(desc=("Python regexp parse string to define "
"replacement inputs"))
format_string = Str(mandatory=True,
desc="Python formatting string for output template")
parse_string = Str(desc="Python regexp parse string to define "
"replacement inputs")
use_fullpath = traits.Bool(False, usedefault=True,
desc="Use full path as input to regex parser")

Expand All @@ -191,6 +183,7 @@ class Rename(IOBase):

Examples
--------

>>> from nipype.interfaces.utility import Rename
>>> rename1 = Rename()
>>> rename1.inputs.in_file = "zstat1.nii.gz"
Expand Down Expand Up @@ -357,165 +350,6 @@ def _list_outputs(self):
return outputs


class FunctionInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
function_str = traits.Str(mandatory=True, desc='code for function')


class Function(IOBase):
"""Runs arbitrary function as an interface

Examples
--------

>>> func = 'def func(arg1, arg2=5): return arg1 + arg2'
>>> fi = Function(input_names=['arg1', 'arg2'], output_names=['out'])
>>> fi.inputs.function_str = func
>>> res = fi.run(arg1=1)
>>> res.outputs.out
6

"""

input_spec = FunctionInputSpec
output_spec = DynamicTraitedSpec

def __init__(self, input_names, output_names, function=None, imports=None,
**inputs):
"""

Parameters
----------

input_names: single str or list
names corresponding to function inputs
output_names: single str or list
names corresponding to function outputs.
has to match the number of outputs
function : callable
callable python object. must be able to execute in an
isolated namespace (possibly in concert with the ``imports``
parameter)
imports : list of strings
list of import statements that allow the function to execute
in an otherwise empty namespace
"""

super(Function, self).__init__(**inputs)
if function:
if hasattr(function, '__call__'):
try:
self.inputs.function_str = getsource(function)
except IOError:
raise Exception('Interface Function does not accept '
'function objects defined interactively '
'in a python session')
elif isinstance(function, (str, bytes)):
self.inputs.function_str = function
else:
raise Exception('Unknown type of function')
self.inputs.on_trait_change(self._set_function_string,
'function_str')
self._input_names = filename_to_list(input_names)
self._output_names = filename_to_list(output_names)
add_traits(self.inputs, [name for name in self._input_names])
self.imports = imports
self._out = {}
for name in self._output_names:
self._out[name] = None

def _set_function_string(self, obj, name, old, new):
if name == 'function_str':
if hasattr(new, '__call__'):
function_source = getsource(new)
elif isinstance(new, (str, bytes)):
function_source = new
self.inputs.trait_set(trait_change_notify=False,
**{'%s' % name: function_source})

def _add_output_traits(self, base):
undefined_traits = {}
for key in self._output_names:
base.add_trait(key, traits.Any)
undefined_traits[key] = Undefined
base.trait_set(trait_change_notify=False, **undefined_traits)
return base

def _run_interface(self, runtime):
# Get workflow logger for runtime profile error reporting
from nipype import logging
logger = logging.getLogger('workflow')

# Create function handle
function_handle = create_function_from_source(self.inputs.function_str,
self.imports)

# Wrapper for running function handle in multiprocessing.Process
# Can catch exceptions and report output via multiprocessing.Queue
def _function_handle_wrapper(queue, **kwargs):
try:
out = function_handle(**kwargs)
queue.put(out)
except Exception as exc:
queue.put(exc)

# Get function args
args = {}
for name in self._input_names:
value = getattr(self.inputs, name)
if isdefined(value):
args[name] = value

# Profile resources if set
if runtime_profile:
from nipype.interfaces.base import get_max_resources_used
import multiprocessing
# Init communication queue and proc objs
queue = multiprocessing.Queue()
proc = multiprocessing.Process(target=_function_handle_wrapper,
args=(queue,), kwargs=args)

# Init memory and threads before profiling
mem_mb = 0
num_threads = 0

# Start process and profile while it's alive
proc.start()
while proc.is_alive():
mem_mb, num_threads = \
get_max_resources_used(proc.pid, mem_mb, num_threads,
pyfunc=True)

# Get result from process queue
out = queue.get()
# If it is an exception, raise it
if isinstance(out, Exception):
raise out

# Function ran successfully, populate runtime stats
setattr(runtime, 'runtime_memory_gb', mem_mb / 1024.0)
setattr(runtime, 'runtime_threads', num_threads)
else:
out = function_handle(**args)

if len(self._output_names) == 1:
self._out[self._output_names[0]] = out
else:
if isinstance(out, tuple) and (len(out) != len(self._output_names)):
raise RuntimeError('Mismatch in number of expected outputs')

else:
for idx, name in enumerate(self._output_names):
self._out[name] = out[idx]

return runtime

def _list_outputs(self):
outputs = self._outputs().get()
for key in self._output_names:
outputs[key] = self._out[key]
return outputs


class AssertEqualInputSpec(BaseInterfaceInputSpec):
volume1 = File(exists=True, mandatory=True)
volume2 = File(exists=True, mandatory=True)
Expand All @@ -532,83 +366,3 @@ def _run_interface(self, runtime):
if not np.all(data1 == data2):
raise RuntimeError('Input images are not exactly equal')
return runtime


class CSVReaderInputSpec(DynamicTraitedSpec, TraitedSpec):
in_file = File(exists=True, mandatory=True, desc='Input comma-seperated value (CSV) file')
header = traits.Bool(False, usedefault=True, desc='True if the first line is a column header')


class CSVReader(BaseInterface):
"""
Examples
--------

>>> reader = CSVReader() # doctest: +SKIP
>>> reader.inputs.in_file = 'noHeader.csv' # doctest: +SKIP
>>> out = reader.run() # doctest: +SKIP
>>> out.outputs.column_0 == ['foo', 'bar', 'baz'] # doctest: +SKIP
True
>>> out.outputs.column_1 == ['hello', 'world', 'goodbye'] # doctest: +SKIP
True
>>> out.outputs.column_2 == ['300.1', '5', '0.3'] # doctest: +SKIP
True

>>> reader = CSVReader() # doctest: +SKIP
>>> reader.inputs.in_file = 'header.csv' # doctest: +SKIP
>>> reader.inputs.header = True # doctest: +SKIP
>>> out = reader.run() # doctest: +SKIP
>>> out.outputs.files == ['foo', 'bar', 'baz'] # doctest: +SKIP
True
>>> out.outputs.labels == ['hello', 'world', 'goodbye'] # doctest: +SKIP
True
>>> out.outputs.erosion == ['300.1', '5', '0.3'] # doctest: +SKIP
True

"""
input_spec = CSVReaderInputSpec
output_spec = DynamicTraitedSpec
_always_run = True

def _append_entry(self, outputs, entry):
for key, value in zip(self._outfields, entry):
outputs[key].append(value)
return outputs

def _parse_line(self, line):
line = line.replace('\n', '')
entry = [x.strip() for x in line.split(',')]
return entry

def _get_outfields(self):
with open(self.inputs.in_file, 'r') as fid:
entry = self._parse_line(fid.readline())
if self.inputs.header:
self._outfields = tuple(entry)
else:
self._outfields = tuple(['column_' + str(x) for x in range(len(entry))])
return self._outfields

def _run_interface(self, runtime):
self._get_outfields()
return runtime

def _outputs(self):
return self._add_output_traits(super(CSVReader, self)._outputs())

def _add_output_traits(self, base):
return add_traits(base, self._get_outfields())

def _list_outputs(self):
outputs = self.output_spec().get()
isHeader = True
for key in self._outfields:
outputs[key] = [] # initialize outfields
with open(self.inputs.in_file, 'r') as fid:
for line in fid.readlines():
if self.inputs.header and isHeader: # skip header line
isHeader = False
continue
entry = self._parse_line(line)
outputs = self._append_entry(outputs, entry)
return outputs
Loading