Skip to content

Conversation

empicano
Copy link
Owner

@empicano empicano commented Jun 2, 2024

This proposes a simpler way to filter messages and structure message handling.

Unsorted points that I considered:

  • This lets us split message handling into multiple files via multiple routers (without circular imports)
  • This lets us use the client inside a handler function, to e.g. publish a message back in a request/response fashion
  • We can dynamically subscribe and unsubscribe
  • The values of wildcards (+/#) of the topic filter are automatically available as *args in the handler function
  • We still only have a single message queue (easier for newcomers, concurrency could be implemented as shown below, optionally with priority queue)
  • We can still pass a non-default queue to the client to prioritize the handling of certain messages
  • We still have flexibility to not use the routers, but handle the messages directly. Routers are a natural development, once the application gets too complex we can iteratively add them
  • We are still flexible enough to process messages concurrently in an individual way (i.e. our message loop is still transparent)
  • It's a non-breaking change

The interface looks like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async for message in client.messages:
        await client.route(message)

Where we can process messages concurrently e.g. like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async def work(client):
    async for message in client._messages():
        await client.route(message)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(work(client))
        tg.create_task(work(client))

Glad to hear feedback on this 🙂

@empicano empicano requested a review from frederikaalund June 9, 2024 14:53
@empicano
Copy link
Owner Author

empicano commented Jul 30, 2024

Another possibility would be:

async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async with aiomqtt.Client(
    "test.mosquitto.org",
    handlers={"humidity/+/inside/#": handle}
) as client:
    await client.subscribe("humidity/#")
    async for message in client.messages:
        await client.route(message)
  • This could spare users the possible confusion of "Do I use a router or a client, what's the difference?"
  • What's nice about the other approach is that the filter (humidity/+/inside/#) is written directly next to the handler function, which makes it immediately clear which messages are handled in this function without jumping around in the code. On the other hand, with this option we have all the filters in one place. Probably just comes down to preference.
  • This avoids the problem of defining two routers with overlap in filters

I currently prefer this option.


Aside: We should be able to optimize message to handler matching with a trie.

@empicano
Copy link
Owner Author

Yet another possibility, assigning handlers to subscriptions:

async def handle(message):
    print(message.payload)
    if isinstance(message, Request):
        await message.reply("foo")


async with aiomqtt.Client("test.mosquitto.org") as client:
    await client.subscribe("humidity/#", handle)
    async for message in client.messages:
        await client.route(message)

We'd have to potentially duplicate messages internally to forward exactly one message per subscription ID. From spec:

When Clients make subscriptions with Topic Filters that include wildcards, it is possible for a Client’s subscriptions to overlap so that a published message might match multiple filters. In this case the Server MUST deliver the message to the Client respecting the maximum QoS of all the matching subscriptions [MQTT-3.3.4-2]. In addition, the Server MAY deliver further copies of the message, one for each additional matching subscription and respecting the subscription’s QoS in each case.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant