Skip to content

Expose root workflow execution #805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,12 @@ class WorkflowExecution:
parent_run_id: Optional[str]
"""Run ID for the parent workflow if this was started as a child."""

root_id: Optional[str]
"""ID for the root workflow."""

root_run_id: Optional[str]
"""Run ID for the root workflow."""

raw_info: temporalio.api.workflow.v1.WorkflowExecutionInfo
"""Underlying protobuf info."""

Expand Down Expand Up @@ -2828,6 +2834,12 @@ def _from_raw_info(
parent_run_id=info.parent_execution.run_id
if info.HasField("parent_execution")
else None,
root_id=info.root_execution.workflow_id
if info.HasField("root_execution")
else None,
root_run_id=info.root_execution.run_id
if info.HasField("root_execution")
else None,
raw_info=info,
run_id=info.execution.run_id,
search_attributes=temporalio.converter.decode_search_attributes(
Expand Down
7 changes: 7 additions & 0 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,18 @@ def _create_workflow_instance(

# Build info
parent: Optional[temporalio.workflow.ParentInfo] = None
root: Optional[temporalio.workflow.RootInfo] = None
if init.HasField("parent_workflow_info"):
parent = temporalio.workflow.ParentInfo(
namespace=init.parent_workflow_info.namespace,
run_id=init.parent_workflow_info.run_id,
workflow_id=init.parent_workflow_info.workflow_id,
)
if init.HasField("root_workflow"):
root = temporalio.workflow.RootInfo(
run_id=init.root_workflow.run_id,
workflow_id=init.root_workflow.workflow_id,
)
info = temporalio.workflow.Info(
attempt=init.attempt,
continued_run_id=init.continued_from_execution_run_id or None,
Expand All @@ -385,6 +391,7 @@ def _create_workflow_instance(
headers=dict(init.headers),
namespace=self._namespace,
parent=parent,
root=root,
raw_memo=dict(init.memo.fields),
retry_policy=temporalio.common.RetryPolicy.from_proto(init.retry_policy)
if init.HasField("retry_policy")
Expand Down
1 change: 1 addition & 0 deletions temporalio/worker/workflow_sandbox/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
headers={},
namespace="sandbox-validate-namespace",
parent=None,
root=None,
raw_memo={},
retry_policy=None,
run_id="sandbox-validate-run_id",
Expand Down
9 changes: 9 additions & 0 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ class Info:
headers: Mapping[str, temporalio.api.common.v1.Payload]
namespace: str
parent: Optional[ParentInfo]
root: Optional[RootInfo]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a test that confirms this is populated (and that it is populated on result of handle.describe() from client side)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test in test_workflow and added an assertion to a describe test in test_client

priority: temporalio.common.Priority
"""The priority of this workflow execution. If not set, or this server predates priorities,
then returns a default instance."""
Expand Down Expand Up @@ -518,6 +519,14 @@ class ParentInfo:
workflow_id: str


@dataclass(frozen=True)
class RootInfo:
"""Information about the root workflow."""

run_id: str
workflow_id: str


@dataclass(frozen=True)
class UpdateInfo:
"""Information about a workflow update."""
Expand Down
37 changes: 35 additions & 2 deletions tests/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import uuid
from contextlib import closing
from datetime import timedelta
from typing import Awaitable, Callable, Optional, Sequence, Type, TypeVar
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar, Union

from temporalio.api.common.v1 import WorkflowExecution
from temporalio.api.enums.v1 import IndexedValueType
Expand All @@ -19,7 +19,9 @@
from temporalio.service import RPCError, RPCStatusCode
from temporalio.worker import Worker, WorkflowRunner
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
from temporalio.workflow import UpdateMethodMultiParam
from temporalio.workflow import (
UpdateMethodMultiParam,
)


def new_worker(
Expand Down Expand Up @@ -150,3 +152,34 @@ async def admitted_update_task(
lambda: workflow_update_exists(client, handle.id, id),
)
return update_task


async def assert_workflow_exists_eventually(
client: Client,
workflow: Any,
workflow_id: str,
) -> WorkflowHandle:
handle = None

async def check_workflow_exists() -> bool:
nonlocal handle
try:
handle = client.get_workflow_handle_for(
workflow,
workflow_id=workflow_id,
)
await handle.describe()
return True
except RPCError as err:
# Ignore not-found or failed precondition because child may
# not have started yet
if (
err.status == RPCStatusCode.NOT_FOUND
or err.status == RPCStatusCode.FAILED_PRECONDITION
):
return False
raise

await assert_eq_eventually(True, check_workflow_exists)
assert handle is not None
return handle
2 changes: 2 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ async def test_describe(
assert desc.status == WorkflowExecutionStatus.COMPLETED
assert desc.task_queue == worker.task_queue
assert desc.workflow_type == "kitchen_sink"
assert desc.root_id == desc.id
assert desc.root_run_id == desc.run_id


async def test_query(client: Client, worker: ExternalWorker):
Expand Down
86 changes: 66 additions & 20 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
from tests.helpers import (
admitted_update_task,
assert_eq_eventually,
assert_workflow_exists_eventually,
ensure_search_attributes_present,
find_free_port,
new_worker,
Expand Down Expand Up @@ -1126,26 +1127,9 @@ async def test_workflow_cancel_child_started(client: Client, use_execute: bool):
task_queue=worker.task_queue,
)

# Wait until child started
async def child_started() -> bool:
try:
return await handle.query(
CancelChildWorkflow.ready
) and await client.get_workflow_handle_for(
LongSleepWorkflow.run, # type: ignore[arg-type]
workflow_id=f"{handle.id}_child",
).query(LongSleepWorkflow.started)
except RPCError as err:
# Ignore not-found or failed precondition because child may
# not have started yet
if (
err.status == RPCStatusCode.NOT_FOUND
or err.status == RPCStatusCode.FAILED_PRECONDITION
):
return False
raise

await assert_eq_eventually(True, child_started)
await assert_workflow_exists_eventually(
client, LongSleepWorkflow.run, f"{handle.id}_child"
)
# Send cancel signal and wait on the handle
await handle.signal(CancelChildWorkflow.cancel_child)
with pytest.raises(WorkflowFailureError) as err:
Expand Down Expand Up @@ -7081,3 +7065,65 @@ async def test_workflow_priorities(client: Client, env: WorkflowEnvironment):
task_queue=worker.task_queue,
)
await handle.result()


@workflow.defn
class ExposeRootChildWorkflow:
def __init__(self) -> None:
self.blocked = True

@workflow.signal
def unblock(self) -> None:
self.blocked = False

@workflow.run
async def run(self) -> Optional[temporalio.workflow.RootInfo]:
await workflow.wait_condition(lambda: not self.blocked)
return workflow.info().root


@workflow.defn
class ExposeRootWorkflow:
@workflow.run
async def run(self, child_wf_id) -> Optional[temporalio.workflow.RootInfo]:
return await workflow.execute_child_workflow(
ExposeRootChildWorkflow.run, id=child_wf_id
)


async def test_expose_root_execution(client: Client, env: WorkflowEnvironment):
if env.supports_time_skipping:
pytest.skip(
"Java test server needs release with: https://github.com/temporalio/sdk-java/pull/2441"
)
async with new_worker(
client, ExposeRootWorkflow, ExposeRootChildWorkflow
) as worker:
parent_wf_id = f"workflow-{uuid.uuid4()}"
child_wf_id = parent_wf_id + "_child"
handle = await client.start_workflow(
ExposeRootWorkflow.run,
child_wf_id,
id=parent_wf_id,
task_queue=worker.task_queue,
)

await assert_workflow_exists_eventually(
client, ExposeRootChildWorkflow, child_wf_id
)
child_handle: WorkflowHandle = client.get_workflow_handle_for(
ExposeRootChildWorkflow.run, child_wf_id
)
child_desc = await child_handle.describe()
parent_desc = await handle.describe()
# Assert child root execution is the same as it's parent execution
assert child_desc.root_id == parent_desc.id
assert child_desc.root_run_id == parent_desc.run_id
# Unblock child
await child_handle.signal(ExposeRootChildWorkflow.unblock)
# Get the result (child info)
child_wf_info_root = await handle.result()
# Assert root execution in child info is same as it's parent execution
assert child_wf_info_root is not None
assert child_wf_info_root.workflow_id == parent_desc.id
assert child_wf_info_root.run_id == parent_desc.run_id
Loading