Description
Given a flow that is scheduled to run (say) every minute, the flow run throws an error when the Dask scheduler cannot be reached:
[2020-03-09 12:48:50,360] ERROR - prefect.Flow: Hello | Unexpected error occured in FlowRunner: OSError("Timed out trying to connect to 'tcp://localhost:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x0000017EBAD57208>: ConnectionRefusedError: [Errno 10061] Unknown error")
(See full trace section below for more)
The process running the scheduled flow dies, cancelling all future runs of the workflow.
Expected Behavior
The flow should wait for the Dask scheduler to come back, start at the earliest opportunity, and then resume its schedule. I would only expect a non-scheduled flow to error out, since it is supposed to run just once.
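As a rough illustration of the "wait for the scheduler" behavior described above, here is a minimal sketch that polls a TCP address until it accepts connections. It uses only the standard library and is not part of Prefect or Dask; the function name `wait_for_scheduler` and the parameters `poll_interval` and `max_wait` are hypothetical names chosen for this example.

```python
import socket
import time


def wait_for_scheduler(host, port, poll_interval=5.0, max_wait=None):
    """Block until a TCP connection to (host, port) succeeds.

    Hypothetical helper, not a Prefect or Dask API.
    Returns True once the port accepts a connection, or False if roughly
    max_wait seconds pass first. max_wait=None waits indefinitely.
    """
    deadline = None if max_wait is None else time.monotonic() + max_wait
    while True:
        try:
            # A successful connect only shows that something is listening;
            # it does not prove the listener is a healthy Dask scheduler.
            with socket.create_connection((host, port), timeout=poll_interval):
                return True
        except OSError:
            # Covers ConnectionRefusedError and socket timeouts alike.
            if deadline is not None and time.monotonic() >= deadline:
                return False
            time.sleep(poll_interval)
```

Calling something like this before `flow.run(...)` would only guard the first run. It would not stop the schedule loop from dying if the scheduler disappears between runs, which is the behavior this issue asks Prefect to handle.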
Reproduction
from prefect import task, Flow
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule
from prefect.engine.executors import DaskExecutor
from os import environ
from hello import say_hello

schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(minutes=1))

with Flow("Hello", schedule) as flow:
    say_hello()

if __name__ == '__main__':
    flow.run(executor=DaskExecutor(address=environ.get('SCHEDULER_ADDRESS')))
Environment
Prefect 0.9.7 on Anaconda 4.8.2 on Windows 10
Full Trace
[2020-03-09 12:48:50,310] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'tcp://localhost:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x0000017EBAD57208>: ConnectionRefusedError: [Errno 10061] Unknown error")
Traceback (most recent call last):
File "\Anaconda\lib\site-packages\distributed\comm\core.py", line 218, in connect
quiet_exceptions=EnvironmentError,
File "\Anaconda\lib\site-packages\tornado\gen.py", line 735, in run
value = future.result()
tornado.util.TimeoutError: Timeout
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "\Anaconda\lib\site-packages\prefect\engine\runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "\Anaconda\lib\site-packages\prefect\engine\flow_runner.py", line 398, in get_flow_run_state
with executor.start():
File "\Anaconda\lib\contextlib.py", line 112, in __enter__
return next(self.gen)
File "\Anaconda\lib\site-packages\prefect\engine\executors\dask.py", line 78, in start
with Client(self.address, **self.kwargs) as client:
File "\Anaconda\lib\site-packages\distributed\client.py", line 712, in __init__
self.start(timeout=timeout)
File "\Anaconda\lib\site-packages\distributed\client.py", line 858, in start
sync(self.loop, self._start, **kwargs)
File "\Anaconda\lib\site-packages\distributed\utils.py", line 331, in sync
six.reraise(*error[0])
File "\Anaconda\lib\site-packages\six.py", line 703, in reraise
raise value
File "\Anaconda\lib\site-packages\distributed\utils.py", line 316, in f
result[0] = yield future
File "\Anaconda\lib\site-packages\tornado\gen.py", line 735, in run
value = future.result()
File "\Anaconda\lib\site-packages\tornado\gen.py", line 742, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "\Anaconda\lib\site-packages\distributed\client.py", line 954, in _start
yield self._ensure_connected(timeout=timeout)
File "\Anaconda\lib\site-packages\tornado\gen.py", line 735, in run
value = future.result()
File "\Anaconda\lib\site-packages\tornado\gen.py", line 742, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "\Anaconda\lib\site-packages\distributed\client.py", line 1010, in _ensure_connected
connection_args=self.connection_args,
File "\Anaconda\lib\site-packages\tornado\gen.py", line 735, in run
value = future.result()
File "\Anaconda\lib\site-packages\tornado\gen.py", line 742, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "\Anaconda\lib\site-packages\distributed\comm\core.py", line 230, in connect
_raise(error)
File "\Anaconda\lib\site-packages\distributed\comm\core.py", line 207, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://localhost:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x0000017EBAD57208>: ConnectionRefusedError: [Errno 10061] Unknown error
[2020-03-09 12:48:50,360] ERROR - prefect.Flow: Hello | Unexpected error occured in FlowRunner: OSError("Timed out trying to connect to 'tcp://localhost:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x0000017EBAD57208>: ConnectionRefusedError: [Errno 10061] Unknown error")