Skip to content

Commit c0d148a

Browse files
committed
Removed the bucket attribute from the DataSink and made it a local variable instead; pickle cannot serialize the boto3 Bucket object. Functionally, the DataSink is unchanged.
1 parent c2eedc7 commit c0d148a

File tree

1 file changed

+33
-40
lines changed

1 file changed

+33
-40
lines changed

nipype/interfaces/io.py

Lines changed: 33 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,7 @@ def _check_s3_base_dir(self):
375375
'''
376376
Method to see if the datasink's base directory specifies an
377377
S3 bucket path; if it does, it parses the path for the bucket
378-
name in the form 's3://bucket_name/...' and adds a bucket
379-
attribute to the data sink instance, i.e. self.bucket
378+
name in the form 's3://bucket_name/...' and returns it
380379
381380
Parameters
382381
----------
@@ -386,15 +385,19 @@ def _check_s3_base_dir(self):
386385
s3_flag : boolean
387386
flag indicating whether the base_directory contained an
388387
S3 bucket path
388+
bucket_name : string
389+
name of the S3 bucket to connect to; if the base directory
390+
is not a valid S3 path, defaults to '<N/A>'
389391
'''
390392

391393
# Init variables
392394
s3_str = 's3://'
395+
bucket_name = '<N/A>'
393396
base_directory = self.inputs.base_directory
394397

395398
if not isdefined(base_directory):
396399
s3_flag = False
397-
return s3_flag
400+
return s3_flag, bucket_name
398401

399402
# Explicitly lower-case the "s3"
400403
if base_directory.lower().startswith(s3_str):
@@ -404,28 +407,15 @@ def _check_s3_base_dir(self):
404407

405408
# Check if 's3://' in base dir
406409
if base_directory.startswith(s3_str):
407-
# Attempt to access bucket
408-
try:
409-
# Expects bucket name to be 's3://bucket_name/base_dir/..'
410-
bucket_name = base_directory.split(s3_str)[1].split('/')[0]
411-
# Get the actual bucket object
412-
if self.inputs.bucket:
413-
self.bucket = self.inputs.bucket
414-
else:
415-
self.bucket = self._fetch_bucket(bucket_name)
416-
# Report error in case of exception
417-
except Exception as exc:
418-
err_msg = 'Unable to access S3 bucket. Error:\n%s. Exiting...'\
419-
% exc
420-
raise Exception(err_msg)
421-
# Bucket access was a success, set flag
410+
# Expects bucket name to be 's3://bucket_name/base_dir/..'
411+
bucket_name = base_directory.split(s3_str)[1].split('/')[0]
422412
s3_flag = True
423413
# Otherwise it's just a normal datasink
424414
else:
425415
s3_flag = False
426416

427417
# Return s3_flag
428-
return s3_flag
418+
return s3_flag, bucket_name
429419

430420
# Function to return AWS secure environment variables
431421
def _return_aws_keys(self):
@@ -576,7 +566,7 @@ def _fetch_bucket(self, bucket_name):
576566
return bucket
577567

578568
# Send up to S3 method
579-
def _upload_to_s3(self, src, dst):
569+
def _upload_to_s3(self, bucket, src, dst):
580570
'''
581571
Method to upload outputs to S3 bucket instead of on local disk
582572
'''
@@ -589,7 +579,6 @@ def _upload_to_s3(self, src, dst):
589579
from botocore.exceptions import ClientError
590580

591581
# Init variables
592-
bucket = self.bucket
593582
iflogger = logging.getLogger('interface')
594583
s3_str = 's3://'
595584
s3_prefix = s3_str + bucket.name
@@ -668,30 +657,34 @@ def _list_outputs(self):
668657
outdir = '.'
669658

670659
# Check if base directory reflects S3 bucket upload
671-
try:
672-
s3_flag = self._check_s3_base_dir()
673-
if s3_flag:
674-
s3dir = self.inputs.base_directory
675-
if isdefined(self.inputs.container):
676-
s3dir = os.path.join(s3dir, self.inputs.container)
660+
s3_flag, bucket_name = self._check_s3_base_dir()
661+
if s3_flag:
662+
s3dir = self.inputs.base_directory
663+
# If user overrides bucket object, use that
664+
if self.inputs.bucket:
665+
bucket = self.inputs.bucket
666+
# Otherwise fetch bucket object using name
677667
else:
678-
s3dir = '<N/A>'
679-
# If encountering an exception during bucket access, set output
680-
# base directory to a local folder
681-
except Exception as exc:
668+
try:
669+
bucket = self._fetch_bucket(bucket_name)
670+
# If encountering an exception during bucket access, set output
671+
# base directory to a local folder
672+
except Exception as exc:
673+
s3dir = '<N/A>'
674+
if not isdefined(self.inputs.local_copy):
675+
local_out_exception = os.path.join(os.path.expanduser('~'),
676+
's3_datasink_' + bucket_name)
677+
outdir = local_out_exception
678+
# Log local copying directory
679+
iflogger.info('Access to S3 failed! Storing outputs locally at: '\
680+
'%s\nError: %s' %(outdir, exc))
681+
else:
682682
s3dir = '<N/A>'
683-
s3_flag = False
684-
if not isdefined(self.inputs.local_copy):
685-
local_out_exception = os.path.join(os.path.expanduser('~'),
686-
's3_datasink_' + self.bucket.name)
687-
outdir = local_out_exception
688-
# Log local copying directory
689-
iflogger.info('Access to S3 failed! Storing outputs locally at: '\
690-
'%s\nError: %s' %(outdir, exc))
691683

692684
# If container input is given, append that to outdir
693685
if isdefined(self.inputs.container):
694686
outdir = os.path.join(outdir, self.inputs.container)
687+
s3dir = os.path.join(s3dir, self.inputs.container)
695688

696689
# If sinking to local folder
697690
if outdir != s3dir:
@@ -743,7 +736,7 @@ def _list_outputs(self):
743736

744737
# If we're uploading to S3
745738
if s3_flag:
746-
self._upload_to_s3(src, s3dst)
739+
self._upload_to_s3(bucket, src, s3dst)
747740
out_files.append(s3dst)
748741
# Otherwise, copy locally src -> dst
749742
if not s3_flag or isdefined(self.inputs.local_copy):

0 commit comments

Comments
 (0)