Skip to content

Commit

Permalink
Update task engine to increment run_count when entering RUNNING s…
Browse files Browse the repository at this point in the history
…tate (#15436)
  • Loading branch information
desertaxle authored Sep 19, 2024
1 parent e248442 commit ab6c49d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
14 changes: 10 additions & 4 deletions src/prefect/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ def set_state(self, state: State, force: bool = False) -> State:
self.task_run.state_type = new_state.type
self.task_run.state_name = new_state.name

if new_state.is_running():
self.task_run.run_count += 1

if new_state.is_final():
if isinstance(state.data, BaseResult) and state.data.has_cached_object():
# Avoid fetching the result unless it is cached, otherwise we defeat
Expand Down Expand Up @@ -690,8 +693,9 @@ async def wait_until_ready(self):
if scheduled_time := self.state.state_details.scheduled_time:
sleep_time = (scheduled_time - pendulum.now("utc")).total_seconds()
await anyio.sleep(sleep_time if sleep_time > 0 else 0)
new_state = Retrying() if self.state.name == "AwaitingRetry" else Running()
self.set_state(
Retrying() if self.state.name == "AwaitingRetry" else Running(),
new_state,
force=True,
)

Expand Down Expand Up @@ -773,7 +777,6 @@ def call_task_fn(
if transaction.is_committed():
result = transaction.read()
else:
self.task_run.run_count += 1
if self.task_run.tags:
# Acquire a concurrency slot for each tag, but only if a limit
# matching the tag already exists.
Expand Down Expand Up @@ -929,6 +932,9 @@ async def set_state(self, state: State, force: bool = False) -> State:
self.task_run.state_type = new_state.type
self.task_run.state_name = new_state.name

if new_state.is_running():
self.task_run.run_count += 1

if new_state.is_final():
if (
isinstance(new_state.data, BaseResult)
Expand Down Expand Up @@ -1196,8 +1202,9 @@ async def wait_until_ready(self):
if scheduled_time := self.state.state_details.scheduled_time:
sleep_time = (scheduled_time - pendulum.now("utc")).total_seconds()
await anyio.sleep(sleep_time if sleep_time > 0 else 0)
new_state = Retrying() if self.state.name == "AwaitingRetry" else Running()
await self.set_state(
Retrying() if self.state.name == "AwaitingRetry" else Running(),
new_state,
force=True,
)

Expand Down Expand Up @@ -1280,7 +1287,6 @@ async def call_task_fn(
if transaction.is_committed():
result = transaction.read()
else:
self.task_run.run_count += 1
if self.task_run.tags:
# Acquire a concurrency slot for each tag, but only if a limit
# matching the tag already exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def happy_path():
},
"flow_run_run_count": 1,
"name": task_run.name,
"run_count": 0,
"run_count": 1,
"tags": [],
"task_inputs": {},
"total_run_time": 0.0,
Expand Down Expand Up @@ -334,7 +334,7 @@ def happy_path():
},
"flow_run_run_count": 1,
"name": task_run.name,
"run_count": 0,
"run_count": 1,
"tags": [],
"task_inputs": {},
"total_run_time": 0.0,
Expand Down

0 comments on commit ab6c49d

Please sign in to comment.