Skip to content

fix: run dynamo components standalone (1/2) #1560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deploy/sdk/src/dynamo/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

warnings.filterwarnings("ignore", category=UserWarning, message=".*pkg_resources.*")

from dynamo.sdk.cli.serve_standalone import DynamoContext, serve

# flake8: noqa: E402
from dynamo.sdk.core.decorators.endpoint import abstract_endpoint, api, endpoint
from dynamo.sdk.core.lib import DYNAMO_IMAGE, depends, liveness, readiness, service
Expand All @@ -39,4 +41,6 @@
"abstract_endpoint",
"liveness",
"readiness",
"serve",
"DynamoContext",
]
Empty file.
149 changes: 149 additions & 0 deletions deploy/sdk/src/dynamo/sdk/cli/serve_standalone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import inspect
import logging
from typing import Any, Dict, get_type_hints

from pydantic import BaseModel, ConfigDict

from dynamo.runtime import Component, DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.sdk.cli.utils import configure_target_environment
from dynamo.sdk.core.protocol.interface import DynamoTransport
from dynamo.sdk.core.runner import TargetEnum

logger = logging.getLogger(__name__)

# Use Dynamo target (this is the only supported one)
configure_target_environment(TargetEnum.DYNAMO)


class DynamoContext(BaseModel):
"""Context object for the service that is injected into components that declare it as a typed parameter."""

model_config = ConfigDict(arbitrary_types_allowed=True)

runtime: DistributedRuntime
component: Component
endpoints: Dict[str, Any]
name: str
namespace: str


async def serve(service, *args, **kwargs):
# 1. Create a runtime/dyn worker if one is not passed
# 2. Init the context
# 3. Init the inner class injecting the context
# 4. Run async init to do any async setup
# 5. Serve the decorated endpoints of the component

@dynamo_worker()
async def worker(runtime: DistributedRuntime):
# Create the service
namespace, name = service.dynamo_address()
component = runtime.namespace(namespace).component(name)
logger.info(f"Registering component {namespace}/{name}")
await component.create_service()

# 2. Declare the endpoints on the component that use DynamoTransport.DEFAULT (NATS based)
drt_endpoints = [
ep
for ep in service.get_dynamo_endpoints().values()
if DynamoTransport.DEFAULT in ep.transports
]
endpoints = {ep.name: component.endpoint(ep.name) for ep in drt_endpoints}

# 3. init a pydantic model with the runtime, component, endpoints, name, namespace
dynamo_context = DynamoContext(
runtime=runtime,
component=component,
endpoints=endpoints,
name=name,
namespace=namespace,
)

# 4. Init the inner class injecting the context and other args and kwargs passed by the user
should_inject = _check_dynamo_context_type(service)
if should_inject:
inner_instance = service.inner(dynamo_context, *args, **kwargs)
else:
logger.info(f"Not injecting dynamo_context into {service.inner.__name__}")
inner_instance = service.inner(*args, **kwargs)

# 5. Get and run async init if it exists
async_init = get_async_init(inner_instance)
if async_init:
logger.info(f"Running async init for {inner_instance.__class__.__name__}")
await async_init()

# 6. Finally serve each endpoint
handlers = get_endpoint_handlers(drt_endpoints, inner_instance)
ep_2_handler = {endpoints[ep_name]: handlers[ep_name] for ep_name in endpoints}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this ep_2_handler?

Copy link
Contributor Author

@mohammedabdulwahhab mohammedabdulwahhab Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mapping between endpoint (from drt) to the actual request handler that should be used to serve it

endpoint = component.endpoint("foo")
handler = # actual function to serve the endpoint with
... # below
endpoint.serve_endpoint(handler)

logger.debug(f"Serving endpoints: {[ep_name for ep_name in endpoints]}")
tasks = [ep.serve_endpoint(handler) for ep, handler in ep_2_handler.items()]
await asyncio.gather(*tasks)

await worker()


def get_async_init(instance):
"""Return the decorated async init method for the class"""
for name, member in vars(instance.__class__).items():
if callable(member) and getattr(member, "__dynamo_startup_hook__", False):
return getattr(instance, name)
return None


def get_endpoint_handlers(endpoints, inner_instance):
"""Get the endpoint handlers for the service"""
ep_handlers = {}
for endpoint in endpoints:
# Binding the instance to the methods of the class
bound_method = endpoint.func.__get__(inner_instance)
ep_handlers[endpoint.name] = dynamo_endpoint(endpoint.request_type, Any)(
bound_method
)
return ep_handlers


def _check_dynamo_context_type(service) -> bool:
"""Check if the service's constructor accepts a properly typed dynamo_context parameter.

Args:
service: The service class to check

Returns:
bool: True if dynamo_context should be injected

Raises:
TypeError: If dynamo_context parameter is present but not properly typed
"""
sig = inspect.signature(service.inner.__init__)
params = list(sig.parameters.keys())

# Check if dynamo_context is the first argument after self
should_inject = len(params) > 1 and params[1] == "dynamo_context"

if should_inject:
# Get type hints for the constructor
type_hints = get_type_hints(service.inner.__init__)
# Check if dynamo_context has the correct type hint
if type_hints.get("dynamo_context") != DynamoContext:
raise TypeError(
f"The dynamo_context parameter in {service.inner.__name__}.__init__ must be explicitly typed as DynamoContext"
)

return should_inject
42 changes: 42 additions & 0 deletions docs/API/sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,48 @@ Following these practices help create flexible and maintainable Dynamo services

You can deploy a single service for local development even if you have a dependency graph defined using `depends()` using `dynamo serve --service-name <ClassName> <entrypoint> <configuration arguments>`. You can see an example of this in our multinode documentation [here](../examples/multinode.md).

### Direct Python Invocation

Components can also be invoked directly using Python without the Dynamo CLI. Simply add a `__main__` block to your Python file that sets up argument parsing and calls `serve()`. Here's a complete example:

```python
from pydantic import BaseModel
from dynamo.sdk import service, endpoint, serve, DynamoContext

class GreetingRequest(BaseModel):
name: str

@service(dynamo={"namespace": "example"})
class GreetingService:
def __init__(self, dynamo_context: DynamoContext, greeting: str = "Hello") -> None:
# dynamo_context is injected when properly type hinted as first argument
self.runtime = dynamo_context.runtime
self.greeting = greeting

@endpoint()
async def greet(self, request: GreetingRequest):
yield f"{self.greeting}, {request.name}!"

if __name__ == "__main__":
import argparse
import asyncio
import uvloop

parser = argparse.ArgumentParser(description="Run the greeting service")
parser.add_argument("--greeting", default="Hello", help="Greeting to use")
args = parser.parse_args()

uvloop.install() # Optional: Use uvloop for better performance
# All args after service name are passed to __init__
asyncio.run(serve(GreetingService, greeting=args.greeting))
```

This component can be run via `dynamo serve` or directly with python.

**Notes:**
- To receive the `dynamo_context` in your service, declare it as the first argument after `self` in `__init__` with the type hint `DynamoContext`
- All arguments passed to `serve()` after the service class are forwarded to the service's `__init__` method

### Composing Services into an Graph

There are two main ways to compose services in Dynamo:
Expand Down
97 changes: 97 additions & 0 deletions examples/hello_world/hello_world_standalone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from pydantic import BaseModel

from dynamo.sdk import DynamoContext, async_on_start, endpoint, serve, service

logger = logging.getLogger(__name__)


class RequestType(BaseModel):
text: str


@service(
dynamo={
"namespace": "dynamo",
}
)
class Server:
def __init__(self, greeting: str) -> None:
logger.info("Starting server")
self.greeting = greeting

@endpoint()
async def generate(self, req: RequestType):
yield f"{self.greeting} {req.text}"


@service(
dynamo={
"namespace": "dynamo",
}
)
class Client:
def __init__(self, dynamo_context: DynamoContext, name: str) -> None:
logger.info("Starting client")
self.runtime = dynamo_context.runtime
self.name = name

@async_on_start
async def async_init(self):
self.server = (
await self.runtime.namespace("dynamo")
.component("Server")
.endpoint("generate")
.client()
)
await self.server.wait_for_instances()

stream = await self.server.generate(
RequestType(text=self.name).model_dump_json()
)
async for word in stream:
print(word.data())


if __name__ == "__main__":
"""
Example of running Dynamo components with python command
$ python hello_world_standalone.py server --greeting "Hello, World!"
$ python hello_world_standalone.py client --name "Bob"
"""
import argparse
import asyncio

import uvloop

parser = argparse.ArgumentParser(description="Run Hello World server or client")
parser.add_argument(
"component", choices=["server", "client"], help="Which component to run"
)
parser.add_argument(
"--greeting", default="Hello, World!", help="Greeting message (for server)"
)
parser.add_argument("--name", default="User", help="Name to use (for client)")
args = parser.parse_args()

uvloop.install()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not be happenign in serve_standalone?

if args.component == "server":
asyncio.run(serve(Server, greeting=args.greeting))
else:
asyncio.run(serve(Client, name=args.name))
Loading