|
1 | 1 | # Message Queue
|
2 | 2 |
|
3 |
| -Introduction |
| 3 | +Message Queue is a feature that extends MQTT subscribe/publish pattern. |
| 4 | + |
| 5 | +## Why Message Queue? |
| 6 | + |
| 7 | +MQTT is a publish/subscribe messaging protocol with wide adoption for device management. |
| 8 | + |
| 9 | +In many MQTT applications, we need to decouple the lifetime of messages from the lifetime of publishers and subscribers. |
| 10 | + |
| 11 | +For example, we may need to configure a device before it appears online. Another example is a work queue — we want to submit some tasks to be executed by workers without waiting for the workers to be online. |
| 12 | + |
| 13 | +Message Queue is a feature that extends MQTT subscribe/publish pattern to solve these kinds of problems. |
| 14 | + |
| 15 | +It allows messages to be persisted regardless of the subscribers' online status for further processing, individually or cooperatively. |
4 | 16 |
|
5 | 17 | ## Key Concepts
|
6 | 18 |
|
7 |
| -What is the Message Queue feature in EMQX? |
| 19 | +* Queue Name: An MQTT topic or topic-filter that identifies the queue. Messages published to matching topics are automatically queued into the queue. |
| 20 | + |
| 21 | +* Queue Declaration: The process of creating a durable queue and setting its properties. |
| 22 | + |
| 23 | +* Queue Deletion: Removal of a queue and its messages. |
| 24 | + |
| 25 | +* Last Value semantic: An optional feature enabled by defining a Queue Key Expression during queue declaration. |
| 26 | + |
| 27 | +* Topic Prefix: Uses `$q/{name}` prefix to identify queue subscriptions. |
| 28 | + |
| 29 | +* Queue Properties: Configurable attributes including data retention period and dispatch strategy. |
| 30 | + |
| 31 | +* QoS Levels: Primarily supports QoS 0 (at-most-once) and QoS 1 (at-least-once). QoS 2 messages published to a queue are typically downgraded to QoS 1. Subscribers attempting QoS 2 subscriptions are also granted QoS 1. |
| 32 | + |
| 33 | +* Persistence: Messages persist even when no subscribers are online. Last Value semantic is the default behavior for queues. In a Last Value Queuue, the latest message will overwrite the previous messages from the same topic with the same key. For regular queues (without Last Value semantic), all messages are written directly to the queue. |
8 | 34 |
|
9 | 35 | ## Core Features
|
10 | 36 |
|
11 |
| -## Why Message Queue? |
| 37 | +* Enqueue: Messages published to matching topics are automatically queued. If the queue was declared with a Compaction Key Expression, the broker evaluates this expression against the incoming message. If a key is successfully derived, the broker will replace any existing unconsumed message with the same key. If the expression fails to resolve a key, or if no expression was defined for the queue, the message is enqueued directly following FIFO principles. |
| 38 | + |
| 39 | +* Dequeue: Subscribers receive messages based on the dispatch strategy |
| 40 | + |
| 41 | +* QoS Support: Supports QoS 0 and 1. The publisher's original QoS 2 request is fully honored. When delivering a message from the persistent Queue, if a subscriber requested QoS 2, the broker may downgrade it to QoS 1 (depends on the final implementation design). |
| 42 | + |
| 43 | +* Dispatch Strategies: random, round_robin, least_inflight |
12 | 44 |
|
13 |
| -What message queue can achieve? What problems can it solve? |
| 45 | +* Management: REST APIs for queue CRUD operations and configuration. |
14 | 46 |
|
15 | 47 | ## Use Cases
|
16 | 48 |
|
17 |
| -## How Message Queue Works? |
| 49 | +Device Command Queuing: Cloud applications queue commands for IoT devices, ensuring commands aren't lost when devices are offline |
18 | 50 |
|
19 |
| -MQTT message workflow... |
| 51 | +Batch Processing: Large datasets split into smaller tasks distributed across multiple workers |
20 | 52 |
|
21 |
| -interacting with other features... |
| 53 | +Sensor Data Processing: Queue sensor readings for batch processing and analysis |
22 | 54 |
|
23 |
| -## Enable/Create Message Queue |
| 55 | +Latest Configuration Dispatch: Ensure devices always attempt to fetch and process the latest configuration command; older, unhandled commands (for the same config item/key) are superseded or marked obsolete in the queue. |
| 56 | + |
| 57 | +## How Message Queue Works? |
| 58 | + |
| 59 | +### Publishing |
| 60 | + |
| 61 | +* A client publishes a message to `some/topic`. |
| 62 | +* An MQ hook is triggered to handle the message publication. |
| 63 | +* The hook looks up in the MQ registry if there are any Message Queues whose topic filter matches the message topic. |
| 64 | +* If yes, the hook writes the message to the corresponding Message Queues. |
24 | 65 |
|
25 |
| -### Explicitly Declare a Queue |
| 66 | +### Subscribing/Consuming |
26 | 67 |
|
27 |
| -### Implicitly Declare a Queue |
| 68 | +* A client subscribes to some topic. |
| 69 | +* An MQ hook is triggered to handle the subscription. |
| 70 | +* If the topic is a Message Queue topic (`$q/some/topic`), the hook initializes a subscription in the Channel's state and |
| 71 | +initiates a connection to the Message Queue Consumer. |
| 72 | +* If a Consumer is not yet found, a new consumer is started. |
| 73 | +* The Consumer restores message consumption progress and starts to fetch data from the Message Queue message database. |
| 74 | +* The Consumer dispatches received messages to the connected subscribers. |
| 75 | +* The subscribers (channels) deliver MQ messages to the clients. |
28 | 76 |
|
29 | 77 | ## Configure Message Queue
|
30 | 78 |
|
| 79 | +### REST API |
| 80 | + |
| 81 | +```bash |
| 82 | +curl -v -u key:secret -X PUT -H "Content-Type: application/json" http://localhost:18083/api/v5/message_queues/config -d '{"find_queue_retry_interval": "10s", "gc_interval": "1h", "regular_queue_retention_period": "7d"}' |
| 83 | +``` |
| 84 | + |
31 | 85 | ### Dashboard
|
32 | 86 |
|
33 | 87 | MQTT Settings -> Message Queue
|
34 | 88 |
|
35 |
| -### REST API |
36 | 89 |
|
37 |
| -### Configuration File? |
| 90 | +### Configuration File |
| 91 | + |
| 92 | +```hocon |
| 93 | +mq { |
| 94 | + ## The interval at which the Message Queues will clean up expired messages. |
| 95 | + gc_interval = 1h |
| 96 | + ## The maximum retention period of messages in regular Message Queues. |
| 97 | + regular_queue_retention_period = 1d |
| 98 | + ## The interval at which subscribers will retry to find a queue if the queue is not found |
| 99 | + ## when subscribing to a queue topic. |
| 100 | + find_queue_retry_interval = 10s |
| 101 | + ## Settings for the database storing the Message Queue state. |
| 102 | + ## See Durable Storage configuration for more details. |
| 103 | + state_db { |
| 104 | + transaction { |
| 105 | + flush_interval = 10 |
| 106 | + idle_flush_interval = 5 |
| 107 | + conflict_window = 5000 |
| 108 | + } |
| 109 | + } |
| 110 | + ## Settings for the database storing the Message Queue messages. |
| 111 | + ## See Durable Storage configuration for more details. |
| 112 | + message_db { |
| 113 | + transaction { |
| 114 | + flush_interval = 100 |
| 115 | + idle_flush_interval = 20 |
| 116 | + conflict_window = 5000 |
| 117 | + } |
| 118 | + } |
| 119 | +} |
| 120 | +``` |
38 | 121 |
|
39 | 122 | ## Manage Message Queue
|
40 | 123 |
|
| 124 | +### REST API |
| 125 | + |
| 126 | +```bash |
| 127 | +curl -s -u key:secret -X PUT -H "Content-Type: application/json" http://localhost:18083/api/v5/message_queues/queues/t1%2F%23 -d '{"dispatch_strategy": "least_inflight"}' | jq |
| 128 | +{ |
| 129 | + ... |
| 130 | + "topic_filter": "t1/#" |
| 131 | +} |
| 132 | +``` |
| 133 | + |
| 134 | +### Dashboard |
| 135 | + |
| 136 | +To be done. |
| 137 | + |
| 138 | +## Enable/Create Message Queue |
| 139 | + |
| 140 | +### REST API |
| 141 | + |
| 142 | +```bash |
| 143 | +curl -s -u key:secret -X POST -H "Content-Type: application/json" http://localhost:18083/api/v5/message_queues -d '{"topic_filter": "t1/#", "is_lastvalue": false}' | jq |
| 144 | +{ |
| 145 | + ... |
| 146 | + "topic_filter": "t1/#" |
| 147 | +} |
| 148 | +``` |
| 149 | + |
41 | 150 | ### Dashboard
|
42 | 151 |
|
| 152 | +To be done. |
| 153 | + |
| 154 | +## Delete Message Queue |
| 155 | + |
43 | 156 | ### REST API
|
44 | 157 |
|
| 158 | +```bash |
| 159 | +curl -s -u key:secret -X DELETE http://localhost:18083/api/v5/message_queues/queues/t1%2F%23 |
| 160 | +``` |
| 161 | + |
| 162 | +### Dashboard |
| 163 | + |
| 164 | +To be done. |
| 165 | + |
45 | 166 | ## FAQ & Troubleshooting (Optional but Recommended)
|
46 | 167 |
|
47 |
| -- Why messages aren’t enqueued? |
| 168 | +- Why messages aren’t enqueued. |
| 169 | + |
| 170 | +Inspect EMQX logs for errors with `mq_` prefix. |
| 171 | + |
48 | 172 | - What happens when queues overflow?
|
49 |
| -- Debugging compaction behavior |
50 |
| -- QoS mismatch issues |
| 173 | + |
| 174 | +Currently, the queues are not limited in size or amount of messages. Queues are limited in time (retention period). The expired messages are not delivered to the subscribers. These messages are removed regularly by the broker. |
51 | 175 |
|
52 | 176 | ## Reference & Related Features
|
53 | 177 |
|
54 | 178 | - [Rule Engine](#) – To route queued messages
|
55 | 179 | - [Shared Subscriptions](#) – Compared with Message Queues
|
56 |
| -- [EMQX REST API Reference](#) |
| 180 | +- [EMQX REST API Reference](#) |
0 commit comments