Skip to content

Ability to stop pipelines early #3716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion src/zenml/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
ScheduleFilter,
)
from zenml.pipelines.pipeline_definition import Pipeline
from zenml.utils import source_utils, uuid_utils
from zenml.utils import run_utils, source_utils, uuid_utils
from zenml.utils.yaml_utils import write_yaml

logger = get_logger(__name__)
Expand Down Expand Up @@ -511,6 +511,59 @@ def list_pipeline_runs(**kwargs: Any) -> None:
cli_utils.print_page_info(pipeline_runs)


@runs.command("stop")
@click.argument("run_name_or_id", type=str, required=True)
@click.option(
"--graceful",
"-g",
is_flag=True,
default=False,
help="Use graceful shutdown (default is False).",
)
@click.option(
"--yes",
"-y",
is_flag=True,
default=False,
help="Don't ask for confirmation.",
)
def stop_pipeline_run(
run_name_or_id: str,
graceful: bool = False,
yes: bool = False,
) -> None:
"""Stop a running pipeline.

Args:
run_name_or_id: The name or ID of the pipeline run to stop.
graceful: If True, uses graceful shutdown. If False, forces immediate termination.
yes: If set, don't ask for confirmation.
"""
# Ask for confirmation to stop run.
if not yes:
action = "gracefully stop" if graceful else "force stop"
confirmation = cli_utils.confirmation(
f"Are you sure you want to {action} pipeline run `{run_name_or_id}`?"
)
if not confirmation:
cli_utils.declare("Pipeline run stop canceled.")
return

# Stop run.
try:
run = Client().get_pipeline_run(name_id_or_prefix=run_name_or_id)
run_utils.stop_run(run=run, graceful=graceful)
action = "Gracefully stopped" if graceful else "Force stopped"
cli_utils.declare(f"{action} pipeline run '{run.name}'.")
except NotImplementedError:
cli_utils.error(
"The orchestrator used for this pipeline run does not support "
f"{'gracefully' if graceful else 'forcefully'} stopping runs."
)
except Exception as e:
cli_utils.error(f"Failed to stop pipeline run: {e}")


@runs.command("delete")
@click.argument("run_name_or_id", type=str, required=True)
@click.option(
Expand Down
2 changes: 2 additions & 0 deletions src/zenml/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2230,6 +2230,8 @@ def get_execution_status_emoji(status: "ExecutionStatus") -> str:
return ":white_check_mark:"
if status == ExecutionStatus.CACHED:
return ":package:"
if status == ExecutionStatus.STOPPED or status == ExecutionStatus.STOPPING:
return ":stop_sign:"
raise RuntimeError(f"Unknown status: {status}")


Expand Down
1 change: 1 addition & 0 deletions src/zenml/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ def handle_int_env_var(var: str, default: int = 0) -> int:
STATUS = "/status"
STEP_CONFIGURATION = "/step-configuration"
STEPS = "/steps"
STOP = "/stop"
TAGS = "/tags"
TAG_RESOURCES = "/tag_resources"
TRIGGERS = "/triggers"
Expand Down
9 changes: 6 additions & 3 deletions src/zenml/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,28 @@ class ZenMLServiceType(StrEnum):


class ExecutionStatus(StrEnum):
"""Enum that represents the current status of a step or pipeline run."""
"""Enum that represents the execution status of a step or pipeline run."""

INITIALIZING = "initializing"
FAILED = "failed"
COMPLETED = "completed"
RUNNING = "running"
CACHED = "cached"
STOPPED = "stopped"
STOPPING = "stopping"

@property
def is_finished(self) -> bool:
"""Whether the execution status refers to a finished execution.
"""Returns whether the execution status is in a finished state.

Returns:
Whether the execution status refers to a finished execution.
Whether the execution status is finished.
"""
return self in {
ExecutionStatus.FAILED,
ExecutionStatus.COMPLETED,
ExecutionStatus.CACHED,
ExecutionStatus.STOPPED,
}


Expand Down
8 changes: 8 additions & 0 deletions src/zenml/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ class IllegalOperationError(ZenMLBaseException):
"""Raised when an illegal operation is attempted."""


class RunStoppedException(ZenMLBaseException):
"""Raised when a ZenML pipeline run gets stopped by the user."""


class RunInterruptedException(ZenMLBaseException):
"""Raised when a ZenML step gets interrupted for an unknown reason."""


class MethodNotAllowedError(ZenMLBaseException):
"""Raised when the server does not allow a request method."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,12 +859,16 @@ def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
)["PipelineExecutionStatus"]

# Map the potential outputs to ZenML ExecutionStatus. Potential values:
# https://cloud.google.com/vertex-ai/docs/reference/rest/v1beta1/PipelineState
if status in ["Executing", "Stopping"]:
# https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html
if status == "Executing":
return ExecutionStatus.RUNNING
elif status in ["Stopped", "Failed"]:
elif status == "Stopping":
return ExecutionStatus.STOPPING
elif status == "Stopped":
return ExecutionStatus.STOPPED
elif status == "Failed":
return ExecutionStatus.FAILED
elif status in ["Succeeded"]:
elif status == "Succeeded":
return ExecutionStatus.COMPLETED
else:
raise ValueError("Unknown status for the pipeline execution.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,14 +506,16 @@ def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
return ExecutionStatus.INITIALIZING
elif status in ["Running", "Finalizing"]:
return ExecutionStatus.RUNNING
elif status == "CancelRequested":
return ExecutionStatus.STOPPING
elif status == "Canceled":
return ExecutionStatus.STOPPED
elif status in [
"CancelRequested",
"Failed",
"Canceled",
"NotResponding",
]:
return ExecutionStatus.FAILED
elif status in ["Completed"]:
elif status == "Completed":
return ExecutionStatus.COMPLETED
else:
raise ValueError("Unknown status for the pipeline job.")
Expand Down
15 changes: 7 additions & 8 deletions src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:

# Map the potential outputs to ZenML ExecutionStatus. Potential values:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/describe_pipeline_execution.html#
if status in [PipelineState.PIPELINE_STATE_UNSPECIFIED]:
if status == PipelineState.PIPELINE_STATE_UNSPECIFIED:
return run.status
elif status in [
PipelineState.PIPELINE_STATE_QUEUED,
Expand All @@ -979,14 +979,13 @@ def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
PipelineState.PIPELINE_STATE_PAUSED,
]:
return ExecutionStatus.RUNNING
elif status in [PipelineState.PIPELINE_STATE_SUCCEEDED]:
elif status == PipelineState.PIPELINE_STATE_SUCCEEDED:
return ExecutionStatus.COMPLETED

elif status in [
PipelineState.PIPELINE_STATE_FAILED,
PipelineState.PIPELINE_STATE_CANCELLING,
PipelineState.PIPELINE_STATE_CANCELLED,
]:
elif status == PipelineState.PIPELINE_STATE_CANCELLING:
return ExecutionStatus.STOPPING
elif status == PipelineState.PIPELINE_STATE_CANCELLED:
return ExecutionStatus.STOPPED
elif status == PipelineState.PIPELINE_STATE_FAILED:
return ExecutionStatus.FAILED
else:
raise ValueError("Unknown status for the pipeline job.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ def settings_class(self) -> Optional[Type["BaseSettings"]]:
"""
return KubernetesOrchestratorSettings

@property
def supports_cancellation(self) -> bool:
"""Whether this orchestrator supports stopping pipeline runs.

Returns:
True since the Kubernetes orchestrator supports cancellation.
"""
return True

def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
"""Get list of configured Kubernetes contexts and the active context.

Expand Down Expand Up @@ -629,3 +638,106 @@ def get_orchestrator_run_id(self) -> str:
"Unable to read run id from environment variable "
f"{ENV_ZENML_KUBERNETES_RUN_ID}."
)

def _stop_run(
self, run: "PipelineRunResponse", graceful: bool = True
) -> None:
"""Stops a specific pipeline run by terminating step pods.

Args:
run: The run that was executed by this orchestrator.
graceful: If True, does nothing (lets the orchestrator and steps finish naturally).
If False, stops all running step pods.

Raises:
ValueError: If the orchestrator run ID cannot be found.
"""
# If graceful, do nothing and let the orchestrator handle the stop naturally
if graceful:
logger.info(
"Graceful stop requested - the orchestrator pod will handle "
"stopping naturally"
)
return

# Get the orchestrator run ID which corresponds to the orchestrator pod name
orchestrator_run_id = run.orchestrator_run_id
if not orchestrator_run_id:
raise ValueError(
"Cannot determine orchestrator run ID for the run. "
"Unable to stop the pipeline."
)

pods_stopped = []
errors = []

# Configure graceful termination settings
grace_period_seconds = (
30 # Give pods 30 seconds to gracefully shutdown
)

try:
# Find all pods with the orchestrator run ID label
label_selector = f"zenml-orchestrator-run-id={orchestrator_run_id}"
pods = self._k8s_core_api.list_namespaced_pod(
namespace=self.config.kubernetes_namespace,
label_selector=label_selector,
)

# Filter to only include running or pending pods
for pod in pods.items:
if pod.status.phase not in ["Running", "Pending"]:
logger.debug(
f"Skipping pod {pod.metadata.name} with status {pod.status.phase}"
)
continue

try:
self._k8s_core_api.delete_namespaced_pod(
name=pod.metadata.name,
namespace=self.config.kubernetes_namespace,
grace_period_seconds=grace_period_seconds,
)
pods_stopped.append(f"step pod: {pod.metadata.name}")
logger.debug(
f"Successfully initiated graceful stop of step pod: {pod.metadata.name}"
)
except Exception as e:
error_msg = (
f"Failed to stop step pod {pod.metadata.name}: {e}"
)
logger.warning(error_msg)
errors.append(error_msg)

except Exception as e:
error_msg = (
f"Failed to list step pods for run {orchestrator_run_id}: {e}"
)
logger.warning(error_msg)
errors.append(error_msg)

# Summary logging
if pods_stopped:
logger.debug(
f"Successfully initiated graceful termination of: {', '.join(pods_stopped)}. "
f"Pods will terminate within {grace_period_seconds} seconds."
)

if errors:
error_summary = "; ".join(errors)
if not pods_stopped:
# If nothing was stopped successfully, raise an error
raise RuntimeError(
f"Failed to stop pipeline run: {error_summary}"
)
else:
# If some things were stopped but others failed, raise an error
raise RuntimeError(
f"Partial stop operation completed with errors: {error_summary}"
)

# If no step pods were found and no errors occurred
if not pods_stopped and not errors:
logger.info(
f"No running step pods found for pipeline run {orchestrator_run_id}"
)
Loading
Loading