Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions runner/src/unstract/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,19 +305,14 @@ def _get_container_command(

# Shell script components
mkdir_cmd = f"mkdir -p {shared_log_dir}"
run_tool_fn = (
"run_tool() { "
f"{tool_cmd}; "
"exit_code=$?; "
f'echo "{LogFieldName.TOOL_TERMINATION_MARKER} with exit code $exit_code" '
f">> {shared_log_file}; "
"return $exit_code; "
"}"
run_tool_cmd = (
f"{tool_cmd} > {shared_log_file} 2>&1; "
f'echo "{LogFieldName.TOOL_TERMINATION_MARKER} with exit code $?" >> {shared_log_file}'
)
execute_cmd = f"run_tool > {shared_log_file} 2>&1"

# Combine all commands
shell_script = f"{mkdir_cmd} && {run_tool_fn}; {execute_cmd}"
# Combine commands with signal trapping to ignore SIGTERM and SIGINT
# This ensures the shell stays alive when receiving termination signals
shell_script = f"trap '' TERM INT; {mkdir_cmd} && {run_tool_cmd}"
return shell_script

def _handle_tool_execution_status(
Expand Down Expand Up @@ -455,7 +450,7 @@ def run_container(
)

container_config = self.client.get_container_run_config(
command=["dumb-init", "/bin/sh", "-c", container_command],
command=["/bin/sh", "-c", container_command],
file_execution_id=file_execution_id,
shared_log_dir=shared_log_dir, # Pass directory for mounting
container_name=container_name,
Expand Down
25 changes: 13 additions & 12 deletions tool-sidecar/src/unstract/tool_sidecar/log_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import os
import signal
import time
import types
from datetime import UTC, datetime
from typing import Any

Expand All @@ -28,14 +27,6 @@
logger = logging.getLogger(__name__)


def _signal_handler(signum: int, _frame: types.FrameType | None):
    """Log that a shutdown signal was received.

    The handler only emits two warning-level log records; it does not itself
    stop or tear anything down.

    Args:
        signum: Numeric identifier of the delivered signal.
        _frame: Stack frame that was executing when the signal arrived
            (unused).

    Raises:
        ValueError: If ``signum`` does not map to a known ``signal.Signals``
            member.
    """
    # Resolve the numeric value to its symbolic name (e.g. "SIGTERM").
    received = signal.Signals(signum).name
    logger.warning(f"RECEIVED SIGNAL: {received}")
    logger.warning("Initiating graceful shutdown...")


class LogProcessor:
def __init__(
self,
Expand Down Expand Up @@ -256,9 +247,19 @@ def main():
"""Main entry point for the sidecar container.
Sets up the log processor with environment variables and starts monitoring.
"""
# Set up signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
# Ignore SIGTERM and SIGINT from the very beginning to prevent
# any interruption of monitoring operations
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)

# Prevent signals from interrupting system calls
# This ensures I/O operations are not interrupted by ignored signals
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGINT, False)

logger.info(
"Sidecar configured to ignore SIGTERM/SIGINT for uninterrupted monitoring"
)

# Get configuration from environment
log_path = os.getenv(Env.LOG_PATH, "/shared/logs/logs.txt")
Expand Down
25 changes: 13 additions & 12 deletions unstract/sdk1/src/unstract/sdk1/tool/entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import signal
import types

from unstract.sdk1.tool.base import BaseTool
from unstract.sdk1.tool.executor import ToolExecutor
Expand All @@ -12,14 +11,6 @@
class ToolEntrypoint:
"""Class that contains methods for the entrypoint for a tool."""

@staticmethod
def _signal_handler(signum: int, _frame: types.FrameType | None) -> None:
    """Log receipt of a SIGTERM/SIGINT-style signal.

    Only writes two warning-level log records; no shutdown action is taken
    inside this handler.

    Args:
        signum: Numeric identifier of the delivered signal.
        _frame: Stack frame that was executing when the signal arrived
            (unused).

    Raises:
        ValueError: If ``signum`` does not map to a known ``signal.Signals``
            member.
    """
    # Resolve the numeric value to its symbolic name (e.g. "SIGINT").
    received = signal.Signals(signum).name
    logger.warning(f"RECEIVED SIGNAL: {received}")
    logger.warning("Initiating graceful shutdown...")

@staticmethod
def launch(tool: BaseTool, args: list[str]) -> None:
"""Entrypoint function for a tool.
Expand All @@ -31,9 +22,19 @@ def launch(tool: BaseTool, args: list[str]) -> None:
tool (AbstractTool): Tool to execute
args (List[str]): Arguments passed to a tool
"""
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, ToolEntrypoint._signal_handler)
signal.signal(signal.SIGINT, ToolEntrypoint._signal_handler)
# Ignore SIGTERM and SIGINT from the very beginning to prevent
# any interruption of tool operations
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)

# Prevent signals from interrupting system calls
# This ensures I/O operations are not interrupted by ignored signals
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGINT, False)

logger.info(
"Tool configured to ignore SIGTERM/SIGINT for uninterrupted execution"
)

parsed_args = ToolArgsParser.parse_args(args)
executor = ToolExecutor(tool=tool)
Expand Down
17 changes: 16 additions & 1 deletion unstract/sdk1/src/unstract/sdk1/tool/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,23 @@ def execute_run(self, args: argparse.Namespace) -> None:
output_dir=self.tool.get_output_dir(),
)
except Exception as e:
msg = f"Error while running tool '{tool_name}': {str(e)}"
msg = f"Error while running tool '{tool_name}': {type(e).__name__}: {str(e)}"
logger.error(msg, stack_info=True, exc_info=True)

# Log more details about the exception
import traceback

logger.error(f"Full traceback:\n{traceback.format_exc()}")

# Check if it's interruption-related
if isinstance(e, InterruptedError | KeyboardInterrupt):
logger.warning(
"Operation was interrupted by signal - "
"this is expected during pod deletion"
)
elif "interrupted system call" in str(e).lower():
logger.warning("System call was interrupted - likely due to SIGTERM")

self.tool.stream_error_and_exit(msg)

# TODO: Call tool method to validate if output was written