|
6 | 6 | %% |
7 | 7 |
|
8 | 8 | -module(rabbit_queue_type). |
| 9 | +-feature(maybe_expr, enable). |
9 | 10 |
|
10 | 11 | -behaviour(rabbit_registry_class). |
11 | 12 |
|
@@ -307,7 +308,12 @@ is_compatible(Type, Durable, Exclusive, AutoDelete) -> |
307 | 308 | declare(Q0, Node) -> |
308 | 309 | Q = rabbit_queue_decorator:set(rabbit_policy:set(Q0)), |
309 | 310 | Mod = amqqueue:get_type(Q), |
310 | | - Mod:declare(Q, Node). |
| 311 | + case check_queue_limits(Q) of |
| 312 | + ok -> |
| 313 | + Mod:declare(Q, Node); |
| 314 | + Error -> |
| 315 | + Error |
| 316 | + end. |
311 | 317 |
|
312 | 318 | -spec delete(amqqueue:amqqueue(), boolean(), |
313 | 319 | boolean(), rabbit_types:username()) -> |
@@ -765,3 +771,25 @@ known_queue_type_names() -> |
765 | 771 | {QueueTypes, _} = lists:unzip(Registered), |
766 | 772 | QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes), |
767 | 773 | ?KNOWN_QUEUE_TYPES ++ QTypeBins. |
| 774 | + |
| 775 | +-spec check_queue_limits(amqqueue:amqqueue()) -> |
| 776 | + ok | |
| 777 | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. |
| 778 | +check_queue_limits(Q) -> |
| 779 | + maybe |
| 780 | + %% Prepare for more checks |
| 781 | + ok ?= check_vhost_queue_limit(Q) |
| 782 | + end. |
| 783 | + |
| 784 | +check_vhost_queue_limit(Q) -> |
| 785 | + #resource{name = QueueName} = amqqueue:get_name(Q), |
| 786 | + VHost = amqqueue:get_vhost(Q), |
| 787 | + case rabbit_vhost_limit:is_over_queue_limit(VHost) of |
| 788 | + false -> |
| 789 | + ok; |
| 790 | + {true, Limit} -> |
| 791 | + {protocol_error, precondition_failed, |
| 792 | + "cannot declare queue '~ts': " |
| 793 | + "queue limit in vhost '~ts' (~tp) is reached", |
| 794 | + [QueueName, VHost, Limit]} |
| 795 | + end. |
0 commit comments