Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class WorkflowActivation(google.protobuf.message.Message):
CONTINUE_AS_NEW_SUGGESTED_FIELD_NUMBER: builtins.int
DEPLOYMENT_VERSION_FOR_CURRENT_TASK_FIELD_NUMBER: builtins.int
LAST_SDK_VERSION_FIELD_NUMBER: builtins.int
SUGGEST_CONTINUE_AS_NEW_REASONS_FIELD_NUMBER: builtins.int
run_id: builtins.str
"""The id of the currently active run of the workflow. Also used as a cache key. There may
only ever be one active workflow task (and hence activation) of a run at one time.
Expand Down Expand Up @@ -139,6 +140,16 @@ class WorkflowActivation(google.protobuf.message.Message):
"""
last_sdk_version: builtins.str
"""The last seen SDK version from the most recent WFT completed event"""
@property
def suggest_continue_as_new_reasons(
self,
) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[
temporalio.api.enums.v1.workflow_pb2.SuggestContinueAsNewReason.ValueType
]:
"""Experimental. Optionally decide the versioning behavior that the first task of the new run should use.
For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version
of the previous run.
"""
def __init__(
self,
*,
Expand All @@ -153,6 +164,10 @@ class WorkflowActivation(google.protobuf.message.Message):
deployment_version_for_current_task: temporalio.bridge.proto.common.common_pb2.WorkerDeploymentVersion
| None = ...,
last_sdk_version: builtins.str = ...,
suggest_continue_as_new_reasons: collections.abc.Iterable[
temporalio.api.enums.v1.workflow_pb2.SuggestContinueAsNewReason.ValueType
]
| None = ...,
) -> None: ...
def HasField(
self,
Expand Down Expand Up @@ -184,6 +199,8 @@ class WorkflowActivation(google.protobuf.message.Message):
b"last_sdk_version",
"run_id",
b"run_id",
"suggest_continue_as_new_reasons",
b"suggest_continue_as_new_reasons",
"timestamp",
b"timestamp",
],
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,7 @@ class ContinueAsNewWorkflowExecution(google.protobuf.message.Message):
SEARCH_ATTRIBUTES_FIELD_NUMBER: builtins.int
RETRY_POLICY_FIELD_NUMBER: builtins.int
VERSIONING_INTENT_FIELD_NUMBER: builtins.int
INITIAL_VERSIONING_BEHAVIOR_FIELD_NUMBER: builtins.int
workflow_type: builtins.str
"""The identifier the lang-specific sdk uses to execute workflow code"""
task_queue: builtins.str
Expand Down Expand Up @@ -1016,6 +1017,13 @@ class ContinueAsNewWorkflowExecution(google.protobuf.message.Message):
temporalio.bridge.proto.common.common_pb2.VersioningIntent.ValueType
)
"""Whether the continued workflow should run on a worker with a compatible build id or not."""
initial_versioning_behavior: (
temporalio.api.enums.v1.workflow_pb2.ContinueAsNewVersioningBehavior.ValueType
)
"""Experimental. Optionally decide the versioning behavior that the first task of the new run should use.
For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version
of the previous run.
"""
def __init__(
self,
*,
Expand All @@ -1041,6 +1049,7 @@ class ContinueAsNewWorkflowExecution(google.protobuf.message.Message):
| None = ...,
retry_policy: temporalio.api.common.v1.message_pb2.RetryPolicy | None = ...,
versioning_intent: temporalio.bridge.proto.common.common_pb2.VersioningIntent.ValueType = ...,
initial_versioning_behavior: temporalio.api.enums.v1.workflow_pb2.ContinueAsNewVersioningBehavior.ValueType = ...,
) -> None: ...
def HasField(
self,
Expand All @@ -1060,6 +1069,8 @@ class ContinueAsNewWorkflowExecution(google.protobuf.message.Message):
b"arguments",
"headers",
b"headers",
"initial_versioning_behavior",
b"initial_versioning_behavior",
"memo",
b"memo",
"retry_policy",
Expand Down
3 changes: 2 additions & 1 deletion temporalio/worker/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import temporalio.nexus
import temporalio.nexus._util
import temporalio.workflow
from temporalio.workflow import VersioningIntent
from temporalio.workflow import ContinueAsNewVersioningBehavior, VersioningIntent


class Interceptor:
Expand Down Expand Up @@ -172,6 +172,7 @@ class ContinueAsNewInput:
)
headers: Mapping[str, temporalio.api.common.v1.Payload]
versioning_intent: VersioningIntent | None
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None
# The types may be absent
arg_types: list[type] | None

Expand Down
20 changes: 20 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._current_history_length = 0
self._current_history_size = 0
self._continue_as_new_suggested = False
self._suggested_continue_as_new_reasons: Sequence[
temporalio.api.enums.v1.SuggestContinueAsNewReason.ValueType
] = []
# Lazily loaded
self._untyped_converted_memo: MutableMapping[str, Any] | None = None
# Handles which are ready to run on the next event loop iteration
Expand Down Expand Up @@ -403,6 +406,7 @@ def activate(
self._current_history_length = act.history_length
self._current_history_size = act.history_size_bytes
self._continue_as_new_suggested = act.continue_as_new_suggested
self._suggested_continue_as_new_reasons = act.suggest_continue_as_new_reasons
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you confirm from a Core side that once continue as new reasons are set on a workflow, they come over for every activation after that? (also worth confirming this for the suggested bool)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test (very similar to the test_workflow_history_info we were discussing) to check that CAN suggestion & reasons persist between WFTs

self._time_ns = act.timestamp.ToNanoseconds()
self._is_replaying = act.is_replaying
self._current_thread_id = threading.get_ident()
Expand Down Expand Up @@ -1120,6 +1124,8 @@ def workflow_continue_as_new(
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
),
versioning_intent: temporalio.workflow.VersioningIntent | None,
initial_versioning_behavior: temporalio.workflow.ContinueAsNewVersioningBehavior
| None,
) -> NoReturn:
self._assert_not_read_only("continue as new")
# Use definition if callable
Expand Down Expand Up @@ -1147,6 +1153,7 @@ def workflow_continue_as_new(
headers={},
arg_types=arg_types,
versioning_intent=versioning_intent,
initial_versioning_behavior=initial_versioning_behavior,
)
)

Expand Down Expand Up @@ -1216,6 +1223,14 @@ def workflow_instance(self) -> Any:
def workflow_is_continue_as_new_suggested(self) -> bool:
return self._continue_as_new_suggested

def workflow_get_suggested_continue_as_new_reasons(
self,
) -> Sequence[temporalio.workflow.SuggestContinueAsNewReason]:
return [
temporalio.workflow.SuggestContinueAsNewReason(r)
for r in self._suggested_continue_as_new_reasons
]

def workflow_is_replaying(self) -> bool:
return self._is_replaying

Expand Down Expand Up @@ -3388,6 +3403,11 @@ def _apply_command(self) -> None:
)
if self._input.versioning_intent:
v.versioning_intent = self._input.versioning_intent._to_proto()
if self._input.initial_versioning_behavior:
v.initial_versioning_behavior = cast(
"temporalio.api.enums.v1.ContinueAsNewVersioningBehavior.ValueType",
int(self._input.initial_versioning_behavior),
)


def _encode_search_attributes(
Expand Down
76 changes: 76 additions & 0 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
)

import temporalio.api.common.v1
import temporalio.api.enums
import temporalio.api.enums.v1
import temporalio.bridge.proto.child_workflow
import temporalio.bridge.proto.common
import temporalio.bridge.proto.nexus
Expand Down Expand Up @@ -604,6 +606,12 @@ def is_continue_as_new_suggested(self) -> bool:
"""
return _Runtime.current().workflow_is_continue_as_new_suggested()

def get_suggested_continue_as_new_reasons(
self,
) -> Sequence[SuggestContinueAsNewReason]:
"""Get reason(s) why continue as new is suggested"""
return _Runtime.current().workflow_get_suggested_continue_as_new_reasons()


@dataclass(frozen=True)
class ParentInfo:
Expand Down Expand Up @@ -693,6 +701,7 @@ def workflow_continue_as_new(
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
),
versioning_intent: VersioningIntent | None,
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None,
) -> NoReturn: ...

@abstractmethod
Expand Down Expand Up @@ -738,6 +747,9 @@ def workflow_instance(self) -> Any: ...
@abstractmethod
def workflow_is_continue_as_new_suggested(self) -> bool: ...

@abstractmethod
def workflow_get_suggested_continue_as_new_reasons(self) -> Sequence[Any]: ...

@abstractmethod
def workflow_is_replaying(self) -> bool: ...

Expand Down Expand Up @@ -4733,6 +4745,7 @@ def continue_as_new(
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
) = None,
versioning_intent: VersioningIntent | None = None,
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
) -> NoReturn: ...


Expand All @@ -4751,6 +4764,7 @@ def continue_as_new(
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
) = None,
versioning_intent: VersioningIntent | None = None,
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
) -> NoReturn: ...


Expand All @@ -4770,6 +4784,7 @@ def continue_as_new(
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
) = None,
versioning_intent: VersioningIntent | None = None,
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
) -> NoReturn: ...


Expand All @@ -4789,6 +4804,7 @@ def continue_as_new(
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
) = None,
versioning_intent: VersioningIntent | None = None,
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
) -> NoReturn: ...


Expand All @@ -4808,6 +4824,7 @@ def continue_as_new(
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
) = None,
versioning_intent: VersioningIntent | None = None,
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
) -> NoReturn: ...


Expand All @@ -4826,6 +4843,7 @@ def continue_as_new(
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
) = None,
versioning_intent: VersioningIntent | None = None,
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
) -> NoReturn:
"""Stop the workflow immediately and continue as new.

Expand Down Expand Up @@ -4867,6 +4885,7 @@ def continue_as_new(
memo=memo,
search_attributes=search_attributes,
versioning_intent=versioning_intent,
initial_versioning_behavior=initial_versioning_behavior,
)


Expand Down Expand Up @@ -5283,6 +5302,63 @@ def _to_proto(self) -> temporalio.bridge.proto.common.VersioningIntent.ValueType
return temporalio.bridge.proto.common.VersioningIntent.UNSPECIFIED


class ContinueAsNewVersioningBehavior(IntEnum):
"""Experimental. Optionally decide the versioning behavior that the first task of the new run should use.
For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version
of the previous run.
"""

UNSPECIFIED = int(
temporalio.api.enums.v1.ContinueAsNewVersioningBehavior.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED
)
"""An initial versioning behavior is not set, follow the existing continue-as-new inheritance semantics.
See https://docs.temporal.io/worker-versioning#inheritance-semantics for more detail.
"""

AUTO_UPGRADE = int(
temporalio.api.enums.v1.ContinueAsNewVersioningBehavior.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
)
"""Start the new run with AutoUpgrade behavior. Use the Target Version of the workflow's task queue at
start-time, as AutoUpgrade workflows do. After the first workflow task completes, use whatever
Versioning Behavior the workflow is annotated with in the workflow code.

Note that if the previous workflow had a Pinned override, that override will be inherited by the
new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
command. If a Pinned override is inherited by the new run, and the new run starts with AutoUpgrade
behavior, the base version of the new run will be the Target Version as described above, but the
effective version will be whatever is specified by the Versioning Override until the override is removed.
"""


class SuggestContinueAsNewReason(IntEnum):
"""SuggestContinueAsNewReason specifies a reason why continue as new is true."""

UNSPECIFIED = int(
temporalio.api.enums.v1.SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_UNSPECIFIED
)
"""Continue as new is suggested but the reason is unknown"""

HISTORY_SIZE_TOO_LARGE = int(
temporalio.api.enums.v1.SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE
)
"""Workflow History size is getting too large."""

TOO_MANY_HISTORY_EVENTS = int(
temporalio.api.enums.v1.SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS
)
"""Workflow History event count is getting too large."""

TOO_MANY_UPDATES = int(
temporalio.api.enums.v1.SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES
)
"""Workflow's count of completed plus in-flight updates is too large."""

TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED = int(
temporalio.api.enums.v1.SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED
)
"""Workflow's Target Worker Deployment Version is different from its current Version and the workflow is versioned."""


ServiceT = TypeVar("ServiceT")


Expand Down
Loading
Loading