Skip to content

Commit e0d0a25

Browse files
authored
Merge branch 'main' into plugins_to_core
2 parents 3b1956e + 6d2924b commit e0d0a25

File tree

9 files changed

+287
-211
lines changed

9 files changed

+287
-211
lines changed

README.md

Lines changed: 106 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,11 @@ informal introduction to the features and their implementation.
9999
- [Interceptors](#interceptors)
100100
- [Nexus](#nexus)
101101
- [Plugins](#plugins)
102-
- [Client Plugins](#client-plugins)
103-
- [Worker Plugins](#worker-plugins)
102+
- [Usage](#usage-1)
103+
- [Plugin Implementations](#plugin-implementations)
104+
- [Advanced Plugin Implementations](#advanced-plugin-implementations)
105+
- [Client Plugins](#client-plugins)
106+
- [Worker Plugins](#worker-plugins)
104107
- [Workflow Replay](#workflow-replay)
105108
- [Observability](#observability)
106109
- [Metrics](#metrics)
@@ -1498,10 +1501,80 @@ configuration, and worker execution. Common customizations may include but are n
14981501
3. Workflows
14991502
4. Interceptors
15001503
1504+
**Important Notes:**
1505+
1506+
- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
1507+
- Avoid providing the same plugin to both client and worker to prevent double execution
1508+
- Each plugin's `name()` method returns a unique identifier for debugging purposes
1509+
1510+
#### Usage
1511+
1512+
Plugins can be provided to both `Client` and `Worker`.
1513+
1514+
```python
1515+
# Use the plugin when connecting
1516+
client = await Client.connect(
1517+
"my-server.com:7233",
1518+
plugins=[SomePlugin()]
1519+
)
1520+
```
1521+
```python
1522+
# Use the plugin when creating a worker
1523+
worker = Worker(
1524+
client,
1525+
plugins=[SomePlugin()]
1526+
)
1527+
```
1528+
In the case of `Client`, any plugins will also be provided to any workers created with that client.
1529+
```python
1530+
# Create client with the unified plugin
1531+
client = await Client.connect(
1532+
"localhost:7233",
1533+
plugins=[SomePlugin()]
1534+
)
1535+
1536+
# Worker will automatically inherit the plugin from the client
1537+
worker = Worker(
1538+
client,
1539+
task_queue="my-task-queue",
1540+
workflows=[MyWorkflow],
1541+
activities=[my_activity]
1542+
)
1543+
```
1544+
#### Plugin Implementations
1545+
1546+
The easiest way to create your own plugin is to use `SimplePlugin`. This takes a number of possible configurations to produce
1547+
a relatively straightforward plugin.
1548+
1549+
```python
1550+
plugin = SimplePlugin(
1551+
"MyPlugin",
1552+
data_converter=converter,
1553+
)
1554+
```
1555+
1556+
It is also possible to subclass `SimplePlugin` for some additional controls. This is what we do for `OpenAIAgentsPlugin`.
1557+
1558+
```python
1559+
class MediumPlugin(SimplePlugin):
1560+
def __init__(self):
1561+
super().__init__("MediumPlugin", data_converter=pydantic_data_converter)
1562+
1563+
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
1564+
config = super().configure_worker(config)
1565+
config["task_queue"] = "override"
1566+
return config
1567+
```
1568+
1569+
#### Advanced Plugin Implementations
1570+
1571+
`SimplePlugin` doesn't cover all possible uses of plugins. For more unusual use cases, an implementor can implement
1572+
the underlying plugin interfaces directly.
1573+
15011574
A single plugin class can implement both client and worker plugin interfaces to share common logic between both
15021575
contexts. When used with a client, it will automatically be propagated to any workers created with that client.
15031576

1504-
#### Client Plugins
1577+
##### Client Plugins
15051578

15061579
Client plugins can intercept and modify client configuration and service connections. They are useful for adding
15071580
authentication, modifying connection parameters, or adding custom behavior during client creation.
@@ -1516,29 +1589,21 @@ class AuthenticationPlugin(Plugin):
15161589
def __init__(self, api_key: str):
15171590
self.api_key = api_key
15181591

1519-
def init_client_plugin(self, next: Plugin) -> None:
1520-
self.next_client_plugin = next
1521-
15221592
def configure_client(self, config: ClientConfig) -> ClientConfig:
15231593
# Modify client configuration
15241594
config["namespace"] = "my-secure-namespace"
1525-
return self.next_client_plugin.configure_client(config)
1595+
return config
15261596

15271597
async def connect_service_client(
1528-
self, config: temporalio.service.ConnectConfig
1598+
self,
1599+
config: temporalio.service.ConnectConfig,
1600+
next: Callable[[ConnectConfig], Awaitable[ServiceClient]]
15291601
) -> temporalio.service.ServiceClient:
1530-
# Add authentication to the connection
15311602
config.api_key = self.api_key
1532-
return await self.next_client_plugin.connect_service_client(config)
1533-
1534-
# Use the plugin when connecting
1535-
client = await Client.connect(
1536-
"my-server.com:7233",
1537-
plugins=[AuthenticationPlugin("my-api-key")]
1538-
)
1603+
return await next(config)
15391604
```
15401605

1541-
#### Worker Plugins
1606+
##### Worker Plugins
15421607

15431608
Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
15441609
custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
@@ -1558,47 +1623,39 @@ class MonitoringPlugin(Plugin):
15581623
def __init__(self):
15591624
self.logger = logging.getLogger(__name__)
15601625

1561-
def init_worker_plugin(self, next: Plugin) -> None:
1562-
self.next_worker_plugin = next
1563-
15641626
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
15651627
# Modify worker configuration
15661628
original_task_queue = config["task_queue"]
15671629
config["task_queue"] = f"monitored-{original_task_queue}"
15681630
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
1569-
return self.next_worker_plugin.configure_worker(config)
1631+
return config
15701632

1571-
async def run_worker(self, worker: Worker) -> None:
1633+
async def run_worker(self, worker: Worker, next: Callable[[Worker], Awaitable[None]]) -> None:
15721634
self.logger.info("Starting worker execution")
15731635
try:
1574-
await self.next_worker_plugin.run_worker(worker)
1636+
await next(worker)
15751637
finally:
15761638
self.logger.info("Worker execution completed")
15771639

15781640
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
1579-
return self.next_worker_plugin.configure_replayer(config)
1641+
return config
15801642

15811643
@asynccontextmanager
15821644
async def run_replayer(
15831645
self,
15841646
replayer: Replayer,
15851647
histories: AsyncIterator[temporalio.client.WorkflowHistory],
1648+
next: Callable[
1649+
[Replayer, AsyncIterator[WorkflowHistory]],
1650+
AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
1651+
]
15861652
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
15871653
self.logger.info("Starting replay execution")
15881654
try:
1589-
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
1590-
yield results
1655+
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
1656+
yield results
15911657
finally:
15921658
self.logger.info("Replay execution completed")
1593-
1594-
# Use the plugin when creating a worker
1595-
worker = Worker(
1596-
client,
1597-
task_queue="my-task-queue",
1598-
workflows=[MyWorkflow],
1599-
activities=[my_activity],
1600-
plugins=[MonitoringPlugin()]
1601-
)
16021659
```
16031660

16041661
For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:
@@ -1612,67 +1669,43 @@ from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConf
16121669

16131670

16141671
class UnifiedPlugin(ClientPlugin, WorkerPlugin):
1615-
def init_client_plugin(self, next: ClientPlugin) -> None:
1616-
self.next_client_plugin = next
1617-
1618-
def init_worker_plugin(self, next: WorkerPlugin) -> None:
1619-
self.next_worker_plugin = next
1620-
16211672
def configure_client(self, config: ClientConfig) -> ClientConfig:
16221673
# Client-side customization
16231674
config["data_converter"] = pydantic_data_converter
1624-
return self.next_client_plugin.configure_client(config)
1675+
return config
16251676

16261677
async def connect_service_client(
1627-
self, config: temporalio.service.ConnectConfig
1678+
self,
1679+
config: temporalio.service.ConnectConfig,
1680+
next: Callable[[ConnectConfig], Awaitable[ServiceClient]]
16281681
) -> temporalio.service.ServiceClient:
1629-
# Add authentication to the connection
16301682
config.api_key = self.api_key
1631-
return await self.next_client_plugin.connect_service_client(config)
1683+
return await next(config)
16321684

16331685
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
16341686
# Worker-side customization
1635-
return self.next_worker_plugin.configure_worker(config)
1687+
return config
16361688

1637-
async def run_worker(self, worker: Worker) -> None:
1689+
async def run_worker(self, worker: Worker, next: Callable[[Worker], Awaitable[None]]) -> None:
16381690
print("Starting unified worker")
1639-
await self.next_worker_plugin.run_worker(worker)
1691+
await next(worker)
16401692

16411693
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
16421694
config["data_converter"] = pydantic_data_converter
1643-
return self.next_worker_plugin.configure_replayer(config)
1695+
return config
16441696

16451697
async def run_replayer(
16461698
self,
16471699
replayer: Replayer,
16481700
histories: AsyncIterator[temporalio.client.WorkflowHistory],
1701+
next: Callable[
1702+
[Replayer, AsyncIterator[WorkflowHistory]],
1703+
AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
1704+
]
16491705
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
1650-
return self.next_worker_plugin.run_replayer(replayer, histories)
1651-
1652-
# Create client with the unified plugin
1653-
client = await Client.connect(
1654-
"localhost:7233",
1655-
plugins=[UnifiedPlugin()]
1656-
)
1657-
1658-
# Worker will automatically inherit the plugin from the client
1659-
worker = Worker(
1660-
client,
1661-
task_queue="my-task-queue",
1662-
workflows=[MyWorkflow],
1663-
activities=[my_activity]
1664-
)
1706+
return next(replayer, histories)
16651707
```
16661708

1667-
**Important Notes:**
1668-
1669-
- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
1670-
- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
1671-
- Avoid providing the same plugin to both client and worker to prevent double execution
1672-
- Plugin methods should call the plugin provided during initialization to maintain the plugin chain
1673-
- Each plugin's `name()` method returns a unique identifier for debugging purposes
1674-
1675-
16761709
### Workflow Replay
16771710

16781711
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,

temporalio/worker/_interceptor.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,12 +295,13 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
295295

296296
endpoint: str
297297
service: str
298-
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
298+
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any]
299299
input: InputT
300-
schedule_to_close_timeout: Optional[timedelta]
300+
schedule_to_close_timeout: timedelta | None
301301
cancellation_type: temporalio.workflow.NexusOperationCancellationType
302-
headers: Optional[Mapping[str, str]]
303-
output_type: Optional[Type[OutputT]] = None
302+
headers: Mapping[str, str] | None
303+
summary: str | None
304+
output_type: Type[OutputT] | None = None
304305

305306
def __post_init__(self) -> None:
306307
"""Initialize operation-specific attributes after dataclass creation."""

temporalio/worker/_workflow_instance.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1581,12 +1581,13 @@ async def workflow_start_nexus_operation(
15811581
self,
15821582
endpoint: str,
15831583
service: str,
1584-
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
1584+
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any],
15851585
input: Any,
1586-
output_type: Optional[Type[OutputT]],
1587-
schedule_to_close_timeout: Optional[timedelta],
1586+
output_type: Type[OutputT] | None,
1587+
schedule_to_close_timeout: timedelta | None,
15881588
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
1589-
headers: Optional[Mapping[str, str]],
1589+
headers: Mapping[str, str] | None,
1590+
summary: str | None,
15901591
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
15911592
# start_nexus_operation
15921593
return await self._outbound.start_nexus_operation(
@@ -1599,6 +1600,7 @@ async def workflow_start_nexus_operation(
15991600
schedule_to_close_timeout=schedule_to_close_timeout,
16001601
cancellation_type=cancellation_type,
16011602
headers=headers,
1603+
summary=summary,
16021604
)
16031605
)
16041606

@@ -3330,6 +3332,11 @@ def _apply_schedule_command(self) -> None:
33303332
for key, val in self._input.headers.items():
33313333
v.nexus_header[key] = val
33323334

3335+
if self._input.summary:
3336+
command.user_metadata.summary.CopyFrom(
3337+
self._payload_converter.to_payload(self._input.summary)
3338+
)
3339+
33333340
def _apply_cancel_command(
33343341
self,
33353342
command: temporalio.bridge.proto.workflow_commands.WorkflowCommand,

temporalio/worker/workflow_sandbox/_importer.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -201,17 +201,6 @@ def _import(
201201

202202
# Check module restrictions and passthrough modules
203203
if full_name not in sys.modules:
204-
# Issue a warning if appropriate
205-
if (
206-
self.restriction_context.in_activation
207-
and self._is_import_notification_policy_applied(
208-
temporalio.workflow.SandboxImportNotificationPolicy.WARN_ON_DYNAMIC_IMPORT
209-
)
210-
):
211-
warnings.warn(
212-
f"Module {full_name} was imported after initial workflow load."
213-
)
214-
215204
# Make sure not an entirely invalid module
216205
self._assert_valid_module(full_name)
217206

@@ -237,6 +226,17 @@ def _import(
237226
setattr(sys.modules[parent], child, sys.modules[full_name])
238227
# All children of this module that are on the original sys
239228
# modules but not here and are passthrough
229+
else:
230+
# Issue a warning if appropriate
231+
if (
232+
self.restriction_context.in_activation
233+
and self._is_import_notification_policy_applied(
234+
temporalio.workflow.SandboxImportNotificationPolicy.WARN_ON_DYNAMIC_IMPORT
235+
)
236+
):
237+
warnings.warn(
238+
f"Module {full_name} was imported after initial workflow load."
239+
)
240240

241241
# If the module is __temporal_main__ and not already in sys.modules,
242242
# we load it from whatever file __main__ was originally in

0 commit comments

Comments
 (0)