Skip to content

Commit 080dad3

Browse files
authored
Nexus samples (#174)
1 parent 6cb8ce4 commit 080dad3

File tree

16 files changed

+455
-12
lines changed

16 files changed

+455
-12
lines changed

hello_nexus/README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
This sample shows how to define a Nexus service, implement the operation handlers, and
2+
call the operations from a workflow.
3+
4+
### Sample directory structure
5+
6+
- [service.py](./service.py) - shared Nexus service definition
7+
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code
8+
- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow and Nexus tasks.
9+
10+
11+
### Instructions
12+
13+
Start a Temporal server. (See the main samples repo [README](../README.md)).
14+
15+
Run the following:
16+
17+
```
18+
temporal operator namespace create --namespace hello-nexus-basic-handler-namespace
19+
temporal operator namespace create --namespace hello-nexus-basic-caller-namespace
20+
21+
temporal operator nexus endpoint create \
22+
--name hello-nexus-basic-nexus-endpoint \
23+
--target-namespace hello-nexus-basic-handler-namespace \
24+
--target-task-queue my-handler-task-queue \
25+
--description-file hello_nexus/endpoint_description.md
26+
```
27+
28+
In one terminal, run the Temporal worker in the handler namespace:
29+
```
30+
uv run hello_nexus/handler/worker.py
31+
```
32+
33+
In another terminal, run the Temporal worker in the caller namespace and start the caller
34+
workflow:
35+
```
36+
uv run hello_nexus/caller/app.py
37+
```

hello_nexus/__init__.py

Whitespace-only changes.

hello_nexus/caller/__init__.py

Whitespace-only changes.

hello_nexus/caller/app.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import asyncio
2+
import uuid
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker
7+
8+
from hello_nexus.caller.workflows import CallerWorkflow
9+
from hello_nexus.service import MyOutput
10+
11+
NAMESPACE = "hello-nexus-basic-caller-namespace"
12+
TASK_QUEUE = "hello-nexus-basic-caller-task-queue"
13+
14+
15+
async def execute_caller_workflow(
16+
client: Optional[Client] = None,
17+
) -> tuple[MyOutput, MyOutput]:
18+
client = client or await Client.connect(
19+
"localhost:7233",
20+
namespace=NAMESPACE,
21+
)
22+
23+
async with Worker(
24+
client,
25+
task_queue=TASK_QUEUE,
26+
workflows=[CallerWorkflow],
27+
):
28+
return await client.execute_workflow(
29+
CallerWorkflow.run,
30+
arg="world",
31+
id=str(uuid.uuid4()),
32+
task_queue=TASK_QUEUE,
33+
)
34+
35+
36+
if __name__ == "__main__":
37+
loop = asyncio.new_event_loop()
38+
try:
39+
results = loop.run_until_complete(execute_caller_workflow())
40+
for output in results:
41+
print(output.message)
42+
except KeyboardInterrupt:
43+
loop.run_until_complete(loop.shutdown_asyncgens())

hello_nexus/caller/workflows.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from temporalio import workflow
2+
3+
with workflow.unsafe.imports_passed_through():
4+
from hello_nexus.service import MyInput, MyNexusService, MyOutput
5+
6+
NEXUS_ENDPOINT = "hello-nexus-basic-nexus-endpoint"
7+
8+
9+
# This is a workflow that calls two nexus operations.
10+
@workflow.defn
11+
class CallerWorkflow:
12+
# An __init__ method is always optional on a workflow class. Here we use it to set the
13+
# nexus client, but that could alternatively be done in the run method.
14+
def __init__(self):
15+
self.nexus_client = workflow.create_nexus_client(
16+
service=MyNexusService,
17+
endpoint=NEXUS_ENDPOINT,
18+
)
19+
20+
# The workflow run method invokes two nexus operations.
21+
@workflow.run
22+
async def run(self, name: str) -> tuple[MyOutput, MyOutput]:
23+
# Start the nexus operation and wait for the result in one go, using execute_operation.
24+
wf_result = await self.nexus_client.execute_operation(
25+
MyNexusService.my_workflow_run_operation,
26+
MyInput(name),
27+
)
28+
# Alternatively, you can use start_operation to obtain the operation handle and
29+
# then `await` the handle to obtain the result.
30+
sync_operation_handle = await self.nexus_client.start_operation(
31+
MyNexusService.my_sync_operation,
32+
MyInput(name),
33+
)
34+
sync_result = await sync_operation_handle
35+
return sync_result, wf_result

hello_nexus/endpoint_description.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
## Service: [MyNexusService](https://github.com/temporalio/samples-python/blob/main/hello_nexus/basic/service.py)
2+
- operation: `my_sync_operation`
3+
- operation: `my_workflow_run_operation`

hello_nexus/handler/__init__.py

Whitespace-only changes.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""
2+
This file demonstrates how to implement a Nexus service.
3+
"""
4+
5+
from __future__ import annotations
6+
7+
import uuid
8+
9+
import nexusrpc
10+
from temporalio import nexus
11+
12+
from hello_nexus.handler.workflows import WorkflowStartedByNexusOperation
13+
from hello_nexus.service import MyInput, MyNexusService, MyOutput
14+
15+
16+
@nexusrpc.handler.service_handler(service=MyNexusService)
17+
class MyNexusServiceHandler:
18+
# You can create an __init__ method accepting what is needed by your operation
19+
# handlers to handle requests. You typically instantiate your service handler class
20+
# when starting your worker. See hello_nexus/basic/handler/worker.py.
21+
22+
# This is a nexus operation that is backed by a Temporal workflow. The start method
23+
# starts a workflow, and returns a nexus operation token. Meanwhile, the workflow
24+
# executes in the background; Temporal server takes care of delivering the eventual
25+
# workflow result (success or failure) to the calling workflow.
26+
#
27+
# The token will be used by the caller if it subsequently wants to cancel the Nexus
28+
# operation.
29+
@nexus.workflow_run_operation
30+
async def my_workflow_run_operation(
31+
self, ctx: nexus.WorkflowRunOperationContext, input: MyInput
32+
) -> nexus.WorkflowHandle[MyOutput]:
33+
return await ctx.start_workflow(
34+
WorkflowStartedByNexusOperation.run,
35+
input,
36+
id=str(uuid.uuid4()),
37+
)
38+
39+
# This is a Nexus operation that responds synchronously to all requests. That means
40+
# that unlike the workflow run operation above, in this case the `start` method
41+
# returns the final operation result.
42+
#
43+
# Sync operations are free to make arbitrary network calls, or perform CPU-bound
44+
# computations. Total execution duration must not exceed 10s.
45+
@nexusrpc.handler.sync_operation
46+
async def my_sync_operation(
47+
self, ctx: nexusrpc.handler.StartOperationContext, input: MyInput
48+
) -> MyOutput:
49+
return MyOutput(message=f"Hello {input.name} from sync operation!")

hello_nexus/handler/worker.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
import logging
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker
7+
8+
from hello_nexus.handler.service_handler import MyNexusServiceHandler
9+
from hello_nexus.handler.workflows import WorkflowStartedByNexusOperation
10+
11+
interrupt_event = asyncio.Event()
12+
13+
NAMESPACE = "hello-nexus-basic-handler-namespace"
14+
TASK_QUEUE = "my-handler-task-queue"
15+
16+
17+
async def main(client: Optional[Client] = None):
18+
logging.basicConfig(level=logging.INFO)
19+
20+
client = client or await Client.connect(
21+
"localhost:7233",
22+
namespace=NAMESPACE,
23+
)
24+
25+
# Start the worker, passing the Nexus service handler instance, in addition to the
26+
# workflow classes that are started by your nexus operations, and any activities
27+
# needed. This Worker will poll for both workflow tasks and Nexus tasks (this example
28+
# doesn't use any activities).
29+
async with Worker(
30+
client,
31+
task_queue=TASK_QUEUE,
32+
workflows=[WorkflowStartedByNexusOperation],
33+
nexus_service_handlers=[MyNexusServiceHandler()],
34+
):
35+
logging.info("Worker started, ctrl+c to exit")
36+
await interrupt_event.wait()
37+
logging.info("Shutting down")
38+
39+
40+
if __name__ == "__main__":
41+
loop = asyncio.new_event_loop()
42+
try:
43+
loop.run_until_complete(main())
44+
except KeyboardInterrupt:
45+
interrupt_event.set()
46+
loop.run_until_complete(loop.shutdown_asyncgens())

hello_nexus/handler/workflows.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from temporalio import workflow
2+
3+
with workflow.unsafe.imports_passed_through():
4+
from hello_nexus.service import MyInput, MyOutput
5+
6+
7+
# This is the workflow that is started by the `my_workflow_run_operation` nexus operation.
8+
@workflow.defn
9+
class WorkflowStartedByNexusOperation:
10+
@workflow.run
11+
async def run(self, input: MyInput) -> MyOutput:
12+
return MyOutput(message=f"Hello {input.name} from workflow run operation!")

0 commit comments

Comments
 (0)