Though it is not required, you can use the dishka-faststream integration. It features:
- automatic REQUEST scope management using middleware
- passing `StreamMessage` and `ContextRepo` objects as context data to providers
- automatic injection of dependencies into message handlers.
You can use auto-injection with FastStream 0.5.0 and higher. For older versions you need to apply `@inject` manually.
Note
If you are using the FastAPI plugin of FastStream, you need to use both dishka integrations, but you can share the same container.
- Call `dishka_faststream.setup_dishka` on the FastStream broker or router.
- Call `dishka.integrations.fastapi.setup_dishka` on the FastAPI app.
Install using pip
pip install dishka-faststream
Or with uv
uv add dishka-faststream
- Import
from dishka_faststream import (
FromDishka,
inject,
setup_dishka,
FastStreamProvider,
)
from dishka import make_async_container, Provider, provide, Scope
- Create provider. You can use `faststream.types.StreamMessage` and `faststream.ContextRepo` as factory parameters to access them in REQUEST scope
class YourProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def create_x(self, event: StreamMessage) -> X:
        ...
- Mark those of your handlers' parameters which are to be injected with `FromDishka[]`
@broker.subscriber("test")
async def start(
    gateway: FromDishka[Gateway],
):
    ...
3a. (optional) decorate them using `@inject` if you are not using auto-injection
@broker.subscriber("test")
@inject
async def start(
    gateway: FromDishka[Gateway],
):
    ...
- (optional) Use `FastStreamProvider()` when creating the container if you are going to use `faststream.types.StreamMessage` or `faststream.ContextRepo` in providers
container = make_async_container(YourProvider(), FastStreamProvider())
- Setup `dishka` integration. `auto_inject=True` is required unless you explicitly use the `@inject` decorator
setup_dishka(container=container, app=app, auto_inject=True)
Or pass your own inject decorator
setup_dishka(container=container, broker=broker, auto_inject=my_inject)
- Running RabbitMQ
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
rabbitmq:management
- Example of usage FastStream + Litestar
import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
import dishka_faststream as faststream_integration
from dishka.integrations import litestar as litestar_integration
from dishka.integrations.base import FromDishka
from dishka_faststream import inject as faststream_inject
from dishka.integrations.litestar import inject as litestar_inject
from faststream.rabbit import RabbitBroker, RabbitRouter
from litestar import Litestar, route, HttpMethod
class SomeDependency:
    """Toy dependency injected into the HTTP and AMQP handlers of this example."""

    async def do_something(self) -> int:
        """Print a greeting and return a fixed value.

        Returns:
            Always ``42``.
        """
        print("Hello world")
        return 42
class SomeProvider(Provider):
    """Provider that registers ``SomeDependency`` with REQUEST scope."""

    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        """Build a new ``SomeDependency`` instance."""
        dependency = SomeDependency()
        return dependency
@route(http_method=HttpMethod.GET, path="/", status_code=200)
@litestar_inject
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    """Handle ``GET /`` by invoking the injected dependency.

    Args:
        some_dependency: ``SomeDependency`` marked for injection with
            ``FromDishka``.
    """
    await some_dependency.do_something()
# Router that collects the RabbitMQ subscribers; it is attached to the broker
# inside ``create_app``.
amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    """Handle a message from ``test-queue`` by invoking the injected dependency.

    Args:
        some_dependency: ``SomeDependency`` marked for injection with
            ``FromDishka``.
    """
    await some_dependency.do_something()
def create_app() -> Litestar:
    """Assemble the Litestar application together with its RabbitMQ broker.

    A single container is created and handed to both the FastStream and the
    Litestar dishka integrations.

    Returns:
        The configured ``Litestar`` application.
    """
    container = make_async_container(SomeProvider())

    rabbit = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    rabbit.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=rabbit)

    # The broker is started and stopped through the application's
    # startup/shutdown hooks.
    application = Litestar(
        route_handlers=[http_handler],
        on_startup=[rabbit.start],
        on_shutdown=[rabbit.stop],
    )
    litestar_integration.setup_dishka(container, application)
    return application
# Serve the application with uvicorn when this file is run as a script.
if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)


# Example of usage FastStream + FastAPI
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import uvicorn
from fastapi import APIRouter, FastAPI
from faststream.rabbit import RabbitBroker, RabbitRouter
from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations import fastapi as fastapi_integration
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from dishka_faststream import inject as faststream_inject
class SomeDependency:
    """Toy dependency injected into the HTTP and AMQP handlers of this example."""

    async def do_something(self) -> int:
        """Print a greeting and return a fixed value.

        Returns:
            Always ``42``.
        """
        print("Hello world")
        return 42
class SomeProvider(Provider):
    """Provider that registers ``SomeDependency`` with REQUEST scope."""

    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        """Build a new ``SomeDependency`` instance."""
        dependency = SomeDependency()
        return dependency
# ``DishkaRoute`` is set as the route class; the handler below carries no
# explicit inject decorator.
router = APIRouter(route_class=DishkaRoute)


@router.get("/")
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    """Handle ``GET /`` by invoking the injected dependency.

    Args:
        some_dependency: ``SomeDependency`` marked for injection with
            ``FromDishka``.
    """
    await some_dependency.do_something()
# Router that collects the RabbitMQ subscribers; it is attached to the broker
# inside ``create_app``.
amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    """Handle a message from ``test-queue`` by invoking the injected dependency.

    Args:
        some_dependency: ``SomeDependency`` marked for injection with
            ``FromDishka``.
    """
    await some_dependency.do_something()
def create_app() -> FastAPI:
    """Assemble the FastAPI application together with its RabbitMQ broker.

    A single container is created and handed to both the FastStream and the
    FastAPI dishka integrations.

    Returns:
        The configured ``FastAPI`` application.
    """
    container = make_async_container(SomeProvider())

    rabbit = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    rabbit.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=rabbit)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[None]:
        # The broker context is entered and the broker started before the
        # application begins serving; the context is left after it stops.
        async with rabbit:
            await rabbit.start()
            yield

    application = FastAPI(lifespan=lifespan)
    application.include_router(router)
    fastapi_integration.setup_dishka(container, application)
    return application
if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)

Simple example:
from collections.abc import AsyncIterator
import pytest
from dishka import AsyncContainer, make_async_container
from dishka import Provider, Scope, provide
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker, RabbitRouter
# Router holding the subscriber under test; the ``broker`` fixture includes it.
router = RabbitRouter()


@router.subscriber("test-queue")
async def handler(msg: str, some_dependency: Depends[int]) -> int:
    """Print the incoming message and return the injected integer.

    Args:
        msg: Message body received from ``test-queue``.
        some_dependency: Integer marked for injection (``Depends`` is an
            alias of ``FromDishka`` in this example).

    Returns:
        The injected integer, unchanged.
    """
    print(f"{msg=}")
    return some_dependency
@pytest.fixture
async def broker() -> RabbitBroker:
    """Provide a ``RabbitBroker`` with the example router attached."""
    rabbit = RabbitBroker()
    rabbit.include_router(router)
    return rabbit
@pytest.fixture
def mock_provider() -> Provider:
    """Provide a provider whose ``int`` dependency is the constant ``42``."""

    class MockProvider(Provider):
        """Stub provider defined locally for this fixture."""

        @provide(scope=Scope.REQUEST)
        async def get_some_dependency(self) -> int:
            return 42

    provider = MockProvider()
    return provider
@pytest.fixture
def container(mock_provider: Provider) -> AsyncContainer:
    """Build an async container from the mock provider.

    Args:
        mock_provider: Provider supplied by the ``mock_provider`` fixture.
    """
    built = make_async_container(mock_provider)
    return built
@pytest.fixture
async def app(broker: RabbitBroker, container: AsyncContainer) -> FastStream:
    """Provide a ``FastStream`` application with dishka set up on it.

    Args:
        broker: Broker supplied by the ``broker`` fixture.
        container: Container supplied by the ``container`` fixture.

    Returns:
        The same ``FastStream`` instance that was passed to ``setup_dishka``.
    """
    application = FastStream(broker)
    faststream_integration.setup_dishka(container, application, auto_inject=True)
    # Return the configured instance itself; building a second
    # ``FastStream(broker)`` here would hand the test an application object
    # that ``setup_dishka`` never received.
    return application
@pytest.fixture
async def client(app: FastStream) -> AsyncIterator[RabbitBroker]:
    """Yield a test broker while the test-broker and test-app contexts are open.

    Args:
        app: Application supplied by the ``app`` fixture.

    Yields:
        The object produced by entering ``TestRabbitBroker(app.broker)``.
    """
    async with TestRabbitBroker(app.broker) as test_broker:
        async with TestApp(app):
            yield test_broker
@pytest.mark.asyncio
async def test_handler(client: RabbitBroker) -> None:
    """Send ``"hello"`` to ``test-queue`` and expect ``42`` in the decoded reply."""
    response = await client.request("hello", "test-queue")
    decoded = await response.decode()
    assert decoded == 42