Skip to content

Commit

Permalink
Add queue name to received message.
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhukV committed May 21, 2024
1 parent ef57c1f commit 17ecf93
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 17 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
CHANGELOG
=========

Next release
------------

* Add queue name to `ReceivedMessage`.
* Add method `ReceivedMessage::isDirectPublisher` for check, is message direct published to queue (via default exchange).

v2.1.0
------

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"ext-sockets": "*",
"phpunit/phpunit": "~10.1",
"phpmetrics/phpmetrics": "~2.7",
"phpstan/phpstan": "~1.10.0",
"phpstan/phpstan": "~1.11.0",
"escapestudios/symfony2-coding-standard": "~3.5",
"guzzlehttp/guzzle": "~6.5.6",
"symfony/console": "~5.4 | ~6.0",
Expand Down
1 change: 1 addition & 0 deletions src/Adapter/Amqp/Message/AmqpReceivedMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public function __construct(
parent::__construct(
$payload,
(int) $this->envelope->getDeliveryTag(),
$this->queue->getName(),
$this->envelope->getRoutingKey(),
$this->envelope->getExchangeName(),
new Options(
Expand Down
4 changes: 3 additions & 1 deletion src/Adapter/AmqpLib/Message/AmqpReceivedMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ class AmqpReceivedMessage extends ReceivedMessage
* Constructor.
*
* @param AMQPMessage $message
* @param string $queueName
*/
public function __construct(private readonly AMQPMessage $message)
public function __construct(private readonly AMQPMessage $message, string $queueName)
{
$payload = new Payload(
$this->message->getBody(),
Expand All @@ -49,6 +50,7 @@ public function __construct(private readonly AMQPMessage $message)
parent::__construct(
$payload,
$this->message->getDeliveryTag(),
$queueName,
(string) $this->message->getRoutingKey(),
(string) $this->message->getExchange(),
new Options(
Expand Down
10 changes: 6 additions & 4 deletions src/Adapter/AmqpLib/Queue/AmqpQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,18 @@ public function consume(\Closure $handler, string $tag = ''): void
{
$amqplibChannel = $this->channel->getChannel();

$queueName = $this->getName();

try {
$amqplibChannel->basic_consume(
$this->getName(),
$queueName,
$tag,
false,
false,
$this->definition->exclusive,
false,
function (AMQPMessage $message) use ($handler) {
$receivedMessage = new AmqpReceivedMessage($message);
function (AMQPMessage $message) use ($handler, $queueName) {
$receivedMessage = new AmqpReceivedMessage($message, $queueName);

$handler($receivedMessage);
}
Expand Down Expand Up @@ -100,7 +102,7 @@ public function get(): ?ReceivedMessage
$message = $this->channel->getChannel()->basic_get($this->getName());

if ($message) {
return new AmqpReceivedMessage($message);
return new AmqpReceivedMessage($message, $this->getName());
}

return null;
Expand Down
24 changes: 18 additions & 6 deletions src/Message/ReceivedMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,36 @@ abstract class ReceivedMessage extends Message
*
* @param Payload $payload
* @param int $deliveryTag
* @param string $queueName
* @param string $routingKey
* @param string $exchangeName
* @param Options|null $options
* @param Headers|null $headers
* @param Identifier|null $identifier
*/
public function __construct(
Payload $payload,
public readonly int $deliveryTag,
Payload $payload,
public readonly int $deliveryTag,
public readonly string $queueName,
public readonly string $routingKey,
public readonly string $exchangeName,
Options $options = null,
Headers $headers = null,
Identifier $identifier = null
public readonly string $exchangeName,
Options $options = null,
Headers $headers = null,
Identifier $identifier = null
) {
parent::__construct($payload, $options, $headers, $identifier);
}

/**
* Is direct published?
*
* @return bool
*/
public function isDirectPublished(): bool
{
return $this->exchangeName === '' && $this->routingKey === $this->queueName;
}

/**
* Is answered?
*
Expand Down
24 changes: 24 additions & 0 deletions tests/Unit/Adapter/Amqp/Message/AmqpReceivedMessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ class AmqpReceivedMessageTest extends TestCase
protected function setUp(): void
{
$this->queue = $this->createMock(\AMQPQueue::class);

$this->queue->expects(self::any())
->method('getName')
->willReturn('test_queue');
}

#[Test]
public function shouldSuccessCheckIsDirectPublished(): void
{
$receivedMessage = $this->makeReceivedMessage(routingKey: 'test_queue', exchangeName: '');

self::assertTrue($receivedMessage->isDirectPublished());

$receivedMessage = $this->makeReceivedMessage(routingKey: 'test_queue', exchangeName: 'bla');

self::assertFalse($receivedMessage->isDirectPublished());
}

#[Test]
public function shouldSuccessGetQueueName(): void
{
$receivedMessage = $this->makeReceivedMessage();

self::assertEquals('test_queue', $receivedMessage->queueName);
}

#[Test]
Expand Down
4 changes: 4 additions & 0 deletions tests/Unit/Adapter/Amqp/Queue/AmqpQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public function shouldSuccessGetMessage(): void
->method('get')
->willReturn($envelope);

$this->originQueue->expects(self::any())
->method('getName')
->willReturn('bla-bla');

$message = $this->queue->get();

self::assertEquals(new AmqpReceivedMessage($this->originQueue, $envelope), $message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected function setUp(): void
#[TestWith(['foo', false])]
public function shouldSuccessSupports(string $routing, bool $supports): void
{
$message = new ReceivedMessageStub(new Payload(''), 0, '', '', null, new Headers([
$message = new ReceivedMessageStub(new Payload(''), 0, '', '', '', null, new Headers([
'x-death' => [
[
'routing-keys' => [$routing],
Expand All @@ -79,6 +79,7 @@ public function shouldSuccessPublishToTarget(): void
0,
'',
'',
'',
new Options(false),
new Headers([
'x-delay-publisher' => 'processing',
Expand Down Expand Up @@ -119,6 +120,7 @@ public function shouldSuccessRetryPublishToDelay(): void
0,
'',
'',
'',
null,
new Headers([
'x-delay-publisher' => 'processing',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ private function createMessage(string $routingKey = null, int $deliveryTag = nul
return new ReceivedMessageStub(
$payload ?: new Payload(''),
$deliveryTag,
'',
$routingKey,
'exchange-name'
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected function setUp(): void
#[Test]
public function shouldSuccessProxy(): void
{
$message = new ReceivedMessageStub(new Payload('data'), 0, 'some', 'some');
$message = new ReceivedMessageStub(new Payload('data'), 0, '', 'some', 'some');

$this->exchange->expects(self::once())
->method('publish')
Expand All @@ -89,7 +89,7 @@ public function shouldThrowExceptionIfWeTryToProxyToSameExchange(): void
$this->expectException(\LogicException::class);
$this->expectExceptionMessage('Loop detection. You try to proxy message from "to-another" exchange to "to-another" exchange by same routing key.');

$message = new ReceivedMessageStub(new Payload('data'), 0, 'foo', 'to-another');
$message = new ReceivedMessageStub(new Payload('data'), 0, '', 'foo', 'to-another');

$this->exchange->expects(self::never())
->method('publish');
Expand Down
5 changes: 3 additions & 2 deletions tests/Unit/Message/ReceivedMessageStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ class ReceivedMessageStub extends ReceivedMessage
*
* @param Payload $payload
* @param int|null $deliveryTag
* @param string $queueName
* @param string|null $routingKey
* @param string $exchangeName
* @param Options|null $options
* @param Headers|null $headers
* @param Identifier|null $identifier
*/
public function __construct(Payload $payload, ?int $deliveryTag, ?string $routingKey, string $exchangeName, Options $options = null, Headers $headers = null, Identifier $identifier = null)
public function __construct(Payload $payload, ?int $deliveryTag, string $queueName, ?string $routingKey, string $exchangeName, Options $options = null, Headers $headers = null, Identifier $identifier = null)
{
$options = $options ?: new Options(false);
$deliveryTag = $deliveryTag ?: 0;
$routingKey = $routingKey ?: '';

parent::__construct($payload, $deliveryTag, $routingKey, $exchangeName, $options, $headers, $identifier);
parent::__construct($payload, $deliveryTag, $queueName, $routingKey, $exchangeName, $options, $headers, $identifier);
}

/**
Expand Down

0 comments on commit 17ecf93

Please sign in to comment.