Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions src/Queue/RabbitMQQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ class RabbitMQQueue extends Queue implements QueueContract, RabbitMQQueueContrac
/**
* Holds the Configuration
*/
protected QueueConfig $config;
protected QueueConfig $rabbitMQConfig;

/**
* RabbitMQQueue constructor.
*/
public function __construct(QueueConfig $config)
{
$this->config = $config;
$this->rabbitMQConfig = $config;
$this->dispatchAfterCommit = $config->isDispatchAfterCommit();
}

Expand Down Expand Up @@ -293,7 +293,7 @@ public function setConnection(AbstractConnection $connection): RabbitMQQueue
*/
public function getJobClass(): string
{
$job = $this->getConfig()->getAbstractJob();
$job = $this->getRabbitMQConfig()->getAbstractJob();

throw_if(
! is_a($job, RabbitMQJob::class, true),
Expand All @@ -309,7 +309,7 @@ public function getJobClass(): string
*/
public function getQueue($queue = null): string
{
return $queue ?: $this->getConfig()->getQueue();
return $queue ?: $this->getRabbitMQConfig()->getQueue();
}

/**
Expand Down Expand Up @@ -523,7 +523,7 @@ protected function createMessage($payload, int $attempts = 0): array
$properties['correlation_id'] = $correlationId;
}

if ($this->getConfig()->isPrioritizeDelayed()) {
if ($this->getRabbitMQConfig()->isPrioritizeDelayed()) {
$properties['priority'] = $attempts;
}

Expand Down Expand Up @@ -605,16 +605,16 @@ protected function getQueueArguments(string $destination): array
// Messages with a priority which is higher than the queue's maximum, are treated as if they were
// published with the maximum priority.
// Quorum queues does not support priority.
if ($this->getConfig()->isPrioritizeDelayed() && ! $this->getConfig()->isQuorum()) {
$arguments['x-max-priority'] = $this->getConfig()->getQueueMaxPriority();
if ($this->getRabbitMQConfig()->isPrioritizeDelayed() && ! $this->getRabbitMQConfig()->isQuorum()) {
$arguments['x-max-priority'] = $this->getRabbitMQConfig()->getQueueMaxPriority();
}

if ($this->getConfig()->isRerouteFailed()) {
if ($this->getRabbitMQConfig()->isRerouteFailed()) {
$arguments['x-dead-letter-exchange'] = $this->getFailedExchange();
$arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination);
}

if ($this->getConfig()->isQuorum()) {
if ($this->getRabbitMQConfig()->isQuorum()) {
$arguments['x-queue-type'] = 'quorum';
}

Expand All @@ -639,7 +639,7 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array
*/
protected function getExchange(?string $exchange = null): string
{
return $exchange ?? $this->getConfig()->getExchange();
return $exchange ?? $this->getRabbitMQConfig()->getExchange();
}

/**
Expand All @@ -648,15 +648,15 @@ protected function getExchange(?string $exchange = null): string
*/
protected function getRoutingKey(string $destination): string
{
return ltrim(sprintf($this->getConfig()->getExchangeRoutingKey(), $destination), '.');
return ltrim(sprintf($this->getRabbitMQConfig()->getExchangeRoutingKey(), $destination), '.');
}

/**
* Get the exchangeType, or AMQPExchangeType::DIRECT as default.
*/
protected function getExchangeType(?string $type = null): string
{
$constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getConfig()->getExchangeType());
$constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getRabbitMQConfig()->getExchangeType());

return defined($constant) ? constant($constant) : AMQPExchangeType::DIRECT;
}
Expand All @@ -666,7 +666,7 @@ protected function getExchangeType(?string $type = null): string
*/
protected function getFailedExchange(?string $exchange = null): string
{
return $exchange ?? $this->getConfig()->getFailedExchange();
return $exchange ?? $this->getRabbitMQConfig()->getFailedExchange();
}

/**
Expand All @@ -675,7 +675,7 @@ protected function getFailedExchange(?string $exchange = null): string
*/
protected function getFailedRoutingKey(string $destination): string
{
return ltrim(sprintf($this->getConfig()->getFailedRoutingKey(), $destination), '.');
return ltrim(sprintf($this->getRabbitMQConfig()->getFailedRoutingKey(), $destination), '.');
}

/**
Expand Down Expand Up @@ -735,9 +735,9 @@ protected function publishProperties($queue, array $options = []): array
return [$destination, $exchange, $exchangeType, $attempts];
}

protected function getConfig(): QueueConfig
protected function getRabbitMQConfig(): QueueConfig
{
return $this->config;
return $this->rabbitMQConfig;
}

/**
Expand Down
48 changes: 24 additions & 24 deletions tests/Functional/RabbitMQQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,50 +24,50 @@ public function test_connection(): void
public function test_config_reroute_failed(): void
{
$queue = $this->connection();
$this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed());

$queue = $this->connection('rabbitmq-with-options');
$this->assertTrue($this->callProperty($queue, 'config')->isRerouteFailed());
$this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed());

$queue = $this->connection('rabbitmq-with-options-empty');
$this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed());

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertFalse($this->callProperty($queue, 'config')->isRerouteFailed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isRerouteFailed());
}

public function test_config_prioritize_delayed(): void
{
$queue = $this->connection();
$this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed());

$queue = $this->connection('rabbitmq-with-options');
$this->assertTrue($this->callProperty($queue, 'config')->isPrioritizeDelayed());
$this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed());

$queue = $this->connection('rabbitmq-with-options-empty');
$this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed());

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertFalse($this->callProperty($queue, 'config')->isPrioritizeDelayed());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isPrioritizeDelayed());
}

public function test_queue_max_priority(): void
{
$queue = $this->connection();
$this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());

$queue = $this->connection('rabbitmq-with-options');
$this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertSame(20, $this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
$this->assertSame(20, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());

$queue = $this->connection('rabbitmq-with-options-empty');
$this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertIsInt($this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'config')->getQueueMaxPriority());
$this->assertIsInt($this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
$this->assertSame(2, $this->callProperty($queue, 'rabbitMQConfig')->getQueueMaxPriority());
}

public function test_config_exchange_type(): void
Expand All @@ -88,7 +88,7 @@ public function test_config_exchange_type(): void
$this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType'));

// testing an unkown type with a default
$this->callProperty($queue, 'config')->setExchangeType('unknown');
$this->callProperty($queue, 'rabbitMQConfig')->setExchangeType('unknown');
$this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType'));
}

Expand Down Expand Up @@ -161,7 +161,7 @@ public function test_routing_key(): void

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test']));
$this->callProperty($queue, 'config')->setExchangeRoutingKey('.an.alternate.routing-key');
$this->callProperty($queue, 'rabbitMQConfig')->setExchangeRoutingKey('.an.alternate.routing-key');
$this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getRoutingKey', ['test']));
}

Expand All @@ -180,26 +180,26 @@ public function test_failed_routing_key(): void

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test']));
$this->callProperty($queue, 'config')->setFailedRoutingKey('.an.alternate.routing-key');
$this->callProperty($queue, 'rabbitMQConfig')->setFailedRoutingKey('.an.alternate.routing-key');
$this->assertSame('an.alternate.routing-key', $this->callMethod($queue, 'getFailedRoutingKey', ['test']));
}

public function test_config_quorum(): void
{
$queue = $this->connection();
$this->assertFalse($this->callProperty($queue, 'config')->isQuorum());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());

$queue = $this->connection('rabbitmq-with-options');
$this->assertFalse($this->callProperty($queue, 'config')->isQuorum());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());

$queue = $this->connection('rabbitmq-with-options-empty');
$this->assertFalse($this->callProperty($queue, 'config')->isQuorum());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());

$queue = $this->connection('rabbitmq-with-options-null');
$this->assertFalse($this->callProperty($queue, 'config')->isQuorum());
$this->assertFalse($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());

$queue = $this->connection('rabbitmq-with-quorum-options');
$this->assertTrue($this->callProperty($queue, 'config')->isQuorum());
$this->assertTrue($this->callProperty($queue, 'rabbitMQConfig')->isQuorum());
}

public function test_declare_delete_exchange(): void
Expand Down
Loading