-
-
Notifications
You must be signed in to change notification settings - Fork 109
Closed
Description
I want to use taskiq as a consumer. From this comment, I was advised to use taskiq.receiver.Receiver to accomplish this. I think I've figured how to make this work (thanks to the great code quality!), but I'm not 100% sure that I'm in the correct path. Please, take a look at the snippet bellow:
import asyncio
import json
from typing import AsyncGenerator
from uuid import uuid4
from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.receiver import Receiver
class ListenerInMemoryBroker(InMemoryBroker):
async def listen(self) -> AsyncGenerator[str, None]:
for i in range(1, 4):
await asyncio.sleep(i)
yield json.dumps(
{
"task_id": str(uuid4()),
"task_name": "my_task",
"labels": {},
"args": [i, "a" * i],
"kwargs": {"i": i},
}
)
broker = ListenerInMemoryBroker()
@broker.task(task_name="my_task")
async def my_task(n: int, a: str, i: int) -> None:
print(f"{n=}, {n.__class__=}")
print(f"{a=}, {a.__class__=}")
print(f"{i=}, {i.__class__=}")
async def main() -> None:
consumer = Receiver(broker=broker, max_async_tasks=100)
await consumer.listen()
if __name__ == "__main__":
asyncio.run(main())From this dummy implementation, I assumed the following:
- Messages must be produced with the schema defined at
taskig.message.TaskiqMessage; task_namemust be explicit set;
And also left me with some doubts:
- What's the usage of
max_async_tasksandmax_prefetch?
With that said, I think this important use case deserves better documentation. If anyone can confirm my assumptions, answer my queries, and point out where I can add a section in the docs about this, I'd be happy to do so!
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels