Improve documentation to use Broker as consumer #148

@MuriloScarpaSitonio

I want to use taskiq as a consumer. From this comment, I was advised to use taskiq.receiver.Receiver to accomplish this. I think I've figured out how to make this work (thanks to the great code quality!), but I'm not 100% sure that I'm on the right path. Please take a look at the snippet below:

import asyncio
import json
from typing import AsyncGenerator
from uuid import uuid4

from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.receiver import Receiver


class ListenerInMemoryBroker(InMemoryBroker):
    async def listen(self) -> AsyncGenerator[str, None]:
        for i in range(1, 4):
            await asyncio.sleep(i)
            yield json.dumps(
                {
                    "task_id": str(uuid4()),
                    "task_name": "my_task",
                    "labels": {},
                    "args": [i, "a" * i],
                    "kwargs": {"i": i},
                }
            )


broker = ListenerInMemoryBroker()


@broker.task(task_name="my_task")
async def my_task(n: int, a: str, i: int) -> None:
    print(f"{n=}, {n.__class__=}")
    print(f"{a=}, {a.__class__=}")
    print(f"{i=}, {i.__class__=}")


async def main() -> None:
    consumer = Receiver(broker=broker, max_async_tasks=100)
    await consumer.listen()


if __name__ == "__main__":
    asyncio.run(main())

From this dummy implementation, I assumed the following:

  • Messages must be produced with the schema defined at taskiq.message.TaskiqMessage;
  • task_name must be set explicitly;
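
To make the first assumption concrete, here is a minimal producer-side sketch that builds a payload with the same keys the snippet above yields from listen(). It uses only the standard library; build_message is a hypothetical helper name, and the key set is copied from my snippet rather than from a confirmed definition of TaskiqMessage, so treat it as an assumption.

```python
import json
from typing import Any, Dict, List, Optional
from uuid import uuid4


def build_message(
    task_name: str,
    args: Optional[List[Any]] = None,
    kwargs: Optional[Dict[str, Any]] = None,
    labels: Optional[Dict[str, Any]] = None,
) -> str:
    """Serialize a payload with the keys used in the snippet above.

    Hypothetical helper: the key set is an assumption taken from the
    dummy listen() implementation, not from taskiq itself.
    """
    payload = {
        "task_id": str(uuid4()),  # unique id per message
        "task_name": task_name,   # must match the name given to @broker.task
        "labels": labels or {},
        "args": args or [],
        "kwargs": kwargs or {},
    }
    return json.dumps(payload)


# Build one message for the task registered as "my_task".
message = build_message("my_task", args=[1, "a"], kwargs={"i": 1})
decoded = json.loads(message)
```

A producer written in another service (or another language) would only need to emit JSON of this shape to the queue the broker listens on, provided the assumption about the schema holds.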

It also left me with some questions:

  • What's the usage of max_async_tasks and max_prefetch?
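
For reference, a parameter named like max_async_tasks would commonly cap how many coroutines run at once. The sketch below shows that generic pattern with asyncio.Semaphore. It is plain asyncio code, not taskiq's implementation, and whether Receiver behaves this way is exactly what I'd like confirmed; run_with_limit and job are hypothetical names.

```python
import asyncio


async def run_with_limit(n_jobs: int, limit: int) -> int:
    """Run n_jobs coroutines, at most `limit` at a time; return the peak concurrency."""
    sem = asyncio.Semaphore(limit)  # assumption: a cap like max_async_tasks
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with sem:  # waits here while `limit` jobs are already running
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for real task work
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


# Ten jobs with a cap of three: concurrency never exceeds the cap.
peak = asyncio.run(run_with_limit(n_jobs=10, limit=3))
```

If max_async_tasks works like this, then max_prefetch would presumably control something different, such as how many messages are pulled from the broker ahead of execution, but that is a guess on my part.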

With that said, I think this important use case deserves better documentation. If anyone can confirm my assumptions, answer my queries, and point out where I can add a section in the docs about this, I'd be happy to do so!
