Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 79 additions & 34 deletions src/Queue/RabbitMQQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@

namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;

use Illuminate\Support\Facades\Config;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use RuntimeException;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Queue;
use Illuminate\Support\Facades\Config as FacadeConfig;
use Illuminate\Support\Str;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;
use Psr\Log\LoggerInterface;
use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;
use Interop\Amqp\Impl\AmqpBind;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use Psr\Log\LoggerInterface;
use RuntimeException;
use Throwable;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;

class RabbitMQQueue extends Queue implements QueueContract
{
Expand All @@ -26,6 +26,13 @@ class RabbitMQQueue extends Queue implements QueueContract
protected $queueOptions;
protected $exchangeOptions;

/**
* Alternative exchange options to use by producer.
*
* @var array|null
*/
protected $altExchangeOptions;

protected $declaredExchanges = [];
protected $declaredQueues = [];

Expand Down Expand Up @@ -217,61 +224,64 @@ public function getContext(): AmqpContext
*/
protected function declareEverything(?string $queueName = null): array
{
/** @var array $exchangeOptions */
$exchangeOptions = $this->altExchangeOptions ?? $this->exchangeOptions;

/** @var string $queueName */
$queueName = $this->getQueueName($queueName);

/** @var string $exchangeName */
$exchangeName = $this->exchangeOptions['name'] ?: $queueName;
$exchangeName = $exchangeOptions['name'] ?: $queueName;

/** @var AmqpTopic $topic */
$topic = $this->context->createTopic($exchangeName);

$topic->setType($this->exchangeOptions['type']);
$topic->setArguments($this->exchangeOptions['arguments']);

if ($this->exchangeOptions['passive']) {
$topic->addFlag(AmqpTopic::FLAG_PASSIVE);
}
if ($this->exchangeOptions['durable']) {
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
}
if ($this->exchangeOptions['auto_delete']) {
$topic->addFlag(AmqpTopic::FLAG_AUTODELETE);
}

if ($this->exchangeOptions['declare'] && ! in_array($exchangeName, $this->declaredExchanges, true)) {
try {
$this->context->declareTopic($topic);
} catch (AMQPChannelClosedException $e) {
throw new AMQPChannelClosedException('Exchange declared with different arguments.', 0, $e);
}
/** @var AmqpQueue $queue */
$queue = $this->context->createQueue($queueName);

$this->declaredExchanges[] = $exchangeName;
}
$this->declareExchange($topic, $queueName, $exchangeOptions);
$this->declareQueue($queue, $topic, $queueName);

$queue = $this->context->createQueue($queueName);
return [$queue, $topic];
}

/**
* Declare the queue.
*
* @param AmqpQueue $queue
* @param AmqpTopic $topic
* @param string $queueName
*
* @return void
*/
protected function declareQueue(AmqpQueue $queue, AmqpTopic $topic, string $queueName): void
{
/** @var array $queueOptions */
$queueOptions = Config::get("queue.connections.rabbitmq.options");
$queueOptions = FacadeConfig::get('queue.connections.rabbitmq.options');

$queueOptions = $queueOptions["queue_{$queueName}"] ?? $this->queueOptions;

if ($queueOptions['arguments']) {
$queue->setArguments(is_string($queueOptions['arguments']) ? json_decode($queueOptions['arguments'], true) : $queueOptions['arguments']);
}

if ($queueOptions['passive']) {
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
}

if ($queueOptions['durable']) {
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
}

if ($queueOptions['exclusive']) {
$queue->addFlag(AmqpQueue::FLAG_EXCLUSIVE);
}

if ($queueOptions['auto_delete']) {
$queue->addFlag(AmqpQueue::FLAG_AUTODELETE);
}

if ($queueOptions['declare'] && ! in_array($queueName, $this->declaredQueues, true)) {
if ($queueOptions['declare'] && !in_array($queueName, $this->declaredQueues, true)) {
try {
$this->context->declareQueue($queue);
} catch (AMQPChannelClosedException $e) {
Expand All @@ -284,8 +294,43 @@ protected function declareEverything(?string $queueName = null): array
if ($queueOptions['bind']) {
$this->context->bind(new AmqpBind($queue, $topic, $queue->getQueueName()));
}
}

return [$queue, $topic];
/**
* Declare the exchange.
*
* @param AmqpTopic $topic
* @param string $exchangeName
* @param array $exchangeOptions
*
* @return void
*/
protected function declareExchange(AmqpTopic $topic, string $exchangeName, array $exchangeOptions): void
{
$topic->setType($exchangeOptions['type']);
$topic->setArguments($exchangeOptions['arguments']);

if ($exchangeOptions['passive']) {
$topic->addFlag(AmqpTopic::FLAG_PASSIVE);
}

if ($exchangeOptions['durable']) {
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
}

if ($exchangeOptions['auto_delete']) {
$topic->addFlag(AmqpTopic::FLAG_AUTODELETE);
}

if ($exchangeOptions['declare'] && !in_array($exchangeName, $this->declaredExchanges, true)) {
try {
$this->context->declareTopic($topic);
} catch (AMQPChannelClosedException $e) {
throw new AMQPChannelClosedException('Exchange declared with different arguments.', 0, $e);
}

$this->declaredExchanges[] = $exchangeName;
}
}

protected function getQueueName($queueName = null): string
Expand Down