Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful agent shutdowns #1731

Merged
merged 4 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Add informative logs in the event that a heartbeat thread dies - [#1721](https://github.com/PrefectHQ/prefect/pull/1721)
- Loosen Job spec requirements for `KubernetesJobEnvironment` - [#1713](https://github.com/PrefectHQ/prefect/pull/1713)
- Loosen `containerDefinitions` requirements for `FargateTaskEnvironment` - [#1713](https://github.com/PrefectHQ/prefect/pull/1713)
- Add graceful keyboard interrupt shutdown for all agents - [#1731](https://github.com/PrefectHQ/prefect/pull/1731)

### Task Library

Expand Down
14 changes: 12 additions & 2 deletions src/prefect/agent/agent.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import ast
import logging
import signal
import time
from typing import Iterable, Union
from typing import Any, Iterable, Union

import pendulum

Expand Down Expand Up @@ -62,6 +63,14 @@ def __init__(self, name: str = None, labels: Iterable[str] = None) -> None:
logger.addHandler(ch)

self.logger = logger
self.add_signal_handlers()

def add_signal_handlers(self) -> None:
def _exit(*args: Any, **kwargs: Any) -> None:
self.is_running = False
self.logger.info("Keyboard Interrupt received: Agent is shutting down.")

signal.signal(signal.SIGINT, _exit)

def _verify_token(self, token: str) -> None:
"""
Expand Down Expand Up @@ -89,13 +98,14 @@ def start(self) -> None:
The main entrypoint to the agent. This function loops and constantly polls for
new flow runs to deploy
"""
self.is_running = True
tenant_id = self.agent_connect()

# Loop intervals for query sleep backoff
loop_intervals = {0: 0.25, 1: 0.5, 2: 1.0, 3: 2.0, 4: 4.0, 5: 8.0, 6: 10.0}

index = 0
while True:
while self.is_running:
self.heartbeat()

runs = self.agent_process(tenant_id)
Expand Down