Scp #2660 (Merged)

Changes from 20 commits

5 changes: 4 additions & 1 deletion .gitignore
@@ -46,7 +46,7 @@ config.py
*~

# don't show the data on git status
qiita_db/support_files/test_data
qiita_db/support_files/test_data/

# ignoring redis files
dump.rdb
@@ -60,3 +60,6 @@ qiita_pet/static/doc/

# webdis log
webdis.log

# test keys should be generated in travis
qiita_ware/test/test_data/test_key
5 changes: 5 additions & 0 deletions .travis.yml
@@ -50,6 +50,11 @@ install:
- export REDBIOM_HOST=http://127.0.0.1:7379
- cp $PWD/qiita_core/support_files/BIOM\ type_2.1.4.conf ~/.qiita_plugins/BIOM\ type_2.1.4\ -\ Qiime2.conf
- touch ~/.bash_profile
- ssh-keygen -t rsa -C "ssh test key" -f $PWD/qiita_ware/test/test_data/test_key -P ""
- mv $PWD/qiita_ware/test/test_data/test_key.pub ~/.ssh/
- ssh-keyscan localhost >> ~/.ssh/known_hosts
- cat ~/.ssh/test_key.pub >> ~/.ssh/authorized_keys
- scp -i $PWD/qiita_ware/test/test_data/test_key localhost:$PWD/qiita_ware/test/test_data/random_key $PWD/qiita_ware/test/test_data/random_key_copy
# Install the biom plugin so we can run the analysis tests
- wget https://data.qiime2.org/distro/core/qiime2-2017.12-py35-linux-conda.yml
- travis_retry conda env create -q -n qtp-biom --file qiime2-2017.12-py35-linux-conda.yml
5 changes: 4 additions & 1 deletion qiita_db/support_files/patches/66.sql
@@ -1,7 +1,10 @@
-- August 6, 2018

SELECT 42;

-- August 22, 2018
-- add specimen_id_column to study table (needed to plate samples in labman)

ALTER TABLE qiita.study ADD specimen_id_column varchar(256);

COMMENT ON COLUMN qiita.study.specimen_id_column IS 'The name of the column that describes the specimen identifiers (such as what is written on the tubes).';

166 changes: 166 additions & 0 deletions qiita_db/support_files/patches/python_patches/66.py
@@ -0,0 +1,166 @@
# August 6, 2018
# Create parameters for the ssh/scp remote file upload commands


from json import loads, dumps

from qiita_db.sql_connection import TRN
from qiita_db.software import Software, Command
from qiita_db.exceptions import (QiitaDBError, QiitaDBDuplicateError)
from qiita_db.util import convert_to_id


# Copied from patch 58.py; couldn't import it directly because of how the patching system works
def create_command(software, name, description, parameters, outputs=None,
analysis_only=False):
r"""Replicates the Command.create code at the time the patch was written"""
# Perform some sanity checks in the parameters dictionary
if not parameters:
raise QiitaDBError(
"Error creating command %s. At least one parameter should "
"be provided." % name)
sql_param_values = []
sql_artifact_params = []
for pname, vals in parameters.items():
if len(vals) != 2:
raise QiitaDBError(
"Malformed parameters dictionary, the format should be "
"{param_name: [parameter_type, default]}. Found: "
"%s for parameter name %s" % (vals, pname))

ptype, dflt = vals
# Check that the type is one of the supported types
supported_types = ['string', 'integer', 'float', 'reference',
'boolean', 'prep_template', 'analysis']
if ptype not in supported_types and not ptype.startswith(
('choice', 'mchoice', 'artifact')):
supported_types.extend(['choice', 'mchoice', 'artifact'])
raise QiitaDBError(
"Unsupported parameters type '%s' for parameter %s. "
"Supported types are: %s"
% (ptype, pname, ', '.join(supported_types)))

if ptype.startswith(('choice', 'mchoice')) and dflt is not None:
choices = set(loads(ptype.split(':')[1]))
dflt_val = dflt
if ptype.startswith('choice'):
# In the choice case, the dflt value is a single string;
# wrap it in a list so we can use the issuperset call below
dflt_val = [dflt_val]
else:
# jsonize the list to store it in the DB
dflt = dumps(dflt)
if not choices.issuperset(dflt_val):
raise QiitaDBError(
"The default value '%s' for the parameter %s is not "
"listed in the available choices: %s"
% (dflt, pname, ', '.join(choices)))

if ptype.startswith('artifact'):
atypes = loads(ptype.split(':')[1])
sql_artifact_params.append(
[pname, 'artifact', atypes])
else:
if dflt is not None:
sql_param_values.append([pname, ptype, False, dflt])
else:
sql_param_values.append([pname, ptype, True, None])

with TRN:
sql = """SELECT EXISTS(SELECT *
FROM qiita.software_command
WHERE software_id = %s AND name = %s)"""
TRN.add(sql, [software.id, name])
if TRN.execute_fetchlast():
raise QiitaDBDuplicateError(
"command", "software: %d, name: %s"
% (software.id, name))
# Add the command to the DB
sql = """INSERT INTO qiita.software_command
(name, software_id, description, is_analysis)
VALUES (%s, %s, %s, %s)
RETURNING command_id"""
sql_params = [name, software.id, description, analysis_only]
TRN.add(sql, sql_params)
c_id = TRN.execute_fetchlast()

# Add the parameters to the DB
sql = """INSERT INTO qiita.command_parameter
(command_id, parameter_name, parameter_type, required,
default_value)
VALUES (%s, %s, %s, %s, %s)
RETURNING command_parameter_id"""
sql_params = [[c_id, pname, p_type, reqd, default]
for pname, p_type, reqd, default in sql_param_values]
TRN.add(sql, sql_params, many=True)
TRN.execute()

# Add the artifact parameters
sql_type = """INSERT INTO qiita.parameter_artifact_type
(command_parameter_id, artifact_type_id)
VALUES (%s, %s)"""
supported_types = []
for pname, p_type, atypes in sql_artifact_params:
sql_params = [c_id, pname, p_type, True, None]
TRN.add(sql, sql_params)
pid = TRN.execute_fetchlast()
sql_params = [[pid, convert_to_id(at, 'artifact_type')]
for at in atypes]
TRN.add(sql_type, sql_params, many=True)
supported_types.extend([atid for _, atid in sql_params])

# If the software type is 'artifact definition', there are a couple
# of extra steps
if software.type == 'artifact definition':
# If supported types is not empty, link the software with these
# types
if supported_types:
sql = """INSERT INTO qiita.software_artifact_type
(software_id, artifact_type_id)
VALUES (%s, %s)"""
sql_params = [[software.id, atid]
for atid in supported_types]
TRN.add(sql, sql_params, many=True)
# If this is the validate command, we need to add the
# provenance and name parameters. These are used internally,
# that's why we are adding them here
if name == 'Validate':
sql = """INSERT INTO qiita.command_parameter
(command_id, parameter_name, parameter_type,
required, default_value)
VALUES (%s, 'name', 'string', 'False',
'dflt_name'),
(%s, 'provenance', 'string', 'False', NULL)
"""
TRN.add(sql, [c_id, c_id])

# Add the outputs to the command
if outputs:
sql = """INSERT INTO qiita.command_output
(name, command_id, artifact_type_id)
VALUES (%s, %s, %s)"""
sql_args = [[pname, c_id, convert_to_id(at, 'artifact_type')]
for pname, at in outputs.items()]
TRN.add(sql, sql_args, many=True)
TRN.execute()

return Command(c_id)


with TRN:
qiita_plugin = Software.from_name_and_version('Qiita', 'alpha')

# Create the 'list_remote_files' command
parameters = {'url': ['string', None],
'private_key': ['string', None]}
create_command(qiita_plugin, "list_remote_files",
"retrieves list of valid study files from remote dir",
parameters)

# Create the 'download_remote_files' command
parameters = {'url': ['string', None],
'destination': ['string', None],
'private_key': ['string', None]}
create_command(qiita_plugin, "download_remote_files",
"downloads valid study files from remote dir", parameters)
144 changes: 143 additions & 1 deletion qiita_ware/commands.py
@@ -6,12 +6,16 @@
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

from os.path import join, isdir
from os.path import basename, isdir, join
from shutil import rmtree
from tarfile import open as taropen
from tempfile import mkdtemp
from os import environ
from traceback import format_exc
from paramiko import AutoAddPolicy, RSAKey, SSHClient
from scp import SCPClient
from urlparse import urlparse
from functools import partial

from qiita_db.artifact import Artifact
from qiita_db.logger import LogEntry
@@ -21,6 +25,144 @@
from qiita_ware.exceptions import ComputeError, EBISubmissionError


def _ssh_session(p_url, private_key):
"""Initializes an SSH session
Parameters
----------
p_url : urlparse object
a parsed url
private_key : str
Path to the private key used to authenticate connection
Returns
-------
paramiko.SSHClient
the SSH session
"""
scheme = p_url.scheme
hostname = p_url.hostname
# if port is '', Python 2.7.6 will raise an error
try:
port = p_url.port
except Exception:
port = 22
username = p_url.username

if scheme == 'scp' or scheme == 'sftp':

# if port not specified, use default 22 as port
if port is None:
port = 22

# step 1: both schemes require an SSH connection
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy)

# step 2: connect to fileserver
key = RSAKey.from_private_key_file(private_key)
ssh.connect(hostname, port=port, username=username,
pkey=key, look_for_keys=False)
return ssh
else:
raise ValueError(
'Invalid scheme. Valid options are scp and sftp.')


def _list_valid_files(ssh, directory):
"""Gets a list of valid study files from ssh session
Parameters
----------
ssh : paramiko.SSHClient
An initialized ssh session
directory : str
the directory to search for files
Returns
-------
list of str
list of valid study files (basenames)
"""

valid_file_extensions = tuple(qiita_config.valid_upload_extension)
sftp = ssh.open_sftp()
files = sftp.listdir(directory)

valid_files = [f for f in files if f.endswith(valid_file_extensions)]
sftp.close()
return valid_files


def list_remote(URL, private_key):
"""Retrieve valid study files from a remote directory
Parameters
----------
URL : str
The url to the remote directory
private_key : str
Path to the private key used to authenticate connection
Returns
-------
list of str
list of files that are valid study files
Notes
-----
Only the allowed extensions described by the config file
will be listed.
"""
p_url = urlparse(URL)
directory = p_url.path
ssh = _ssh_session(p_url, private_key)
valid_files = _list_valid_files(ssh, directory)
ssh.close()
return valid_files


def download_remote(URL, private_key, destination):
"""Add study files by specifying a remote directory to download from
Parameters
----------
URL : str
The url to the remote directory
private_key : str
Path to the private key used to authenticate connection
destination : str
The path to the study upload folder
"""

# step 1: initialize connection and list valid files
p_url = urlparse(URL)
ssh = _ssh_session(p_url, private_key)

directory = p_url.path
valid_files = _list_valid_files(ssh, directory)
file_paths = [join(directory, f) for f in valid_files]

# step 2: download files
scheme = p_url.scheme
# note that scp/sftp's code seems similar but the local_path/localpath
# variable is different within the for loop
if scheme == 'scp':
Contributor:

From my previous review

I think this can be broken down to be a bit simpler, for example:

     if scheme == 'scp':
        client = SCPClient(ssh.get_transport())
     elif scheme == 'sftp':
        client = ssh.open_sftp()

     for f in file_paths:
         download = partial(client.get,
                            local_path=join(destination, basename(f)))
         download(f)

Also it should throw an exception if the scheme is not supported.

Member Author (@antgonza, Sep 5, 2018):

Sorry, forgot to mention this.

  1. The split is not really possible because, as the note above says, scp and sftp use different keyword arguments (local_path vs localpath), and I couldn't figure out how to make that play nicely with your comment above. Note that most of the pushes were due to errors on these lines.
  2. The exception is going to be thrown a few lines above, when: ssh = _ssh_session(p_url, private_key)

Contributor:

I see, thanks for pointing that out. Subtle differences in the api 😑
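
As an aside, the keyword difference could be bridged by choosing the keyword name per scheme and expanding it as a dict. This is purely an illustration (not what this PR ships), using the scp and paramiko client APIs:

    # Illustrative sketch only: SCPClient.get takes local_path,
    # while paramiko's SFTPClient.get takes localpath.
    if scheme == 'scp':
        client, kwarg = SCPClient(ssh.get_transport()), 'local_path'
    elif scheme == 'sftp':
        client, kwarg = ssh.open_sftp(), 'localpath'
    else:
        raise ValueError('Invalid scheme. Valid options are scp and sftp.')

    for f in file_paths:
        client.get(f, **{kwarg: join(destination, basename(f))})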

scp = SCPClient(ssh.get_transport())
for f in file_paths:
download = partial(
scp.get, local_path=join(destination, basename(f)))
download(f)
elif scheme == 'sftp':
sftp = ssh.open_sftp()
for f in file_paths:
download = partial(
sftp.get, localpath=join(destination, basename(f)))
Contributor:

This for loop is missing a download call right? Since the tests are passing I assume this is not being tested?

Member Author (@antgonza):

great catch! There are tests, but it turns out I removed the wrong line when simplifying the code ... fixing ...
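
For reference, a minimal sketch of the fix described above (the final merged code may differ) simply invokes the partial inside the loop:

    sftp = ssh.open_sftp()
    for f in file_paths:
        download = partial(
            sftp.get, localpath=join(destination, basename(f)))
        download(f)  # the call that was missing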


# step 3: close the connection
ssh.close()
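
To make the intended call pattern concrete, here is a hypothetical invocation of the two new helpers (the URL, key path and destination below are made up for illustration):

    from qiita_ware.commands import list_remote, download_remote

    url = 'scp://testuser@localhost:22/home/testuser/uploads'
    key = '/path/to/test_key'

    # list the files with allowed upload extensions in the remote directory
    print(list_remote(url, key))

    # copy those files into the study's upload folder
    download_remote(url, key, '/path/to/study/upload_dir')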


def submit_EBI(artifact_id, action, send, test=False):
"""Submit an artifact to EBI