Skip to content

Patch support and random/UUID helpers #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions temporalio/worker/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,5 +271,6 @@ async def _create_workflow_instance(
defn=defn,
info=info,
type_hint_eval_str=self._type_hint_eval_str,
randomness_seed=start.randomness_seed,
)
)
37 changes: 33 additions & 4 deletions temporalio/worker/workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import contextvars
import inspect
import logging
import random
import sys
import traceback
from abc import ABC, abstractmethod
Expand All @@ -26,6 +27,7 @@
NoReturn,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Expand Down Expand Up @@ -92,6 +94,7 @@ class WorkflowInstanceDetails:
defn: temporalio.workflow._Definition
info: temporalio.workflow.Info
type_hint_eval_str: bool
randomness_seed: int


class WorkflowInstance(ABC):
Expand Down Expand Up @@ -164,6 +167,11 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
# The actual instance, instantiated on first _run_once
self._object: Any = None
self._is_replaying: bool = False
self._random = random.Random(det.randomness_seed)

# Patches we have been notified of and patches that have been sent
self._patches_notified: Set[str] = set()
self._patches_sent: Set[str] = set()

# We maintain signals and queries on this class since handlers can be
# added during workflow execution
Expand Down Expand Up @@ -293,8 +301,7 @@ def _apply(
elif job.HasField("query_workflow"):
self._apply_query_workflow(job.query_workflow)
elif job.HasField("notify_has_patch"):
# TODO(cretz): This
pass
self._apply_notify_has_patch(job.notify_has_patch)
elif job.HasField("remove_from_cache"):
# Ignore, handled externally
pass
Expand All @@ -321,8 +328,7 @@ def _apply(
elif job.HasField("start_workflow"):
self._apply_start_workflow(job.start_workflow)
elif job.HasField("update_random_seed"):
# TODO(cretz): This
pass
self._apply_update_random_seed(job.update_random_seed)
else:
raise RuntimeError(f"Unrecognized job: {job.WhichOneof('variant')}")

Expand Down Expand Up @@ -391,6 +397,11 @@ async def run_query(input: HandleQueryInput) -> None:
)
)

def _apply_notify_has_patch(
self, job: temporalio.bridge.proto.workflow_activation.NotifyHasPatch
) -> None:
self._patches_notified.add(job.patch_id)

def _apply_resolve_activity(
self, job: temporalio.bridge.proto.workflow_activation.ResolveActivity
) -> None:
Expand Down Expand Up @@ -589,6 +600,11 @@ async def run_workflow(input: ExecuteWorkflowInput) -> None:
self._run_top_level_workflow_function(run_workflow(input))
)

def _apply_update_random_seed(
self, job: temporalio.bridge.proto.workflow_activation.UpdateRandomSeed
) -> None:
self._random.seed(job.randomness_seed)

#### _Runtime direct workflow call overrides ####
# These are in alphabetical order and all start with "workflow_".

Expand Down Expand Up @@ -679,6 +695,19 @@ def workflow_is_replaying(self) -> bool:
def workflow_now(self) -> datetime:
return datetime.utcfromtimestamp(asyncio.get_running_loop().time())

def workflow_patch(self, id: str, *, deprecated: bool) -> bool:
use_patch = not self._is_replaying or id in self._patches_notified
# Only add patch command if never sent before for this ID
if use_patch and not id in self._patches_sent:
command = self._add_command()
command.set_patch_marker.patch_id = id
command.set_patch_marker.deprecated = deprecated
self._patches_sent.add(id)
return use_patch

def workflow_random(self) -> random.Random:
return self._random

def workflow_set_query_handler(
self, name: Optional[str], handler: Optional[Callable]
) -> None:
Expand Down
69 changes: 69 additions & 0 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import asyncio
import inspect
import logging
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import IntEnum
from functools import partial
from random import Random
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -365,6 +367,14 @@ def workflow_is_replaying(self) -> bool:
def workflow_now(self) -> datetime:
...

@abstractmethod
def workflow_patch(self, id: str, *, deprecated: bool) -> bool:
...

@abstractmethod
def workflow_random(self) -> Random:
...

@abstractmethod
def workflow_set_query_handler(
self, name: Optional[str], handler: Optional[Callable]
Expand Down Expand Up @@ -436,6 +446,20 @@ async def workflow_wait_condition(
...


def deprecate_patch(id: str) -> None:
"""Mark a patch as deprecated.

This marks a workflow that had :py:func:`patched` in a previous version of
the code as no longer applicable because all workflows that use the old code
path are done and will never be queried again. Therefore the old code path
is removed as well.
Comment on lines +454 to +455
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary for this PR, but probably makes sense to link to docs page explaining the patch lifecycle

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately that doesn't exist for Python yet. https://docs.temporal.io/typescript/patching will be perfect in Python form if/when it exists. I have opened #37 to track this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should probably be part of the app dev guide


Args:
id: The identifier originally used with :py:func:`patched`.
"""
_Runtime.current().workflow_patch(id, deprecated=True)


def info() -> Info:
"""Current workflow's info.

Expand All @@ -454,6 +478,51 @@ def now() -> datetime:
return _Runtime.current().workflow_now()


def patched(id: str) -> bool:
"""Patch a workflow.

When called, this will only return true if code should take the newer path
which means this is either not replaying or is replaying and has seen this
patch before.

Use :py:func:`deprecate_patch` when all workflows are done and will never be
queried again. The old code path can be used at that time too.

Args:
id: The identifier for this patch. This identifier may be used
repeatedly in the same workflow to represent the same patch

Returns:
True if this should take the newer path, false if it should take the
older path.
"""
return _Runtime.current().workflow_patch(id, deprecated=False)


def random() -> Random:
"""Get a deterministic pseudo-random number generator.

Note, this random number generator is not cryptographically safe and should
not be used for security purposes.

Returns:
The deterministically-seeded pseudo-random number generator.
"""
return _Runtime.current().workflow_random()


def uuid4() -> uuid.UUID:
"""Get a new, determinism-safe v4 UUID based on :py:func:`random`.

Note, this UUID is not cryptographically safe and should not be used for
security purposes.

Returns:
A deterministically-seeded v4 UUID.
"""
return uuid.UUID(bytes=random().getrandbits(16 * 8).to_bytes(16, "big"), version=4)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so much better than what I had to do in TS to get random and uuid.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saw that. This could be even better if I was able to use Random.randbytes() as present in 3.9+.

I hesitated adding these APIs at all because I was hoping a sandbox could provide the random and uuid modules, but we might as well have these available now (and technically we still may offer a no-sandbox approach in the future).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do have sandbox in the future we'll probably reuse these random and uuid implementations



async def wait_condition(
fn: Callable[[], bool], *, timeout: Optional[float] = None
) -> None:
Expand Down
131 changes: 130 additions & 1 deletion tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import temporalio.api.common.v1
from temporalio import activity, workflow
from temporalio.client import Client, WorkflowFailureError
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
from temporalio.common import RetryPolicy
from temporalio.converter import DataConverter, PayloadCodec
from temporalio.exceptions import (
Expand Down Expand Up @@ -1245,6 +1245,135 @@ async def test_workflow_child_already_started(client: Client):
assert err.value.cause.message == "Already started"


class PatchWorkflowBase:
def __init__(self) -> None:
self._result = "<unset>"

@workflow.query
def result(self) -> str:
return self._result


@workflow.defn(name="patch-workflow")
class PrePatchWorkflow(PatchWorkflowBase):
@workflow.run
async def run(self) -> None:
self._result = "pre-patch"


@workflow.defn(name="patch-workflow")
class PatchWorkflow(PatchWorkflowBase):
@workflow.run
async def run(self) -> None:
if workflow.patched("my-patch"):
self._result = "post-patch"
else:
self._result = "pre-patch"


@workflow.defn(name="patch-workflow")
class DeprecatePatchWorkflow(PatchWorkflowBase):
@workflow.run
async def run(self) -> None:
workflow.deprecate_patch("my-patch")
self._result = "post-patch"


@workflow.defn(name="patch-workflow")
class PostPatchWorkflow(PatchWorkflowBase):
@workflow.run
async def run(self) -> None:
self._result = "post-patch"


async def test_workflow_patch(client: Client):
workflow_run = PrePatchWorkflow.run
task_queue = str(uuid.uuid4())

async def execute() -> WorkflowHandle:
handle = await client.start_workflow(
workflow_run, id=f"workflow-{uuid.uuid4()}", task_queue=task_queue
)
await handle.result()
return handle

async def query_result(handle: WorkflowHandle) -> str:
return await handle.query(PatchWorkflowBase.result)

# Run a simple pre-patch workflow
async with new_worker(client, PrePatchWorkflow, task_queue=task_queue):
pre_patch_handle = await execute()
assert "pre-patch" == await query_result(pre_patch_handle)

# Confirm patched workflow gives old result for pre-patched but new result
# for patched
async with new_worker(client, PatchWorkflow, task_queue=task_queue):
patch_handle = await execute()
assert "post-patch" == await query_result(patch_handle)
assert "pre-patch" == await query_result(pre_patch_handle)

# Confirm what works during deprecated
async with new_worker(client, DeprecatePatchWorkflow, task_queue=task_queue):
deprecate_patch_handle = await execute()
assert "post-patch" == await query_result(deprecate_patch_handle)
assert "post-patch" == await query_result(patch_handle)

# Confirm what works when deprecation gone
async with new_worker(client, PostPatchWorkflow, task_queue=task_queue):
post_patch_handle = await execute()
assert "post-patch" == await query_result(post_patch_handle)
assert "post-patch" == await query_result(deprecate_patch_handle)
# TODO(cretz): This causes a non-determinism failure due to having the
# patch marker, but we don't have an easy way to test it
# await query_result(patch_handle)


@workflow.defn
class UUIDWorkflow:
def __init__(self) -> None:
self._result = "<unset>"

@workflow.run
async def run(self) -> None:
self._result = str(workflow.uuid4())

@workflow.query
def result(self) -> str:
return self._result


async def test_workflow_uuid(client: Client):
task_queue = str(uuid.uuid4())
async with new_worker(client, UUIDWorkflow, task_queue=task_queue):
# Get two handle UUID results
handle1 = await client.start_workflow(
UUIDWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=task_queue,
)
await handle1.result()
handle1_query_result = await handle1.query(UUIDWorkflow.result)

handle2 = await client.start_workflow(
UUIDWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=task_queue,
)
await handle2.result()
handle2_query_result = await handle2.query(UUIDWorkflow.result)

# Confirm they aren't equal to each other but they are equal to retries
# of the same query
assert handle1_query_result != handle2_query_result
assert handle1_query_result == await handle1.query(UUIDWorkflow.result)
assert handle2_query_result == await handle2.query(UUIDWorkflow.result)

# Now confirm those results are the same even on a new worker
async with new_worker(client, UUIDWorkflow, task_queue=task_queue):
assert handle1_query_result == await handle1.query(UUIDWorkflow.result)
assert handle2_query_result == await handle2.query(UUIDWorkflow.result)


# TODO:
# * Use typed dicts for activity, local activity, and child workflow configs
# * Local activity invalid options
Expand Down