Flow still in "Running" state when agent is down #7239

Open
matlh0 opened this issue Oct 19, 2022 · 21 comments
Labels
enhancement An improvement of an existing feature

Comments

@matlh0

matlh0 commented Oct 19, 2022

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When we start a simple flow with time.sleep and stop the agent's process, the flow stays in the Running state.

In the example below, I break the "prefect agent start" process with CTRL+C:

13:20:50.284 | INFO | prefect.agent - Submitting flow run '09dd7834-4ae3-4462-9f1d-ecd53ad37c11'
13:20:50.311 | INFO | prefect.infrastructure.process - Opening process 'crouching-seriema'...
13:20:50.314 | INFO | prefect.agent - Completed submission of flow run '09dd7834-4ae3-4462-9f1d-ecd53ad37c11'
13:20:51.382 | INFO | Flow run 'crouching-seriema' - Created task run 'task_start-1870bbf8-0' for task 'task_start'
13:20:51.382 | INFO | Flow run 'crouching-seriema' - Executing 'task_start-1870bbf8-0' immediately...
13:20:51.399 | INFO | Task run 'task_start-1870bbf8-0' - Starting flow
13:20:51.409 | INFO | Task run 'task_start-1870bbf8-0' - Finished in state Completed()
^C
Aborted.

Reproduction

import time

from prefect import flow, task, get_run_logger


@task
def task_start():
    logger = get_run_logger()
    logger.info("Starting flow")


@task
def task_end():
    logger = get_run_logger()
    logger.info("End of flow")


@flow()
def sleeping_flow():
    task_start()
    time.sleep(120)
    task_end()

Error

No response

Versions

Version:             2.5.0
API version:         0.8.2
Python version:      3.8.10
Git commit:          eac37918
Built:               Thu, Oct 6, 2022 12:41 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Additional context

OS : Ubuntu 20.04

The agent and server run on the same machine.
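For reference, a minimal sketch of the deployment setup assumed by the logs above, using the Prefect 2.x Deployment helper with sleeping_flow from the reproduction; the deployment name is illustrative, and the "sleep" work queue matches the agent command shown later in this thread ("prefect agent start -q sleep"):

from prefect.deployments import Deployment

# Sketch only: register sleeping_flow on the "sleep" work queue so that a
# local agent started with `prefect agent start -q sleep` picks it up.
deployment = Deployment.build_from_flow(
    flow=sleeping_flow,
    name="sleeping-flow",       # illustrative deployment name
    work_queue_name="sleep",    # queue name from the agent command in this thread
)
deployment.apply()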

@matlh0 matlh0 added bug Something isn't working status:triage labels Oct 19, 2022
@bunchesofdonald bunchesofdonald added enhancement An improvement of an existing feature status:accepted and removed bug Something isn't working status:triage labels Oct 19, 2022
@bunchesofdonald
Contributor

Thanks for the report @matlh0! I agree we could do better here; we should probably abort any running flows / tasks so that they have a more accurate end state reflected. We should also consider a 'warm shutdown' where we wait until the flows/tasks end before exiting.

@zanieb
Contributor

zanieb commented Oct 19, 2022

This behavior is intentional. All agents can be stopped/started without affecting their flow runs. I'm not sure I agree that this agent should kill its runs on shutdown.

@matlh0
Author

matlh0 commented Oct 19, 2022

This behavior is intentional. All agents can be stopped/started without affecting their flow runs. I'm not sure I agree that this agent should kill its runs on shutdown.

Ok, but if I restart the agent, the flow does not continue and is still in the Running state.

Thanks for the report @matlh0! I agree we could do better here; we should probably abort any running flows / tasks so that they have a more accurate end state reflected. We should also consider a 'warm shutdown' where we wait until the flows/tasks end before exiting.

The 'warm shutdown' could be good in case of a version upgrade and restart of the agent.
We don't use Docker or Kubernetes; agents run in local mode.

@zanieb
Contributor

zanieb commented Oct 19, 2022

Ok, but if I restart the agent, the flow does not continue and is still in the Running state.

It's possible that when the agent is killed, the flow crashes but does not report its state. If that is the case, we can investigate a fix for it. However, the agent is not required for the flow to run. Once the agent creates the flow run process, the presence of an agent should not affect the flow. It does not require the agent to be started again to resume running.

@matlh0
Author

matlh0 commented Oct 19, 2022

Ok, but if I restart the agent, the flow does not continue and is still in the Running state.

It's possible that when the agent is killed, the flow crashes but does not report its state. If that is the case, we can investigate a fix for it. However, the agent is not required for the flow to run. Once the agent creates the flow run process, the presence of an agent should not affect the flow. It does not require the agent to be started again to resume running.

After a little investigation: before running the flow, I see this process:

root 1271628 1147962 8 15:05 pts/6 00:00:00 /usr/bin/python3.8 /usr/bin/prefect agent start -q sleep

When I start the flow, another process appears:

root 1271628 1147962 0 15:05 pts/6 00:00:01 /usr/bin/python3.8 /usr/bin/prefect agent start -q sleep
root 1271701 1271628 99 15:08 pts/6 00:00:00 /usr/bin/python3.8 -m prefect.engine
Master process: 1271628 (prefect agent)
Child process: 1271701

If I kill 1271628, the child process is killed too.

After that, if I start an agent, the flow never resumes.

@zanieb
Contributor

zanieb commented Oct 19, 2022

Great, thanks! It's weird that the SIGINT sent to the parent process isn't forwarded to the child process in a way that lets our interrupt detection report the flow run as crashed. We can investigate that.

Separately, we should probably be launching the processes for flow runs from agents as daemons so that if the parent exits, the child does not get killed. This would probably be a good setting to include on the Process infrastructure directly.
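A minimal sketch of the daemon idea, not how Prefect's Process infrastructure actually launches runs today; the real implementation also passes the flow run id and settings through environment variables, which are omitted here:

import subprocess
import sys

# Illustration only: start the flow-run engine in its own session so that a
# SIGINT sent to the agent's process group (e.g. Ctrl+C in the agent's
# terminal) is not delivered to the child, and the child outlives the agent.
proc = subprocess.Popen(
    [sys.executable, "-m", "prefect.engine"],
    start_new_session=True,  # POSIX setsid(): detaches from the agent's controlling terminal
)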

@mahiki

mahiki commented Nov 14, 2022

I've seen this same issue, just commenting to indicate my support for the issue.

@thuyt001

I just updated Prefect to the latest version, 2.6.9. After restarting Orion and the agents after the update, I'm still seeing the old flow runs in the Running state; they have been stuck like that for the past 5 hours. I don't know how to confirm that they're really gone and that the UI is just out of sync.

@davidssand
Contributor

I’ve got the same problem, but my deployment is a bit different: currently I’m running many agents, each one running a Docker daemon. This decision was made so that the resources used by the flow runs can be easily monitored (I just need to look at how many resources my agent uses). My deployments use the Docker infrastructure, so the flow runs are run by the agents’ Docker daemons inside Docker containers.

So in this case, if an agent goes down, no one is working on the flow runs anymore. The flow runs in the RUNNING or PENDING state remain like that forever.

I know my case is a really specific one, but it looks like this problem happens in other cases too, as can be seen above.

My first approach to try to find a solution for this was to set a timeout for the flows. But I realized later that the agent is the one who checks if the timeout was reached or not, so if the agent goes down the flow runs would never timeout.

Here is a suggestion for a generic solution: the flow run should send an “is alive” signal periodically to the server, allowing the server to check which flow runs are RUNNING and mark the ones that are not running anymore as CRASHED. What do you think?
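A rough sketch of the client half of that proposal. The heartbeat endpoint below is hypothetical (no such API exists in Prefect 2 today); it only illustrates the shape of the idea:

import threading

import httpx

HEARTBEAT_URL = "http://127.0.0.1:4200/api/flow_runs/{id}/heartbeat"  # hypothetical endpoint
INTERVAL_SECONDS = 30


def start_heartbeat(flow_run_id: str) -> threading.Event:
    """Send a periodic liveness signal for a flow run until the returned event is set."""
    stop = threading.Event()

    def _beat():
        while not stop.wait(INTERVAL_SECONDS):
            try:
                httpx.post(HEARTBEAT_URL.format(id=flow_run_id), timeout=5)
            except httpx.HTTPError:
                pass  # best effort; the server decides what a missed beat means

    threading.Thread(target=_beat, daemon=True).start()
    return stop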

@zanieb
Contributor

zanieb commented Mar 9, 2023

the flow run should send an “is alive” signal periodically to the server, allowing the server to check which flow runs are RUNNING and mark the ones that are not running anymore as CRASHED. What do you think?

This is a "heartbeat" implementation and we did this in Prefect 1 as the sole mechanism for determining if a run was crashed. However, there were frequent false positives during CPU- or memory-intensive workloads (and other things), resulting in flows being marked as CRASHED even when they were still happily RUNNING. We have avoided this pattern thus far for that reason.

@davidssand
Contributor

What if we implemented the heartbeat and created another state for this case, so that we could know which flow runs are not reporting to the server?
It could be called UNTRACKED and could be reached only by flow runs in the RUNNING state when the heartbeat is not received by the server in time. If a flow run in the UNTRACKED state manages to send the heartbeat, it goes back to the RUNNING state.

Another improvement after that would be to allow the user to configure the server to crash flow runs in the UNTRACKED state.
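A sketch of what the server-side half might look like, assuming a hypothetical last_heartbeat field on flow runs; none of this exists in Prefect today:

from datetime import datetime, timedelta, timezone

HEARTBEAT_TIMEOUT = timedelta(minutes=5)


def reconcile(running_or_untracked_runs, now=None):
    """Hypothetical server loop: flag silent RUNNING runs as UNTRACKED, and restore them."""
    now = now or datetime.now(timezone.utc)
    for run in running_or_untracked_runs:  # each run assumed to expose .state and .last_heartbeat
        silent = run.last_heartbeat is None or now - run.last_heartbeat > HEARTBEAT_TIMEOUT
        if run.state == "RUNNING" and silent:
            run.state = "UNTRACKED"  # flagged, not crashed; a user policy could crash it later
        elif run.state == "UNTRACKED" and not silent:
            run.state = "RUNNING"    # the heartbeat came back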

@alexprice12

What's the suggestion here? I understand this is a hard challenge, but I'd hope for at least a lever I can pull to get my job to finish. It appears I can't even "cancel" the build (at least in the GUI).

Agents will sometimes die; I'd like my task to eventually get finished :)

@kevin868
Contributor

This can be a bad issue if you have concurrency limits on your work queues / pools. For example, jobs stuck in RUNNING fall out of the one-week observability window, so you end up with hidden jobs that are not truly running but are still eating up your concurrency. Eventually no new jobs can be pulled from the queue because the concurrency is taken by RUNNING jobs that are actually gone / crashed.

Our work around has been to script the API:

  1. Find all flow runs, with a stateFilter for "Running".
  2. Set those states to "Crashed" if the flow run is older than some threshold (1 day).
  3. If desired, you can also restart those flow runs (we usually just kill them, then trigger a new run of the deployment that the flow run came from).

Caution: rough!!

# Imports assumed for this snippet; exact module paths vary across Prefect 2.x releases.
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.sorting import FlowRunSort
from prefect.server import schemas  # older releases expose this as prefect.orion.schemas

MODIFY_STATE = True  # guard flag used below; set to False to skip the state changes

FROM_STATE = "Running"
TO_STATE = schemas.states.Crashed()

state_filter = {}
state_filter["name"] = {"any_": [FROM_STATE]}
# expected_start_threshold = pendulum.datetime(2022, 11, 23, 0, 0)
expected_start_threshold = None
flow_runs = asyncio.run(
    get_client().read_flow_runs(
        # flow_filter=FlowFilter(name={"any_": flow_name}) if flow_name else None,
        flow_run_filter=FlowRunFilter(
            state=state_filter,
            expected_start_time=schemas.filters.FlowRunFilterExpectedStartTime(
                after_=expected_start_threshold
            )
            if expected_start_threshold
            else None,
        ),
        # limit=limit,
        sort=FlowRunSort.EXPECTED_START_TIME_DESC,
    )
)

for flow_run in flow_runs:
    if MODIFY_STATE:
        details = asyncio.run(
            get_client().set_flow_run_state(
                flow_run.id,
                # state=schemas.states.Scheduled(flow_run.expected_start_time.add(hours=2)),
                state=TO_STATE,
                # force=True,
            )
        )

Really not an ideal experience. I believe it's a tough ask of the current model because, afaik, the Prefect server isn't aware of who the agents are (i.e. which worker has pulled a flow run from the queue). Thus, it's not easy to impose a heartbeat or alive-check on each of these flow runs. It seems reasonable to me to impose a 2-day timeout, though.

@argibbs

argibbs commented Jun 20, 2023

This is a "heartbeat" implementation and we did this in Prefect 1 as the sole mechanism for determining if a run was crashed.

@zanieb I've just started using Prefect (2.10+), and while it's really great at scheduling etc, the lack of a server-driven "this task is not responding, we're going to mark it as dead" is a real concern, made worse because I use concurrency limits. Consider:

  1. I spin up a task. It consumes the concurrency slot.
  2. It crashes. The server considers the old flow/task as still running.
  3. I relaunch the task ... and the new version then blocks indefinitely. The concurrency slot is still taken by its previous incarnation.

I'd consider this to be a pretty standard use case for a production system, but there are at least three separate issues open in Prefect right now around this problem.

  1. This one (effectively, a lack of heart beating)
  2. Problems freeing concurrency slots: Task concurrency slots not released on hard crashes #9895 (which itself references two earlier issues Deleting a flow or task run should clear any consumed concurrency limits #5995 and Tasks running while a flow is deleted keep running, and do not release concurrency slots #7753)
  3. A lack of clarity about being blocked by concurrency limits: Configurable delay for tasks when delayed by task concurrency limits #9500 (which hopefully will include the blocked state requested by State or message to signal a task's execution was delayed by a task concurrency limit #9243)

There seem to me to be a couple of easy wins here:

  1. If the task/flow has a timeout set, the server should be aware of it and kill the flow after that point. This would negate the need for @kevin868's script - he just sets timeout_seconds=timedelta(days=2).total_seconds() and the server will kill it for him (a minimal example follows this list).
  2. Add back heartbeat support. You clearly have been burned by false positives in v1, so maybe make it optional, off by default. Or make it mandatory, but optional for the server to kill the flow when it's unresponsive.
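For reference, timeout_seconds is already accepted by the flow decorator; a minimal example is below. The catch, as noted in point 1, is that the timeout is enforced client-side by the run itself rather than by the server, so a process that has died never times out.

from datetime import timedelta

from prefect import flow


# Example only: today this timeout is enforced by the flow-run process, so it
# only helps while that process is alive; the ask above is for the server to
# enforce it as well.
@flow(timeout_seconds=timedelta(days=2).total_seconds())
def my_flow():
    ...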

Prefect is hella impressive, and since finding it I've been singing its praises to everyone, but those praises are currently tempered by a "but you have to be constantly monitoring it for stuck jobs - I'd be careful before putting it in production" caveat.

@chrisgoddard

Just to add a "me too" on this thread - this has been one of the challenges of migrating from v1 to v2 -- I feel like v2 is currently less reliable in production than v1 was in my experience.

@zanieb
Contributor

zanieb commented Jul 12, 2023

Hey @argibbs, I'm not working on this project anymore, but there are definitely some people thinking about the right way to make this better! cc @WillRaphaelson / @billpalombi

@shawnlong636

shawnlong636 commented Dec 29, 2023

Hey @WillRaphaelson @billpalombi, I just want to add a +1 here and report that I'm having the same issue using Prefect agents in Kubernetes. From a user perspective, it's frustrating that I have to keep going in, checking for flows that have been running for more than an hour, and marking them as crashed.

Even as a temporary work-around, it would be great to be able to set a max-runtime parameter so that if a job is in a running state for the designated period, it'll automatically show an error state.

@ytl-liva

+1

@davidssand
Contributor

Can we lift the priority of this issue?

@davidssand
Contributor

davidssand commented Jun 17, 2024

Here is another workaround to help with the problem, to complement @kevin868's idea.

Have one agent working for each work queue. When you start your agent, make it crash all running flow runs of its work queue, alerting you that something went wrong. To find which flow runs belong to the given work queue, you can use tags.

Of course, this implementation does not work when you need many agents working on one work queue, since restarting one agent would crash "active" running flow runs.
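A rough sketch of that startup step, reusing the client calls from the script above. The tag name is illustrative, and the exact import paths vary across Prefect 2.x releases:

import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.states import Crashed

QUEUE_TAG = "sleep-queue"  # illustrative: a tag shared by every deployment on this work queue


async def crash_stale_runs():
    """On agent startup, mark any leftover RUNNING flow runs for this queue as Crashed."""
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state={"name": {"any_": ["Running"]}},  # dicts are coerced to the filter models
                tags={"all_": [QUEUE_TAG]},
            ),
        )
        for run in runs:
            await client.set_flow_run_state(run.id, state=Crashed(), force=True)


asyncio.run(crash_stale_runs())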

@adrianschneider94

+1
