from bson.objectid import ObjectId
from pymonad.promise import Promise, _Promise

- from tesp_api.utils.docker import (
-     docker_run_command,
-     docker_stage_in_command,
-     docker_stage_out_command,
-     map_volumes
- )
- from tesp_api.utils.singularity import (
-     singularity_run_command,
-     singularity_stage_in_command,
-     singularity_stage_out_command
- )
+ from tesp_api.utils.container import stage_in_command, run_command, stage_out_command, map_volumes
from tesp_api.service.pulsar_service import pulsar_service
from tesp_api.service.event_dispatcher import dispatch_event
from tesp_api.utils.functional import get_else_throw, maybe_of
@@ -32,11 +22,12 @@
    TesTaskExecutor,
    TesTaskResources,
    TesTaskInput,
-     TesTaskOutput
+     TesTaskOutput,
+     TesTaskIOType
)
from tesp_api.repository.task_repository_utils import append_task_executor_logs, update_last_task_log_time

- CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "both")
+ CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "docker")

@local_handler.register(event_name="queued_task")
def handle_queued_task(event: Event) -> None:
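The per-backend imports are replaced by a single tesp_api.utils.container module whose helpers take the container type as an argument, with CONTAINER_TYPE (default "docker") selecting the backend at runtime. A minimal sketch of how such a dispatching wrapper could look, assuming the previous docker_*/singularity_* helpers remain importable with their old signatures (illustrative only, not the actual contents of the new module):

```python
# Hypothetical sketch only; the real tesp_api.utils.container may be organised differently.
# It assumes the old per-backend helpers are still importable with their previous signatures.
from tesp_api.utils.docker import docker_stage_in_command
from tesp_api.utils.singularity import singularity_stage_in_command


def stage_in_command(stage_exec, resource_conf, inputs_directory, input_confs, container_type):
    """Build the stage-in shell command for the backend selected by container_type."""
    if container_type == "docker":
        return docker_stage_in_command(stage_exec, resource_conf, inputs_directory, input_confs)
    if container_type == "singularity":
        # Singularity pulls Docker Hub images via the "docker://" prefix, as the removed code did.
        stage_exec.image = "docker://" + stage_exec.image
        return singularity_stage_in_command(stage_exec, resource_conf, inputs_directory, input_confs)
    raise ValueError(f"Unsupported CONTAINER_TYPE: {container_type}")
```

run_command and stage_out_command would presumably dispatch the same way, which is why the call sites further down only pass CONTAINER_TYPE instead of branching on it.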
@@ -78,6 +69,7 @@ async def handle_initializing_task(event: Event) -> None:
    task_id: ObjectId = payload['task_id']
    pulsar_operations: PulsarRestOperations = payload['pulsar_operations']

+     # Merged Logic: Using the feature-complete setup_data from the new version
    async def setup_data(job_id: ObjectId,
                         resources: TesTaskResources,
                         volumes: List[str],
@@ -95,15 +87,24 @@ async def setup_data(job_id: ObjectId,

        output_confs, volume_confs = map_volumes(str(job_id), volumes, outputs)

-         for i, tes_input in enumerate(inputs):
-             content = tes_input.content
-             pulsar_path_val = payload['task_config']['inputs_directory'] + f'/input_file_{i}'
-             if content is not None and tes_input.url is None:
-                 pulsar_path_val = await pulsar_operations.upload(
+         for i, input_item in enumerate(inputs):
+             if input_item.type == TesTaskIOType.DIRECTORY:
+                 pulsar_path = payload['task_config']['inputs_directory'] + f'/input_dir_{i}'
+             elif input_item.content is not None and input_item.url is None:
+                 pulsar_path = await pulsar_operations.upload(
                    job_id, DataType.INPUT,
-                     file_content=Just(content),
-                     file_path=f'input_file_{i}')
-             input_confs.append({'container_path': tes_input.path, 'pulsar_path': pulsar_path_val, 'url': tes_input.url})
+                     file_content=Just(input_item.content),
+                     file_path=f'input_file_{i}'
+                 )
+             else:
+                 pulsar_path = payload['task_config']['inputs_directory'] + f'/input_file_{i}'
+
+             input_confs.append({
+                 'container_path': input_item.path,
+                 'pulsar_path': pulsar_path,
+                 'url': input_item.url,
+                 'type': input_item.type
+             })

        return resource_conf, volume_confs, input_confs, output_confs

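The rewritten loop records one configuration dict per input and now carries the input's type, so later staging can distinguish URL-backed files, inline content already uploaded to Pulsar, and directories. Roughly, the records look like this; the paths and values below are made up for illustration, and in the real code 'type' holds a TesTaskIOType member while 'pulsar_path' comes from the task config or the upload call:

```python
# Illustrative shapes only; real paths are derived from task_config['inputs_directory']
# or returned by pulsar_operations.upload(), and 'type' is a TesTaskIOType enum member.
example_input_confs = [
    {  # file to be fetched from a URL during stage-in
        'container_path': '/data/reads.fastq',
        'pulsar_path': '<inputs_directory>/input_file_0',
        'url': 'https://example.org/reads.fastq',
        'type': 'FILE',
    },
    {  # inline TES `content`, already uploaded to Pulsar, so no URL
        'container_path': '/data/config.json',
        'pulsar_path': '<path returned by pulsar_operations.upload>',
        'url': None,
        'type': 'FILE',
    },
    {  # directory input, mapped under the job's inputs directory
        'container_path': '/data/reference',
        'pulsar_path': '<inputs_directory>/input_dir_2',
        'url': 'https://example.org/reference/',
        'type': 'DIRECTORY',
    },
]
```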
@@ -159,12 +160,12 @@ async def handle_run_task(event: Event) -> None:
        )
        task = get_else_throw(task_monad_init, TaskNotFoundError(task_id, Just(TesTaskState.INITIALIZING)))

-         # Early check: If task was cancelled very quickly after being set to RUNNING
+         # Early check for cancellation
        current_task_after_init_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
        current_task_after_init = get_else_throw(current_task_after_init_monad, TaskNotFoundError(task_id))
        if current_task_after_init.state == TesTaskState.CANCELED:
            print(f"Task {task_id} found CANCELED shortly after RUNNING state update. Aborting handler.")
-             return  # API cancel path handles Pulsar cleanup
+             return

        await update_last_task_log_time(
            task_id,
@@ -173,49 +174,44 @@ async def handle_run_task(event: Event) -> None:
            start_time=Just(datetime.datetime.now(datetime.timezone.utc))
        )

-         # Prepare Pulsar commands
-         container_cmds = list()
-         stage_in_mount = payload['task_config']['inputs_directory']
        stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", command=[], workdir=Path("/downloads"))

-         stage_in_command_str_val = None
-         if CONTAINER_TYPE == "docker":
-             stage_in_command_str_val = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs)
-         elif CONTAINER_TYPE == "singularity":
-             stage_exec.image = "docker://" + stage_exec.image  # Singularity needs "docker://" prefix
-             stage_in_command_str_val = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs)
-
+         # Stage-in command
+         stage_in_cmd = ""
+         stage_in_mount = ""
+         if input_confs:
+             stage_in_mount = payload['task_config']['inputs_directory']
+             stage_in_cmd = stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs, CONTAINER_TYPE)
+
+         # Main execution commands
+         container_cmds = []
        for i, executor in enumerate(task.executors):
-             run_script_cmd_str, script_content = "", ""
-             if CONTAINER_TYPE == "docker":
-                 run_script_cmd_str, script_content = docker_run_command(executor, task_id, resource_conf, volume_confs,
-                                                                         input_confs, output_confs, stage_in_mount, i)
-             elif CONTAINER_TYPE == "singularity":
-                 mount_job_dir = payload['task_config']['job_directory']
-                 run_script_cmd_str, script_content = singularity_run_command(executor, task_id, resource_conf, volume_confs,
-                                                                              input_confs, output_confs, stage_in_mount, mount_job_dir, i)
-
-             await pulsar_operations.upload(
-                 task_id, DataType.INPUT,  # Use task_id from payload, not payload['task_id']
-                 file_content=Just(script_content),
-                 file_path=f'run_script_{i}.sh')
-             container_cmds.append(run_script_cmd_str)
-
-         stage_out_command_str_val = None
-         if CONTAINER_TYPE == "docker":
-             stage_out_command_str_val = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs)
-         elif CONTAINER_TYPE == "singularity":
-             mount_job_dir = payload['task_config']['job_directory']
-             bind_mount = payload['task_config']['inputs_directory']  # This might be stage_in_mount too
-             stage_out_command_str_val = singularity_stage_out_command(stage_exec, resource_conf, bind_mount,
-                                                                       output_confs, volume_confs, mount_job_dir)
-
+             run_cmd = run_command(
+                 executor=executor, job_id=str(task_id), resource_conf=resource_conf,
+                 volume_confs=volume_confs, input_confs=input_confs, output_confs=output_confs,
+                 inputs_directory=stage_in_mount, container_type=CONTAINER_TYPE,
+                 job_directory=payload['task_config'].get('job_directory') if CONTAINER_TYPE == "singularity" else None,
+                 executor_index=i
+             )
+             container_cmds.append(run_cmd)
+
+         # Stage-out command
+         stage_out_cmd = ""
+         if output_confs:
+             stage_out_cmd = stage_out_command(
+                 stage_exec, resource_conf, output_confs, volume_confs,
+                 container_type=CONTAINER_TYPE,
+                 bind_mount=payload['task_config'].get('inputs_directory') if CONTAINER_TYPE == "singularity" else None,
+                 job_directory=payload['task_config'].get('job_directory') if CONTAINER_TYPE == "singularity" else None
+             )
+
+         # Combine all commands into a single string for Pulsar
        executors_commands_joined_str = " && ".join(filter(None, container_cmds))
-
-         # Construct the final command string for Pulsar
-         command_list_for_join = [cmd for cmd in [stage_in_command_str_val, executors_commands_joined_str, stage_out_command_str_val] if cmd and cmd.strip()]
-         run_command_str = f"set -xe && {' && '.join(command_list_for_join)}" if command_list_for_join else None
+         parts = ["set -xe", stage_in_cmd, executors_commands_joined_str, stage_out_cmd]
+         non_empty_parts = [p.strip() for p in parts if p and p.strip()]
+         run_command_str = " && ".join(non_empty_parts) if non_empty_parts else None

+         # Resume with the polished version's logic for execution and state management
        command_start_time = datetime.datetime.now(datetime.timezone.utc)
        command_status: dict

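All pieces are now joined uniformly: "set -xe" is prepended and any empty stage-in, executor, or stage-out strings simply drop out of the " && " chain. As a quick illustration of the joining logic above, with no inputs and two executors (the command strings below are placeholders, not real tesp-api output):

```python
# Placeholder strings only, to show how the joining logic behaves.
stage_in_cmd = ""  # e.g. the task has no inputs, so no stage-in step
container_cmds = ["docker run --rm img0 cmd0", "docker run --rm img1 cmd1"]
stage_out_cmd = "docker run --rm stage-img upload-outputs"

executors_commands_joined_str = " && ".join(filter(None, container_cmds))
parts = ["set -xe", stage_in_cmd, executors_commands_joined_str, stage_out_cmd]
non_empty_parts = [p.strip() for p in parts if p and p.strip()]
run_command_str = " && ".join(non_empty_parts) if non_empty_parts else None

print(run_command_str)
# set -xe && docker run --rm img0 cmd0 && docker run --rm img1 cmd1 && docker run --rm stage-img upload-outputs
```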
@@ -225,7 +221,7 @@ async def handle_run_task(event: Event) -> None:
        else:
            print(f"Submitting job to Pulsar for task {task_id}: {run_command_str}")
            await pulsar_operations.run_job(task_id, run_command_str)
-             command_status = await pulsar_operations.job_status_complete(str(task_id))  # Polls Pulsar for job completion
+             command_status = await pulsar_operations.job_status_complete(str(task_id))

        command_end_time = datetime.datetime.now(datetime.timezone.utc)
        await append_task_executor_logs(
@@ -235,7 +231,6 @@ async def handle_run_task(event: Event) -> None:
            command_status.get('returncode', -1)
        )

-         # Re-fetch task state to check for external cancellation during job execution
        current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
        current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))

@@ -249,7 +244,6 @@ async def handle_run_task(event: Event) -> None:
            await pulsar_operations.erase_job(task_id)
            return

-         # Job successful and not cancelled, set to COMPLETE
        print(f"Task {task_id} completed successfully. Setting state to COMPLETE.")
        await Promise(lambda resolve, reject: resolve(None)) \
            .then(lambda ignored: task_repository.update_task_state(
@@ -263,7 +257,6 @@ async def handle_run_task(event: Event) -> None:
            .then(lambda x: x)

    except asyncio.CancelledError:
-         # This asyncio.Task (handle_run_task) was cancelled externally
        print(f"handle_run_task for task {task_id} was explicitly cancelled (asyncio.CancelledError).")
        await task_repository.update_task_state(task_id, None, TesTaskState.CANCELED)
        await pulsar_operations.kill_job(task_id)
@@ -278,7 +271,6 @@ async def handle_run_task(event: Event) -> None:
            print(f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
            return

-         # If not already CANCELED, proceed with standard error handling
        print(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.")
        error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)
        if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise):