-
Notifications
You must be signed in to change notification settings - Fork 3
/
app.py
184 lines (145 loc) · 6.5 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import json
import os
import sys
import time

import globus_sdk
from bioblend.galaxy import GalaxyInstance
from flask import Flask, Response, abort, jsonify, request
from flask_sqlalchemy import SQLAlchemy

import weshandler
# Flask application object; its settings are loaded from 'config_file.cfg'.
app = Flask(__name__)
app.config.from_pyfile('config_file.cfg')
# Galaxy key used by __get_galaxy_user to look up / create users and API keys.
master_key = app.config['GALAXY_MASTER_KEY']
# Galaxy server URL handed to every GalaxyInstance built in this module.
url = app.config['URL']
# Identifier passed as `minid` when submit_workflow imports the CWL-runner
# workflow. NOTE(review): the import helper currently ignores this value.
cwl_runner_galaxy_workflow_minid = "ark:/57799/b93q6h"
# Not referenced anywhere in the visible code -- presumably the base URL for
# minid landing-page lookups; confirm before removing.
QUERY_BASE = "http://minid.bd2k.org/minid/landingpage/"
#db = SQLAlchemy(app)
@app.route('/wes/service-info', methods=['GET'])
def get_service_info():
    """Return the service description as a JSON string.

    The payload is whatever ``weshandler.__service_info()`` produces,
    serialized with ``json.dumps``. No authentication is performed here.
    """
    service_info = weshandler.__service_info()
    return json.dumps(service_info)
# Resource that lists all the workflows available on the service.
@app.route('/wes/workflows', methods=['GET'])
def get_workflows():
    """Return the caller's workflows as a JSON string.

    The ``Authorization`` request header is resolved to a Galaxy API key via
    ``__get_galaxy_user``; that key is used to build the ``GalaxyInstance``
    passed to ``weshandler.__get_workflows``.

    Returns:
        A JSON string on success, or a 401 ``Response`` when the
        ``Authorization`` header is missing or empty.
    """
    # Use .get(): indexing request.headers raises when the header is absent,
    # which made the 401 branch unreachable for unauthenticated requests.
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return Response(
            'Globus Auth token required..',
            401,
            {'WWW-Authenticate': 'Basic realm="Tokens Required"'},
        )

    # NOTE(review): the raw header value is forwarded as the token; confirm
    # clients send the bare token rather than "Bearer <token>".
    api_key = __get_galaxy_user(auth_header)
    gi = GalaxyInstance(url=url, key=api_key)
    return json.dumps(weshandler.__get_workflows(gi))
# Resource to submit a workflow.
# Takes a JSON payload with parameters and returns the ID for the run.
@app.route('/wes/workflows', methods=['POST'])
def submit_workflow():
    """Submit a workflow run described by the JSON request body.

    The request body is parsed as JSON and re-serialized to a JSON string,
    which is handed to ``weshandler.__submit_workflow`` together with a
    ``GalaxyInstance`` for the caller and the freshly imported CWL-runner
    workflow.

    Returns:
        A JSON string with the result of ``weshandler.__submit_workflow``,
        or a 401 ``Response`` when the ``Authorization`` header is missing
        or empty.
    """
    # Serialize with json.dumps rather than str(...).replace("'", '"'):
    # the repr-based approach emits True/False/None and corrupts any value
    # that itself contains an apostrophe, yielding invalid JSON.
    parameters = json.dumps(request.get_json(silent=False))

    # Use .get(): indexing request.headers raises when the header is absent,
    # which made the 401 branch unreachable for unauthenticated requests.
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return Response(
            'Globus Auth token required..',
            401,
            {'WWW-Authenticate': 'Basic realm="Tokens Required"'},
        )

    api_key = __get_galaxy_user(auth_header)
    gi = GalaxyInstance(url=url, key=api_key)
    wf = __import_galaxy_cwl_workflow(
        minid=cwl_runner_galaxy_workflow_minid, gi=gi)
    return json.dumps(
        weshandler.__submit_workflow(
            json_param=parameters, gi_handle=gi, workflow=wf))
## This resource provides detailed info on a workflow run
@app.route('/wes/workflows/<workflow_id>', methods=['GET'])
def get_workflow_run_details(workflow_id=None):
    """Return details of one workflow run as a JSON string.

    Args:
        workflow_id: Identifier taken from the URL path and forwarded
            unchanged to ``weshandler.__get_workflow_details``.

    Returns:
        A JSON string on success, or a 401 ``Response`` when the
        ``Authorization`` header is missing or empty.
    """
    # Use .get(): indexing request.headers raises when the header is absent,
    # which made the 401 branch unreachable for unauthenticated requests.
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return Response(
            'Globus Auth token required..',
            401,
            {'WWW-Authenticate': 'Basic realm="Tokens Required"'},
        )

    api_key = __get_galaxy_user(auth_header)
    gi = GalaxyInstance(url=url, key=api_key)
    return json.dumps(weshandler.__get_workflow_details(gi, workflow_id))
## This resource deletes a workflow run
@app.route('/wes/workflows/<workflow_id>', methods=['DELETE'])
def delete_workflow(workflow_id=None):
    """Delete one workflow run and return the outcome as a JSON string.

    Args:
        workflow_id: Identifier taken from the URL path and forwarded
            unchanged to ``weshandler.__delete_workflow``.

    Returns:
        A JSON string on success, or a 401 ``Response`` when the
        ``Authorization`` header is missing or empty.
    """
    # Use .get(): indexing request.headers raises when the header is absent,
    # which made the 401 branch unreachable for unauthenticated requests.
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return Response(
            'Globus Auth token required..',
            401,
            {'WWW-Authenticate': 'Basic realm="Tokens Required"'},
        )

    api_key = __get_galaxy_user(auth_header)
    gi = GalaxyInstance(url=url, key=api_key)
    return json.dumps(weshandler.__delete_workflow(gi, workflow_id))
## This resource provides status of a workflow run
@app.route('/wes/workflows/<workflow_id>/status')
def workflow_status(workflow_id=None):
    """Return the status of one workflow run as a JSON string.

    Args:
        workflow_id: Identifier taken from the URL path and forwarded
            unchanged to ``weshandler.__get_workflow_status``.

    Returns:
        A JSON string on success, or a 401 ``Response`` when the
        ``Authorization`` header is missing or empty.
    """
    # Use .get(): indexing request.headers raises when the header is absent,
    # which made the 401 branch unreachable for unauthenticated requests.
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return Response(
            'Globus Auth token required..',
            401,
            {'WWW-Authenticate': 'Basic realm="Tokens Required"'},
        )

    api_key = __get_galaxy_user(auth_header)
    gi = GalaxyInstance(url=url, key=api_key)
    return json.dumps(weshandler.__get_workflow_status(gi, workflow_id))
def __get_galaxy_user(auth):
    """Resolve an auth token to the matching Galaxy user's API key.

    The token is turned into a Globus username by ``__get_globus_user``.
    Galaxy is then queried (using the master key) for users with that name;
    the first entry whose ``username`` equals the Globus username is used,
    and a remote user is created when there is no such entry.

    Args:
        auth: Token value forwarded to ``__get_globus_user``.

    Returns:
        The user's Galaxy API key, created on demand when Galaxy reports
        "Not available.".
    """
    globus_name = __get_globus_user(auth)
    admin_gi = GalaxyInstance(url=url, key=master_key)

    # Map the Globus username onto a Galaxy account with the same username.
    candidates = admin_gi.users.get_users(f_name=globus_name)
    account = next(
        (entry for entry in candidates if globus_name == entry['username']),
        None,
    )
    if account is None:
        account = __create_galaxy_user(globus_name, admin_gi)

    user_key = admin_gi.users.get_user_apikey(account['id'])
    if user_key != "Not available.":
        return user_key
    return admin_gi.users.create_user_apikey(account['id'])
def __get_globus_user(token):
    """Validate a Globus token, persist its dependent tokens, return the user.

    A confidential Globus app client (credentials from ``app.config``) is
    used to validate and introspect ``token``. The dependent access tokens
    for the transfer, auth and identifiers resource servers are written to
    per-user files under ``app.config['GLOBUS_TOKEN_FILES_DIR']`` in the
    ``tokens``, ``tokens-auth`` and ``tokens-identifiers`` sub-directories.

    Args:
        token: The Globus access token supplied by the caller.

    Returns:
        The introspected username with everything from the first ``@``
        onwards removed.

    Raises:
        werkzeug.exceptions.HTTPException: 401 via ``abort`` when Globus
            reports the token as not active.
    """
    # Start the Globus client.
    client = globus_sdk.ConfidentialAppAuthClient(
        app.config['GLOBUS_CLIENT_ID'],
        app.config['GLOBUS_CLIENT_SECRET'],
    )
    client.oauth2_start_flow(app.config['GLOBUS_REDIRECT_URI'])

    # Reject inactive tokens with an HTTP error. The previous sys.exit()
    # raised SystemExit inside the request path instead of answering the
    # client with a proper status code.
    check_result = client.oauth2_validate_token(token)
    if not check_result['active']:
        abort(401, 'Token not active.')

    # The local part of the introspected username names the token files.
    token_info = client.oauth2_token_introspect(token)
    username = token_info['username'].partition('@')[0]

    # Collect the dependent tokens for each downstream resource server.
    dependent_token_info = client.oauth2_get_dependent_tokens(token)
    by_server = dependent_token_info.by_resource_server
    transfer_token = by_server['transfer.api.globus.org']['access_token']
    auth_token = by_server['auth.globus.org']['access_token']
    identifiers_token = by_server['identifiers.globus.org']['access_token']

    def record_token(dir_type, value):
        """Write ``value`` to <GLOBUS_TOKEN_FILES_DIR>/<dir_type>/<username>."""
        dir_name = os.path.join(app.config['GLOBUS_TOKEN_FILES_DIR'], dir_type)
        # exist_ok avoids the check-then-create race between concurrent
        # requests and also creates a missing parent directory.
        os.makedirs(dir_name, mode=0o700, exist_ok=True)
        record_file = os.path.join(dir_name, username)
        # Create the file owner-only: it holds a bearer credential.
        fd = os.open(
            record_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as write_file:
            write_file.write(value)

    record_token('tokens', transfer_token)
    record_token('tokens-auth', auth_token)
    record_token('tokens-identifiers', identifiers_token)
    return username
def __create_galaxy_user(globus_user, gi):
    """Create a Galaxy remote user for a Globus username.

    Args:
        globus_user: Globus username; ``@globusid.org`` is appended to form
            the e-mail address used for the new account.
        gi: Galaxy client whose ``users.create_remote_user`` is called.

    Returns:
        Whatever ``gi.users.create_remote_user`` returns for the new user.
    """
    return gi.users.create_remote_user(
        "{0}@globusid.org".format(globus_user))
def __import_galaxy_cwl_workflow(minid=None, gi=None):
    """Import the configured CWL-runner workflow file into Galaxy.

    The file named by ``app.config["CWL_RUNNER_WORKFLOW_GA"]`` is parsed as
    JSON and handed to ``gi.workflows.import_workflow_dict``.

    Args:
        minid: Accepted for interface compatibility; not used by the body.
        gi: Galaxy client used to perform the import.

    Returns:
        The result of ``gi.workflows.import_workflow_dict``, or ``None``
        when the file's JSON content decodes to ``None``.
    """
    with open(app.config["CWL_RUNNER_WORKFLOW_GA"]) as ga_handle:
        workflow_dict = json.load(ga_handle)

    if workflow_dict is None:
        return None
    return gi.workflows.import_workflow_dict(workflow_dict)
# Start the Flask server only when this file is executed directly;
# threaded=True is passed through to app.run().
if __name__ == '__main__':
    app.run(threaded=True)