Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions temporalio/worker/workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._info = det.info
self._primary_task: Optional[asyncio.Task[None]] = None
self._time = 0.0
self._current_history_length = 0
# Handles which are ready to run on the next event loop iteration
self._ready: Deque[asyncio.Handle] = collections.deque()
self._conditions: List[Tuple[Callable[[], bool], asyncio.Future]] = []
Expand Down Expand Up @@ -231,6 +232,7 @@ def activate(
)
self._current_completion.successful.SetInParent()
self._current_activation_error: Optional[Exception] = None
self._current_history_length = act.history_length
self._time = act.timestamp.ToMicroseconds() / 1e6
self._is_replaying = act.is_replaying

Expand Down Expand Up @@ -652,6 +654,9 @@ def workflow_continue_as_new(
# TODO(cretz): Why can't MyPy infer the above never returns?
raise RuntimeError("Unreachable")

def workflow_get_current_history_length(self) -> int:
return self._current_history_length

def workflow_get_external_workflow_handle(
self, id: str, *, run_id: Optional[str]
) -> temporalio.workflow.ExternalWorkflowHandle[Any]:
Expand Down
12 changes: 12 additions & 0 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ def _logger_details(self) -> Mapping[str, Any]:
"workflow_type": self.workflow_type,
}

def get_current_history_length(self) -> int:
"""Get the current number of events in history.

Returns:
Current number of events in history (up until the current task).
"""
return _Runtime.current().workflow_get_current_history_length()


@dataclass(frozen=True)
class ParentInfo:
Expand Down Expand Up @@ -347,6 +355,10 @@ def workflow_continue_as_new(
) -> NoReturn:
...

@abstractmethod
def workflow_get_current_history_length(self) -> int:
...

@abstractmethod
def workflow_get_external_workflow_handle(
self, id: str, *, run_id: Optional[str]
Expand Down
5 changes: 4 additions & 1 deletion tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ class InfoWorkflow:
@workflow.run
async def run(self) -> Dict:
# Convert to JSON and back so it'll stringify un-JSON-able pieces
return json.loads(json.dumps(dataclasses.asdict(workflow.info()), default=str))
ret = dataclasses.asdict(workflow.info())
ret["current_history_length"] = workflow.info().get_current_history_length()
return json.loads(json.dumps(ret, default=str))


async def test_workflow_info(client: Client):
Expand All @@ -130,6 +132,7 @@ async def test_workflow_info(client: Client):
)
assert info["attempt"] == 1
assert info["cron_schedule"] is None
assert info["current_history_length"] == 3
assert info["execution_timeout"] is None
assert info["namespace"] == client.namespace
assert info["retry_policy"] == json.loads(
Expand Down