Skip to content

Commit d926c2b

Browse files
committed
queue: added option to have alternative exchange config for producer push
1 parent c7bedd5 commit d926c2b

File tree

1 file changed

+79
-34
lines changed

1 file changed

+79
-34
lines changed

src/Queue/RabbitMQQueue.php

Lines changed: 79 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
44

5-
use Illuminate\Support\Facades\Config;
6-
use PhpAmqpLib\Exception\AMQPChannelClosedException;
7-
use RuntimeException;
5+
use Illuminate\Contracts\Queue\Queue as QueueContract;
86
use Illuminate\Queue\Queue;
7+
use Illuminate\Support\Facades\Config as FacadeConfig;
98
use Illuminate\Support\Str;
109
use Interop\Amqp\AmqpConsumer;
11-
use Interop\Amqp\AmqpQueue;
12-
use Interop\Amqp\AmqpTopic;
13-
use Psr\Log\LoggerInterface;
1410
use Interop\Amqp\AmqpContext;
1511
use Interop\Amqp\AmqpMessage;
12+
use Interop\Amqp\AmqpQueue;
13+
use Interop\Amqp\AmqpTopic;
1614
use Interop\Amqp\Impl\AmqpBind;
17-
use Illuminate\Contracts\Queue\Queue as QueueContract;
18-
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
15+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
16+
use Psr\Log\LoggerInterface;
17+
use RuntimeException;
1918
use Throwable;
19+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
2020

2121
class RabbitMQQueue extends Queue implements QueueContract
2222
{
@@ -26,6 +26,13 @@ class RabbitMQQueue extends Queue implements QueueContract
2626
protected $queueOptions;
2727
protected $exchangeOptions;
2828

29+
/**
30+
* Alternative exchange options to use by producer.
31+
*
32+
* @var array|null
33+
*/
34+
protected $altExchangeOptions;
35+
2936
protected $declaredExchanges = [];
3037
protected $declaredQueues = [];
3138

@@ -217,61 +224,64 @@ public function getContext(): AmqpContext
217224
*/
218225
protected function declareEverything(?string $queueName = null): array
219226
{
227+
/** @var array $exchangeOptions */
228+
$exchangeOptions = $this->altExchangeOptions ?? $this->exchangeOptions;
229+
220230
/** @var string $queueName */
221231
$queueName = $this->getQueueName($queueName);
222232

223233
/** @var string $exchangeName */
224-
$exchangeName = $this->exchangeOptions['name'] ?: $queueName;
234+
$exchangeName = $exchangeOptions['name'] ?: $queueName;
225235

236+
/** @var AmqpTopic $topic */
226237
$topic = $this->context->createTopic($exchangeName);
227238

228-
$topic->setType($this->exchangeOptions['type']);
229-
$topic->setArguments($this->exchangeOptions['arguments']);
230-
231-
if ($this->exchangeOptions['passive']) {
232-
$topic->addFlag(AmqpTopic::FLAG_PASSIVE);
233-
}
234-
if ($this->exchangeOptions['durable']) {
235-
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
236-
}
237-
if ($this->exchangeOptions['auto_delete']) {
238-
$topic->addFlag(AmqpTopic::FLAG_AUTODELETE);
239-
}
240-
241-
if ($this->exchangeOptions['declare'] && ! in_array($exchangeName, $this->declaredExchanges, true)) {
242-
try {
243-
$this->context->declareTopic($topic);
244-
} catch (AMQPChannelClosedException $e) {
245-
throw new AMQPChannelClosedException('Exchange declared with different arguments.', 0, $e);
246-
}
239+
/** @var AmqpQueue $queue */
240+
$queue = $this->context->createQueue($queueName);
247241

248-
$this->declaredExchanges[] = $exchangeName;
249-
}
242+
$this->declareExchange($topic, $queueName, $exchangeOptions);
243+
$this->declareQueue($queue, $topic, $queueName);
250244

251-
$queue = $this->context->createQueue($queueName);
245+
return [$queue, $topic];
246+
}
252247

248+
/**
249+
* Declare the queue.
250+
*
251+
* @param AmqpQueue $queue
252+
* @param AmqpTopic $topic
253+
* @param string $queueName
254+
*
255+
* @return void
256+
*/
257+
protected function declareQueue(AmqpQueue $queue, AmqpTopic $topic, string $queueName): void
258+
{
253259
/** @var array $queueOptions */
254-
$queueOptions = Config::get("queue.connections.rabbitmq.options");
260+
$queueOptions = FacadeConfig::get('queue.connections.rabbitmq.options');
255261

256262
$queueOptions = $queueOptions["queue_{$queueName}"] ?? $this->queueOptions;
257263

258264
if ($queueOptions['arguments']) {
259265
$queue->setArguments(is_string($queueOptions['arguments']) ? json_decode($queueOptions['arguments'], true) : $queueOptions['arguments']);
260266
}
267+
261268
if ($queueOptions['passive']) {
262269
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
263270
}
271+
264272
if ($queueOptions['durable']) {
265273
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
266274
}
275+
267276
if ($queueOptions['exclusive']) {
268277
$queue->addFlag(AmqpQueue::FLAG_EXCLUSIVE);
269278
}
279+
270280
if ($queueOptions['auto_delete']) {
271281
$queue->addFlag(AmqpQueue::FLAG_AUTODELETE);
272282
}
273283

274-
if ($queueOptions['declare'] && ! in_array($queueName, $this->declaredQueues, true)) {
284+
if ($queueOptions['declare'] && !in_array($queueName, $this->declaredQueues, true)) {
275285
try {
276286
$this->context->declareQueue($queue);
277287
} catch (AMQPChannelClosedException $e) {
@@ -284,8 +294,43 @@ protected function declareEverything(?string $queueName = null): array
284294
if ($queueOptions['bind']) {
285295
$this->context->bind(new AmqpBind($queue, $topic, $queue->getQueueName()));
286296
}
297+
}
287298

288-
return [$queue, $topic];
299+
/**
300+
* Declare the exchange.
301+
*
302+
* @param AmqpTopic $topic
303+
* @param string $exchangeName
304+
* @param array $exchangeOptions
305+
*
306+
* @return void
307+
*/
308+
protected function declareExchange(AmqpTopic $topic, string $exchangeName, array $exchangeOptions): void
309+
{
310+
$topic->setType($exchangeOptions['type']);
311+
$topic->setArguments($exchangeOptions['arguments']);
312+
313+
if ($exchangeOptions['passive']) {
314+
$topic->addFlag(AmqpTopic::FLAG_PASSIVE);
315+
}
316+
317+
if ($exchangeOptions['durable']) {
318+
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
319+
}
320+
321+
if ($exchangeOptions['auto_delete']) {
322+
$topic->addFlag(AmqpTopic::FLAG_AUTODELETE);
323+
}
324+
325+
if ($exchangeOptions['declare'] && !in_array($exchangeName, $this->declaredExchanges, true)) {
326+
try {
327+
$this->context->declareTopic($topic);
328+
} catch (AMQPChannelClosedException $e) {
329+
throw new AMQPChannelClosedException('Exchange declared with different arguments.', 0, $e);
330+
}
331+
332+
$this->declaredExchanges[] = $exchangeName;
333+
}
289334
}
290335

291336
protected function getQueueName($queueName = null): string

0 commit comments

Comments
 (0)