Skip to content

Commit 2e023b8

Browse files
authored
Merge pull request #1518 from kostrykin/dev/no_early_termination
Add switch to prevent early termination of workflow runs
2 parents 1974a81 + 7d80ff2 commit 2e023b8

File tree

3 files changed

+36
-6
lines changed

3 files changed

+36
-6
lines changed

planemo/commands/cmd_run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
@options.run_download_outputs_option()
3131
@options.engine_options()
3232
@options.test_options()
33+
@options.no_early_termination_option()
3334
@command_function
3435
def cli(ctx, runnable_identifier, job_path, **kwds):
3536
"""Planemo command for running tools and jobs.

planemo/galaxy/activity.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def _execute( # noqa C901
215215
no_wait=kwds.get("no_wait", False),
216216
start_datetime=start_datetime,
217217
log=log_contents_str(config),
218+
early_termination=not kwds.get("no_early_termination", False),
218219
)
219220

220221
else:
@@ -240,7 +241,15 @@ def _execute( # noqa C901
240241

241242

242243
def invocation_to_run_response(
243-
ctx, user_gi, runnable, invocation, polling_backoff=0, no_wait=False, start_datetime=None, log=None
244+
ctx,
245+
user_gi,
246+
runnable,
247+
invocation,
248+
polling_backoff=0,
249+
no_wait=False,
250+
start_datetime=None,
251+
log=None,
252+
early_termination=True,
244253
):
245254
start_datetime = start_datetime or datetime.now()
246255
invocation_id = invocation["id"]
@@ -256,6 +265,7 @@ def invocation_to_run_response(
256265
user_gi=user_gi,
257266
no_wait=no_wait,
258267
polling_backoff=polling_backoff,
268+
early_termination=early_termination,
259269
)
260270
if final_invocation_state not in ("ok", "skipped", "scheduled"):
261271
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
@@ -764,7 +774,13 @@ def _history_id(gi, **kwds) -> str:
764774

765775

766776
def wait_for_invocation_and_jobs(
767-
ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int
777+
ctx,
778+
invocation_id: str,
779+
history_id: str,
780+
user_gi: GalaxyInstance,
781+
no_wait: bool,
782+
polling_backoff: int,
783+
early_termination: bool,
768784
):
769785
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
770786
final_invocation_state = "new"
@@ -783,7 +799,7 @@ def wait_for_invocation_and_jobs(
783799
ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")
784800

785801
if not no_wait:
786-
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
802+
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff, early_termination)
787803
if job_state not in ("ok", "skipped"):
788804
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
789805
error_message = msg if not error_message else f"{error_message}. {msg}"
@@ -799,6 +815,7 @@ def wait_for_invocation_and_jobs(
799815
user_gi=user_gi,
800816
no_wait=no_wait,
801817
polling_backoff=polling_backoff,
818+
early_termination=early_termination,
802819
)
803820
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
804821
return final_invocation_state, job_state, error_message
@@ -852,7 +869,7 @@ def state_func():
852869
return _wait_on_state(state_func, polling_backoff)
853870

854871

855-
def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
872+
def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0, early_termination=True):
856873
# Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
857874
# as you could have more than one invocation in a history, or an invocation without
858875
# steps that produce history items.
@@ -862,7 +879,7 @@ def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
862879
def state_func():
863880
return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id))
864881

865-
return _wait_on_state(state_func, polling_backoff)
882+
return _wait_on_state(state_func, polling_backoff, early_termination=early_termination)
866883

867884

868885
def _wait_for_job(gi, job_id, timeout=None):
@@ -872,7 +889,7 @@ def state_func():
872889
return _wait_on_state(state_func, timeout=timeout)
873890

874891

875-
def _wait_on_state(state_func, polling_backoff=0, timeout=None):
892+
def _wait_on_state(state_func, polling_backoff=0, timeout=None, early_termination=True):
876893
def get_state():
877894
response = state_func()
878895
if not isinstance(response, list):
@@ -894,6 +911,8 @@ def get_state():
894911
"cancelled",
895912
"failed",
896913
]
914+
if not early_termination and current_non_terminal_states:
915+
return None
897916
for terminal_state in hierarchical_fail_states:
898917
if terminal_state in current_states:
899918
# If we got here something has failed and we can return (early)

planemo/options.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2067,6 +2067,16 @@ def tool_init_example_command_option(help=EXAMPLE_COMMAND_HELP):
20672067
)
20682068

20692069

2070+
def no_early_termination_option():
2071+
return planemo_option(
2072+
"--no_early_termination",
2073+
is_flag=True,
2074+
default=False,
2075+
prompt=False,
2076+
help="Wait until all jobs terminate, even if some jobs have failed",
2077+
)
2078+
2079+
20702080
def mulled_conda_option():
20712081
return planemo_option(
20722082
"--mulled_conda_version",

0 commit comments

Comments
 (0)