Scheduled flow terminates when scheduler is unavailable for 10 seconds #2133

Closed
@AndyIII

Description

A flow scheduled to run at a fixed interval (for example, every minute) raises the following error when the Dask scheduler cannot be reached within 10 seconds:

[2020-03-09 12:48:50,360] ERROR - prefect.Flow: Hello | Unexpected error occured in FlowRunner: OSError("Timed out trying to connect to 'tcp://localhost:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x0000017EBAD57208>: ConnectionRefusedError: [Errno 10061] Unknown error")
(See full trace section below for more)

The process running the scheduled flow dies, cancelling all future runs of the workflow.

Expected Behavior

The flow should wait for the scheduler to come back, run at the earliest opportunity, and then resume its schedule. Erroring out would be reasonable only for a non-scheduled flow, since that is meant to run just once.
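One way to approximate this behavior from outside Prefect is to poll the scheduler's TCP endpoint before starting the flow. The sketch below is a minimal illustration using only the Python standard library. `wait_for_scheduler` and its parameters are hypothetical names, not part of Prefect or Dask, and the check only confirms that the port accepts connections, not that the scheduler is healthy.

```python
import socket
import time
from urllib.parse import urlparse


def wait_for_scheduler(address, timeout=60.0, poll_interval=1.0):
    """Return True once `address` accepts TCP connections, False on timeout.

    `address` is a Dask-style URL such as 'tcp://localhost:8786'.
    Hypothetical helper: not part of Prefect or Dask.
    """
    parsed = urlparse(address)
    host, port = parsed.hostname, parsed.port
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect only proves that something is listening.
            with socket.create_connection((host, port), timeout=poll_interval):
                return True
        except OSError:
            # Covers refused connections and connect timeouts alike.
            if time.monotonic() >= deadline:
                return False
            time.sleep(poll_interval)
```

A caller could invoke `wait_for_scheduler(environ.get('SCHEDULER_ADDRESS'))` before `flow.run(...)` and decide what to do when it returns False. This covers only the window before a run starts, not a scheduler that disappears mid-run.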

Reproduction

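from datetime import datetime, timedelta
from os import environ

from prefect import Flow
from prefect.engine.executors import DaskExecutor
from prefect.schedules import IntervalSchedule

# say_hello comes from a local hello.py that is not shown in this report
# (presumably a Prefect task, since it is called inside the Flow context)
from hello import say_hello

# Run every minute, starting one second from now
schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(minutes=1))

with Flow("Hello", schedule) as flow:
    say_hello()

if __name__ == '__main__':
    # SCHEDULER_ADDRESS is the Dask scheduler address (tcp://localhost:8786 in the trace below)
    flow.run(executor=DaskExecutor(address=environ.get('SCHEDULER_ADDRESS')))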
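As a stopgap while the scheduled process can die like this, the run could be wrapped in a small supervisor loop. This is a sketch under assumptions: `run_with_restart` is a hypothetical helper, and it assumes the connection failure reaches the caller as an `OSError`. This report does not confirm whether `flow.run` in Prefect 0.9.7 re-raises the error or exits some other way, so treat it as an illustration of the pattern rather than a verified fix.

```python
import time


def run_with_restart(run_once, retry_delay=5.0, max_attempts=None, sleep=time.sleep):
    """Call `run_once` until it returns, retrying whenever it raises OSError.

    Hypothetical helper. `run_once` stands in for something like
    `lambda: flow.run(executor=...)`; `sleep` is injectable for testing.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return run_once()
        except OSError as exc:
            # Give up and re-raise once the attempt budget is exhausted.
            if max_attempts is not None and attempt >= max_attempts:
                raise
            print(f"attempt {attempt} failed: {exc}; retrying in {retry_delay}s")
            sleep(retry_delay)
```

With `max_attempts=None` the loop retries indefinitely, which matches the expectation that a scheduled flow should outlive a temporary scheduler outage. A bounded value suits one-off runs.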

Environment

Prefect 0.9.7 on Anaconda 4.8.2 on Windows 10

Full Trace

[2020-03-09 12:48:50,310] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'tcp://localhost:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x0000017EBAD57208>: ConnectionRefusedError: [Errno 10061] Unknown error")
Traceback (most recent call last):
  File "\Anaconda\lib\site-packages\distributed\comm\core.py", line 218, in connect
    quiet_exceptions=EnvironmentError,
  File "\Anaconda\lib\site-packages\tornado\gen.py", line 735, in run
    value = future.result()
tornado.util.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "\Anaconda\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "\Anaconda\lib\site-packages\prefect\engine\flow_runner.py", line 398, in get_flow_run_state
    with executor.start():
  File "\Anaconda\lib\contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "\Anaconda\lib\site-packages\prefect\engine\executors\dask.py", line 78, in start
    with Client(self.address, **self.kwargs) as client:
  File "\Anaconda\lib\site-packages\distributed\client.py", line 712, in __init__
    self.start(timeout=timeout)
  File "\Anaconda\lib\site-packages\distributed\client.py", line 858, in start
    sync(self.loop, self._start, **kwargs)
  File "\Anaconda\lib\site-packages\distributed\utils.py", line 331, in sync
    six.reraise(*error[0])
  File "\Anaconda\lib\site-packages\six.py", line 703, in reraise
    raise value
  File "\Anaconda\lib\site-packages\distributed\utils.py", line 316, in f
    result[0] = yield future
  File "\Anaconda\lib\site-packages\tornado\gen.py", line 735, in run
    value = future.result()
  File "\Anaconda\lib\site-packages\tornado\gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "\Anaconda\lib\site-packages\distributed\client.py", line 954, in _start
    yield self._ensure_connected(timeout=timeout)
  File "\Anaconda\lib\site-packages\tornado\gen.py", line 735, in run
    value = future.result()
  File "\Anaconda\lib\site-packages\tornado\gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "\Anaconda\lib\site-packages\distributed\client.py", line 1010, in _ensure_connected
    connection_args=self.connection_args,
  File "\Anaconda\lib\site-packages\tornado\gen.py", line 735, in run
    value = future.result()
  File "\Anaconda\lib\site-packages\tornado\gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "\Anaconda\lib\site-packages\distributed\comm\core.py", line 230, in connect
    _raise(error)
  File "\Anaconda\lib\site-packages\distributed\comm\core.py", line 207, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://localhost:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x0000017EBAD57208>: ConnectionRefusedError: [Errno 10061] Unknown error
[2020-03-09 12:48:50,360] ERROR - prefect.Flow: Hello | Unexpected error occured in FlowRunner: OSError("Timed out trying to connect to 'tcp://localhost:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x0000017EBAD57208>: ConnectionRefusedError: [Errno 10061] Unknown error")

Labels

bug: Something isn't working
