Skip to content

Update pydantic samples to use contrib module #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ jobs:
# Using fixed Poetry version until
# https://github.com/python-poetry/poetry/pull/7694 is fixed
- run: python -m pip install --upgrade wheel "poetry==1.4.0" poethepoet
- run: poetry install --with pydantic --with dsl --with encryption --with trio_async
- run: poetry install --with pydantic_converter --with dsl --with encryption --with trio_async
- run: poe lint
- run: mkdir junit-xml
- run: poe test -s -o log_cli_level=DEBUG --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
- run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--time-skipping.xml
- run: poe test -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
- run: poe test -s --workflow-environment time-skipping --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--time-skipping.xml
# This must remain the last step since it downgrades pydantic
- name: Uninstall pydantic
shell: bash
run: |
echo y | poetry run pip uninstall pydantic
echo y | poetry run pip uninstall pydantic-core
poetry run pip install pydantic==1.10
poe test -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--pydantic-v1.xml tests/pydantic_converter_v1/workflow_test.py

# On latest, run gevent test
- name: Gevent test
Expand Down
2,966 changes: 1,536 additions & 1,430 deletions poetry.lock

Large diffs are not rendered by default.

18 changes: 3 additions & 15 deletions pydantic_converter/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Pydantic Converter Sample

This sample shows how to create a custom Pydantic converter to properly serialize Pydantic models.
This sample shows how to use the Pydantic data converter.

For this sample, the optional `pydantic` dependency group must be included. To include, run:
For this sample, the optional `pydantic_converter` dependency group must be included. To include, run:

poetry install --with pydantic
poetry install --with pydantic_converter

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:
Expand All @@ -17,15 +17,3 @@ This will start the worker. Then, in another terminal, run the following to exec

In the worker terminal, the workflow and its activity will log that it received the Pydantic models. In the starter
terminal, the Pydantic models in the workflow result will be logged.

### Notes

This is the preferred way to use Pydantic models with Temporal Python SDK. The converter code is small and meant to
embed into other projects.

This sample also demonstrates use of `datetime` inside of Pydantic models. Due to a known issue with the Temporal
sandbox, this class is seen by Pydantic as `date` instead of `datetime` upon deserialization. This is due to a
[known Python issue](https://github.com/python/cpython/issues/89010) where, when we proxy the `datetime` class in the
sandbox to prevent non-deterministic calls like `now()`, `issubclass` fails for the proxy type causing Pydantic to think
it's a `date` instead. In `worker.py`, we have shown a workaround of disabling restrictions on `datetime` which solves
this issue but no longer protects against workflow developers making non-deterministic calls in that module.
4 changes: 2 additions & 2 deletions pydantic_converter/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from ipaddress import IPv4Address

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter

from pydantic_converter.converter import pydantic_data_converter
from pydantic_converter.worker import MyPydanticModel, MyWorkflow


Expand All @@ -29,7 +29,7 @@ async def main():
some_date=datetime(2001, 2, 3, 4, 5, 6),
),
],
id=f"pydantic_converter-workflow-id",
id="pydantic_converter-workflow-id",
task_queue="pydantic_converter-task-queue",
)
logging.info("Got models from client: %s" % result)
Expand Down
36 changes: 3 additions & 33 deletions pydantic_converter/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import dataclasses
import logging
from datetime import datetime, timedelta
from ipaddress import IPv4Address
Expand All @@ -8,17 +7,12 @@
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
SandboxedWorkflowRunner,
SandboxRestrictions,
)

# We always want to pass through external modules to the sandbox that we know
# are safe for workflow use
# Always pass through external modules to the sandbox that you know are safe for
# workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel

from pydantic_converter.converter import pydantic_data_converter
from temporalio.contrib.pydantic import pydantic_data_converter


class MyPydanticModel(BaseModel):
Expand All @@ -42,29 +36,6 @@ async def run(self, models: List[MyPydanticModel]) -> List[MyPydanticModel]:
)


# Due to known issues with Pydantic's use of issubclass and our inability to
# override the check in sandbox, Pydantic will think datetime is actually date
# in the sandbox. At the expense of protecting against datetime.now() use in
# workflows, we're going to remove datetime module restrictions. See sdk-python
# README's discussion of known sandbox issues for more details.
def new_sandbox_runner() -> SandboxedWorkflowRunner:
# TODO(cretz): Use with_child_unrestricted when https://github.com/temporalio/sdk-python/issues/254
# is fixed and released
invalid_module_member_children = dict(
SandboxRestrictions.invalid_module_members_default.children
)
del invalid_module_member_children["datetime"]
return SandboxedWorkflowRunner(
restrictions=dataclasses.replace(
SandboxRestrictions.default,
invalid_module_members=dataclasses.replace(
SandboxRestrictions.invalid_module_members_default,
children=invalid_module_member_children,
),
)
)


interrupt_event = asyncio.Event()


Expand All @@ -81,7 +52,6 @@ async def main():
task_queue="pydantic_converter-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
workflow_runner=new_sandbox_runner(),
):
# Wait until interrupted
print("Worker started, ctrl+c to exit")
Expand Down
31 changes: 31 additions & 0 deletions pydantic_converter_v1/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Pydantic v1 Converter Sample

**This sample shows how to use Pydantic v1 with Temporal. This is not recommended: use Pydantic v2 if possible, and use the
main [pydantic_converter](../pydantic_converter/README.md) sample.**

To install, run:

poetry install --with pydantic_converter
poetry run pip uninstall pydantic pydantic-core
poetry run pip install pydantic==1.10

To run, first see the root [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflow:

poetry run python starter.py

In the worker terminal, the workflow and its activity will log that it received the Pydantic models. In the starter
terminal, the Pydantic models in the workflow result will be logged.

### Notes

This sample also demonstrates use of `datetime` inside of Pydantic v1 models. Due to a known issue with the Temporal
sandbox, this class is seen by Pydantic v1 as `date` instead of `datetime` upon deserialization. This is due to a
[known Python issue](https://github.com/python/cpython/issues/89010) where, when we proxy the `datetime` class in the
sandbox to prevent non-deterministic calls like `now()`, `issubclass` fails for the proxy type causing Pydantic v1 to think
it's a `date` instead. In `worker.py`, we have shown a workaround of disabling restrictions on `datetime` which solves
this issue but no longer protects against workflow developers making non-deterministic calls in that module.
Empty file.
File renamed without changes.
39 changes: 39 additions & 0 deletions pydantic_converter_v1/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
import logging
from datetime import datetime
from ipaddress import IPv4Address

from temporalio.client import Client

from pydantic_converter_v1.converter import pydantic_data_converter
from pydantic_converter_v1.worker import MyPydanticModel, MyWorkflow


async def main():
logging.basicConfig(level=logging.INFO)
# Connect client using the Pydantic converter
client = await Client.connect(
"localhost:7233", data_converter=pydantic_data_converter
)

# Run workflow
result = await client.execute_workflow(
MyWorkflow.run,
[
MyPydanticModel(
some_ip=IPv4Address("127.0.0.1"),
some_date=datetime(2000, 1, 2, 3, 4, 5),
),
MyPydanticModel(
some_ip=IPv4Address("127.0.0.2"),
some_date=datetime(2001, 2, 3, 4, 5, 6),
),
],
id="pydantic_converter-workflow-id",
task_queue="pydantic_converter-task-queue",
)
logging.info("Got models from client: %s" % result)


if __name__ == "__main__":
asyncio.run(main())
98 changes: 98 additions & 0 deletions pydantic_converter_v1/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import asyncio
import dataclasses
import logging
from datetime import datetime, timedelta
from ipaddress import IPv4Address
from typing import List

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
SandboxedWorkflowRunner,
SandboxRestrictions,
)

# We always want to pass through external modules to the sandbox that we know
# are safe for workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel

from pydantic_converter_v1.converter import pydantic_data_converter


class MyPydanticModel(BaseModel):
some_ip: IPv4Address
some_date: datetime


@activity.defn
async def my_activity(models: List[MyPydanticModel]) -> List[MyPydanticModel]:
activity.logger.info("Got models in activity: %s" % models)
return models


@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self, models: List[MyPydanticModel]) -> List[MyPydanticModel]:
workflow.logger.info("Got models in workflow: %s" % models)
return await workflow.execute_activity(
my_activity, models, start_to_close_timeout=timedelta(minutes=1)
)


# Due to known issues with Pydantic's use of issubclass and our inability to
# override the check in sandbox, Pydantic will think datetime is actually date
# in the sandbox. At the expense of protecting against datetime.now() use in
# workflows, we're going to remove datetime module restrictions. See sdk-python
# README's discussion of known sandbox issues for more details.
def new_sandbox_runner() -> SandboxedWorkflowRunner:
# TODO(cretz): Use with_child_unrestricted when https://github.com/temporalio/sdk-python/issues/254
# is fixed and released
invalid_module_member_children = dict(
SandboxRestrictions.invalid_module_members_default.children
)
del invalid_module_member_children["datetime"]
return SandboxedWorkflowRunner(
restrictions=dataclasses.replace(
SandboxRestrictions.default,
invalid_module_members=dataclasses.replace(
SandboxRestrictions.invalid_module_members_default,
children=invalid_module_member_children,
),
)
)


interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)
# Connect client using the Pydantic converter
client = await Client.connect(
"localhost:7233", data_converter=pydantic_data_converter
)

# Run a worker for the workflow
async with Worker(
client,
task_queue="pydantic_converter-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
workflow_runner=new_sandbox_runner(),
):
# Wait until interrupted
print("Worker started, ctrl+c to exit")
await interrupt_event.wait()
print("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
10 changes: 6 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ packages = [

[tool.poetry.dependencies]
python = "^3.9"
temporalio = "^1.9.0"
temporalio = "^1.10.0"

[tool.poetry.dev-dependencies]
black = "^22.3.0"
isort = "^5.10.1"
mypy = "^0.981"
mypy = "^1.4.1"
pytest = "^7.1.2"
pytest-asyncio = "^0.18.3"
frozenlist = "^1.4.0"
types-pyyaml = "^6.0.12.20241230"


# All sample-specific dependencies are in optional groups below, named after the
# sample they apply to
Expand Down Expand Up @@ -63,9 +65,9 @@ optional = true
temporalio = { version = "*", extras = ["opentelemetry"] }
opentelemetry-exporter-otlp-proto-grpc = "1.18.0"

[tool.poetry.group.pydantic]
[tool.poetry.group.pydantic_converter]
optional = true
dependencies = { pydantic = "^1.10.4" }
dependencies = { pydantic = "^2.10.6" }

[tool.poetry.group.sentry]
optional = true
Expand Down
12 changes: 8 additions & 4 deletions sentry/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ async def execute_activity(self, input: ExecuteActivityInput) -> Any:
try:
return await super().execute_activity(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
set_context("temporal.activity.input", asdict(input.args[0]))
if len(input.args) == 1:
[arg] = input.args
if is_dataclass(arg) and not isinstance(arg, type):
set_context("temporal.activity.input", asdict(arg))
set_context("temporal.activity.info", activity.info().__dict__)
capture_exception()
raise e
Expand All @@ -58,8 +60,10 @@ async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
try:
return await super().execute_workflow(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
set_context("temporal.workflow.input", asdict(input.args[0]))
if len(input.args) == 1:
[arg] = input.args
if is_dataclass(arg) and not isinstance(arg, type):
set_context("temporal.workflow.input", asdict(arg))
set_context("temporal.workflow.info", workflow.info().__dict__)

if not workflow.unsafe.is_replaying():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ async def test_safe_message_handlers(client: Client, env: WorkflowEnvironment):

await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)

result = await cluster_manager_handle.result()
assert result.num_currently_assigned_nodes == 0
cluster_manager_result = await cluster_manager_handle.result()
assert cluster_manager_result.num_currently_assigned_nodes == 0


async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
Expand Down
Loading