11from .endpoint import Endpoint , QuerysetEndpoint , api
22from .. import FlowRunItem , PaginationItem
3+ from .exceptions import FlowRunFailedException , FlowRunCanceledException
4+ import time
35
46import logging
57
68logger = logging .getLogger ("tableau.endpoint.flowruns" )
79
10+ # Polling for job completion uses exponential backoff for the sleep intervals between polls
11+ ASYNC_JOB_POLL_MIN_INTERVAL = 0.5
12+ ASYNC_JOB_POLL_MAX_INTERVAL = 30
13+ ASYNC_JOB_POLL_BACKOFF_FACTOR = 1.4
814
915class FlowRuns (QuerysetEndpoint ):
1016 def __init__ (self , parent_srv ):
@@ -46,3 +52,37 @@ def cancel(self, flow_run_id):
4652 url = "{0}/{1}" .format (self .baseurl , id_ )
4753 self .put_request (url )
4854 logger .info ("Deleted single flow (ID: {0})" .format (id_ ))
55+
56+
57+ @api (version = "3.10" )
58+ def wait_for_job (self , flow_run_id , * , timeout = None ):
59+ id_ = getattr (flow_run_id , "id" , flow_run_id )
60+ wait_start_time = time .time ()
61+ logger .debug (f"Waiting for job { id_ } " )
62+
63+ current_sleep_interval = ASYNC_JOB_POLL_MIN_INTERVAL
64+ flow_run = self .get_by_id (id_ )
65+ while flow_run .completed_at is None :
66+ max_sleep_time = ASYNC_JOB_POLL_MAX_INTERVAL
67+
68+ if timeout is not None :
69+ elapsed = (time .time () - wait_start_time )
70+ if elapsed >= timeout :
71+ raise TimeoutError (f"Timeout after { elapsed } seconds waiting for asynchronous flow run: { id_ } " )
72+ max_sleep_time = max (ASYNC_JOB_POLL_MIN_INTERVAL , timeout - elapsed )
73+
74+ time .sleep (min (current_sleep_interval , max_sleep_time ))
75+ job = self .get_by_id (id_ )
76+ current_sleep_interval *= ASYNC_JOB_POLL_BACKOFF_FACTOR
77+ logger .debug (f"\t FlowRun { id_ } progress={ flow_run .progress } " )
78+
79+ logger .info ("FlowRun {} Completed: Status: {}" .format (id_ , flow_run .status ))
80+
81+ if flow_run .status == "Success" :
82+ return flow_run
83+ elif flow_run .status == "Failed" :
84+ raise FlowRunFailedException (flow_run )
85+ elif flow_run .finish_code in ["Canceled" , "Cancelled" ]:
86+ raise FlowRunCanceledException (flow_run )
87+ else :
88+ raise AssertionError ("Unexpected status in flow_run" , flow_run )
0 commit comments