Is your feature request related to a problem? Please describe.
My app may have several external dependencies (database, S3, etc.) whose availability I want to check from k8s probes.
Describe the solution you'd like
I would like to be able to use DI (`Depends`) in ASGI route handlers to access my dependencies.
Feature code example
from faststream import FastStream, Depends
from faststream.asgi import AsgiResponse
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


def simple_dependency() -> int:
    return 1


# Desired behavior: `d` is resolved through DI before the handler runs.
async def readiness(scope, receive, send, d: int = Depends(simple_dependency)):
    assert d == 1
    return AsgiResponse(b"", status_code=200)


app = FastStream(broker).as_asgi(
    asgi_routes=[
        ("/readiness", readiness),
    ],
    asyncapi_path="/docs",
)
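To make the request more concrete, here is a rough, library-independent sketch of the behavior I have in mind, using only the standard library and plain ASGI callables. `Dep`, `with_dependencies`, and `probe` are hypothetical names invented for this illustration; they are not part of FastStream, and the real `Depends` machinery may well work differently. Unlike the example above, the handler here sends the ASGI messages itself instead of returning an `AsgiResponse`.

```python
import asyncio
import inspect
from typing import Any, Callable


class Dep:
    """Hypothetical marker that stands in for faststream's Depends."""

    def __init__(self, provider: Callable[[], Any]) -> None:
        self.provider = provider


def with_dependencies(handler):
    """Wrap an ASGI-style handler so Dep defaults are resolved on every call."""
    params = inspect.signature(handler).parameters
    deps = {
        name: p.default for name, p in params.items() if isinstance(p.default, Dep)
    }

    async def asgi_app(scope, receive, send):
        resolved = {}
        for name, dep in deps.items():
            value = dep.provider()
            # Support both sync and async providers.
            if inspect.isawaitable(value):
                value = await value
            resolved[name] = value
        await handler(scope, receive, send, **resolved)

    return asgi_app


def simple_dependency() -> int:
    return 1


@with_dependencies
async def readiness(scope, receive, send, d: int = Dep(simple_dependency)):
    # Report 200 when the dependency check passes, 503 otherwise.
    status = 200 if d == 1 else 503
    await send({"type": "http.response.start", "status": status, "headers": []})
    await send({"type": "http.response.body", "body": b""})


async def probe(app) -> list:
    """Call an ASGI app once with a fake HTTP request and collect its messages."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": "/readiness"}, receive, send)
    return sent


if __name__ == "__main__":
    messages = asyncio.run(probe(readiness))
    print(messages[0]["status"])
```

In a real integration, the wrapper would presumably be applied by the framework when the route is registered, so user code would only declare the `Depends(...)` default.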