Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1013,19 +1013,35 @@ affect calls activity code might make to functions on the `temporalio.activity`
### Workflow Replay

Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
assuming `history_json_str` is populated with a JSON string history either exported from the web UI or from `tctl`, the
assuming `history_str` is populated with a JSON string history either exported from the web UI or from `tctl`, the
following function will replay it:

```python
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

async def run_replayer(history_json_str: str):
async def run_replayer(history_str: str):
replayer = Replayer(workflows=[SayHello])
await replayer.replay_workflow(history_json_str)
await replayer.replay_workflow(WorkflowHistory.from_json(history_str))
```

This will throw an error if any non-determinism is detected.

Replaying from workflow history is a powerful concept that many use to test that workflow alterations won't cause
non-determinisms with past-complete workflows. The following code will make sure that all workflow histories for a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
non-determinisms with past-complete workflows. The following code will make sure that all workflow histories for a
non-determinism errors with past-complete workflows. The following code will make sure that all workflow histories for a

certain workflow type (i.e. workflow class) are safe with the current code.

```python
from temporalio.client import Client, WorkflowHistory
from temporalio.worker import Replayer

async def check_past_histories(my_client: Client):
replayer = Replayer(workflows=[SayHello])
await replayer.replay_workflows(
await my_client.list_workflows("WorkflowType = 'SayHello'").map_histories(),
)
```

### OpenTelemetry Support

OpenTelemetry support requires the optional `opentelemetry` dependencies which are part of the `opentelemetry` extra.
Expand Down
13 changes: 13 additions & 0 deletions scripts/_proto/temporal/api/common/v1/grpc_status.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package temporal.api.common.v1;

import "google/protobuf/any.proto";

// From https://github.com/grpc/grpc/blob/master/src/proto/grpc/status/status.proto
// since we don't import grpc but still need the status info
message GrpcStatus {
int32 code = 1;
string message = 2;
repeated google.protobuf.Any details = 3;
}
14 changes: 11 additions & 3 deletions scripts/gen_protos.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
health_proto_dir = proto_dir / "grpc"
testsrv_proto_dir = proto_dir / "testsrv_upstream"
test_proto_dir = base_dir / "tests"
additional_proto_dir = base_dir / "scripts" / "_proto"

# Exclude testsrv dependencies protos
proto_paths = [
Expand All @@ -23,6 +24,7 @@
if not str(v).startswith(str(testsrv_proto_dir / "dependencies"))
]
proto_paths.extend(test_proto_dir.glob("**/*.proto"))
proto_paths.extend(additional_proto_dir.glob("**/*.proto"))

api_out_dir = base_dir / "temporalio" / "api"
sdk_out_dir = base_dir / "temporalio" / "bridge" / "proto"
Expand Down Expand Up @@ -150,16 +152,22 @@ def fix_generated_output(base_path: Path):
f"--proto_path={testsrv_proto_dir}",
f"--proto_path={health_proto_dir}",
f"--proto_path={test_proto_dir}",
f"--proto_path={additional_proto_dir}",
f"--python_out={temp_dir}",
f"--grpc_python_out={temp_dir}",
f"--mypy_out={temp_dir}",
f"--mypy_grpc_out={temp_dir}",
*map(str, proto_paths),
]
)
# Remove health gRPC parts
(temp_dir / "health" / "v1" / "health_pb2_grpc.py").unlink()
(temp_dir / "health" / "v1" / "health_pb2_grpc.pyi").unlink()
# Remove every _grpc.py file that isn't part of a Temporal "service"
for grpc_file in temp_dir.glob("**/*_grpc.py*"):
if (
len(grpc_file.parents) < 2
or grpc_file.parents[0].name != "v1"
or not grpc_file.parents[1].name.endswith("service")
):
grpc_file.unlink()
# Apply fixes before moving code
fix_generated_output(temp_dir)
# Move protos
Expand Down
3 changes: 0 additions & 3 deletions temporalio/api/batch/v1/message_pb2_grpc.py

This file was deleted.

25 changes: 0 additions & 25 deletions temporalio/api/batch/v1/message_pb2_grpc.pyi

This file was deleted.

3 changes: 0 additions & 3 deletions temporalio/api/command/v1/message_pb2_grpc.py

This file was deleted.

25 changes: 0 additions & 25 deletions temporalio/api/command/v1/message_pb2_grpc.pyi

This file was deleted.

2 changes: 2 additions & 0 deletions temporalio/api/common/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .grpc_status_pb2 import GrpcStatus
from .message_pb2 import (
ActivityType,
DataBlob,
Expand All @@ -14,6 +15,7 @@
__all__ = [
"ActivityType",
"DataBlob",
"GrpcStatus",
"Header",
"Memo",
"Payload",
Expand Down
40 changes: 40 additions & 0 deletions temporalio/api/common/v1/grpc_status_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions temporalio/api/common/v1/grpc_status_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
"""
import builtins
import collections.abc
import google.protobuf.any_pb2
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.message
import sys

if sys.version_info >= (3, 8):
import typing as typing_extensions
else:
import typing_extensions

DESCRIPTOR: google.protobuf.descriptor.FileDescriptor

class GrpcStatus(google.protobuf.message.Message):
"""From https://github.com/grpc/grpc/blob/master/src/proto/grpc/status/status.proto
since we don't import grpc but still need the status info
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

CODE_FIELD_NUMBER: builtins.int
MESSAGE_FIELD_NUMBER: builtins.int
DETAILS_FIELD_NUMBER: builtins.int
code: builtins.int
message: builtins.str
@property
def details(
self,
) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
google.protobuf.any_pb2.Any
]: ...
def __init__(
self,
*,
code: builtins.int = ...,
message: builtins.str = ...,
details: collections.abc.Iterable[google.protobuf.any_pb2.Any] | None = ...,
) -> None: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"code", b"code", "details", b"details", "message", b"message"
],
) -> None: ...

global___GrpcStatus = GrpcStatus
3 changes: 0 additions & 3 deletions temporalio/api/common/v1/message_pb2_grpc.py

This file was deleted.

25 changes: 0 additions & 25 deletions temporalio/api/common/v1/message_pb2_grpc.pyi

This file was deleted.

3 changes: 0 additions & 3 deletions temporalio/api/dependencies/gogoproto/gogo_pb2_grpc.py

This file was deleted.

31 changes: 0 additions & 31 deletions temporalio/api/dependencies/gogoproto/gogo_pb2_grpc.pyi

This file was deleted.

3 changes: 0 additions & 3 deletions temporalio/api/enums/v1/batch_operation_pb2_grpc.py

This file was deleted.

25 changes: 0 additions & 25 deletions temporalio/api/enums/v1/batch_operation_pb2_grpc.pyi

This file was deleted.

3 changes: 0 additions & 3 deletions temporalio/api/enums/v1/command_type_pb2_grpc.py

This file was deleted.

25 changes: 0 additions & 25 deletions temporalio/api/enums/v1/command_type_pb2_grpc.pyi

This file was deleted.

3 changes: 0 additions & 3 deletions temporalio/api/enums/v1/common_pb2_grpc.py

This file was deleted.

Loading