Skip to content

Test Framework #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,122 @@ While running in a workflow, in addition to features documented elsewhere, the f
* `await handle.signal()` can be called on the handle to signal the external workflow
* `await handle.cancel()` can be called on the handle to send a cancel to the external workflow

#### Testing

Workflow testing can be done in an integration-test fashion against a real server, however it is hard to simulate
timeouts and other long time-based code. Using the time-skipping workflow test environment can help there.

The time-skipping `temporalio.testing.WorkflowEnvironment` can be created via the static async `start_time_skipping()`.
This internally downloads the Temporal time-skipping test server to a temporary directory if it doesn't already exist,
then starts the test server which has special APIs for skipping time.

##### Automatic Time Skipping

Anytime a workflow result is waited on, the time-skipping server automatically advances to the next event it can. To
manually advance time before waiting on the result of a workflow, the `WorkflowEnvironment.sleep` method can be used.

Here's a simple example of a workflow that sleeps for 24 hours:

```python
import asyncio
from temporalio import workflow

@workflow.defn
class WaitADayWorkflow:
@workflow.run
async def run(self) -> str:
await asyncio.sleep(24 * 60 * 60)
return "all done"
```

An integration test of this workflow would be way too slow. However the time-skipping server automatically skips to the
next event when we wait on the result. Here's a test for that workflow:

```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_wait_a_day_workflow():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(env.client, task_queue="tq1", workflows=[WaitADayWorkflow]):
assert "all done" == await env.client.execute_workflow(WaitADayWorkflow.run, id="wf1", task_queue="tq1")
```

That test will run almost instantly. This is because by calling `execute_workflow` on our client, we have asked the
environment to automatically skip time as much as it can (basically until the end of the workflow or until an activity
is run).

To disable automatic time-skipping while waiting for a workflow result, run code inside a
`with env.auto_time_skipping_disabled():` block.

##### Manual Time Skipping

Until a workflow is waited on, all time skipping in the time-skipping environment is done manually via
`WorkflowEnvironment.sleep`.

Here's workflow that waits for a signal or times out:

```python
import asyncio
from temporalio import workflow

@workflow.defn
class SignalWorkflow:
def __init__(self) -> None:
self.signal_received = False

@workflow.run
async def run(self) -> str:
# Wait for signal or timeout in 45 seconds
try:
await workflow.wait_condition(lambda: self.signal_received, timeout=45)
return "got signal"
except asyncio.TimeoutError:
return "got timeout"

@workflow.signal
def some_signal(self) -> None:
self.signal_received = True
```

To test a normal signal, you might:

```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_signal_workflow():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
# Start workflow, send signal, check result
handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
await handle.signal(SignalWorkflow.some_signal)
assert "got signal" == await handle.result()
```

But how would you test the timeout part? Like so:

```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_signal_workflow_timeout():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
# Start workflow, advance time past timeout, check result
handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
await env.sleep(50)
assert "got timeout" == await handle.result()
```

Also, the current time of the workflow environment can be obtained via the async `WorkflowEnvironment.get_current_time`
method.

##### Mocking Activities

Activities are just functions decorated with `@activity.defn`. Simply write different ones and pass those to the worker
to have different activities called during the test.

### Activities

#### Definition
Expand Down Expand Up @@ -517,6 +633,11 @@ Cancellation for synchronous activities is done in the background and the activi
react appropriately. An activity must heartbeat to receive cancellation and there are other ways to be notified about
cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).

Note, all calls from an activity to functions in the `temporalio.activity` package are powered by
[contextvars](https://docs.python.org/3/library/contextvars.html). Therefore, new threads starting _inside_ of
activities must `copy_context()` and then `.run()` manually to ensure `temporalio.activity` calls like `heartbeat` still
function in the new threads.

###### Synchronous Multithreaded Activities

If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities
Expand Down Expand Up @@ -580,6 +701,18 @@ cancellation of all outstanding activities.
The `shutdown()` invocation will wait on all activities to complete, so if a long-running activity does not at least
respect cancellation, the shutdown may never complete.

#### Testing

Unit testing an activity or any code that could run in an activity is done via the
`temporalio.testing.ActivityEnvironment` class. Simply instantiate this and any callable + params passed to `run` will
be invoked inside the activity context. The following are attributes/methods on the environment that can be used to
affect calls activity code might make to functions on the `temporalio.activity` package.

* `info` property can be set to customize what is returned from `activity.info()`
* `on_heartbeat` property can be set to handle `activity.heartbeat()` calls
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to mention that when a worker runs an activity it throttles sending out heartbeats to the server so that users don't rely on this function in their tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can document, but can you clarify what you mean by:

users don't rely on this function in their tests

This is the one literally invoked for each heartbeat call unthrottled and I'd expect they can rely on that. Or are you saying so they don't improperly assume every heartbeat is heavy and that people call it quickly a lot so on_heartbeat should not be assumed to be 1:1 with server heartbeat?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the latter, thanks for clarifying.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will document when I send in PR for #120

* `cancel()` can be invoked to simulate a cancellation of the activity
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity

### Workflow Replay

Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
Expand Down
67 changes: 48 additions & 19 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pydoctor = { git = "https://github.com/cretz/pydoctor.git", branch = "overloads"
pytest = "^7.1.2"
pytest-asyncio = "^0.18.3"
pytest-timeout = "^2.1.0"
setuptools = "^64.0.1"
setuptools = "^65.0.0"
setuptools-rust = "^1.3.0"
toml = "^0.10.2"
twine = "^4.0.1"
Expand Down Expand Up @@ -149,6 +149,8 @@ intersphinx = [
privacy = [
"PRIVATE:temporalio.bridge",
"PRIVATE:temporalio.types",
"HIDDEN:temporalio.testing.activity",
"HIDDEN:temporalio.testing.workflow",
"HIDDEN:temporalio.worker.activity",
"HIDDEN:temporalio.worker.interceptor",
"HIDDEN:temporalio.worker.worker",
Expand Down
9 changes: 6 additions & 3 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
overload,
)

import temporalio.api.common.v1
import temporalio.common
import temporalio.exceptions

Expand Down Expand Up @@ -130,8 +129,12 @@ def current() -> _Context:
return context

@staticmethod
def set(context: _Context) -> None:
_current_context.set(context)
def set(context: _Context) -> contextvars.Token:
return _current_context.set(context)

@staticmethod
def reset(token: contextvars.Token) -> None:
_current_context.reset(token)

@property
def logger_details(self) -> Mapping[str, Any]:
Expand Down
Loading