Skip to content

Commit 36958b8

Browse files
Merge pull request #31 from needle-project/develop
Develop
2 parents 0b51938 + 6dd6943 commit 36958b8

File tree

6 files changed

+277
-14
lines changed

6 files changed

+277
-14
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
],
1212
"require": {
1313
"php": "7.*",
14-
"php-amqplib/php-amqplib": "2.7.*"
14+
"php-amqplib/php-amqplib": ">=2.7 <3.0"
1515
},
1616
"require-dev": {
1717
"phpunit/phpunit": "7.*|6.*",

src/Entity/ExchangeEntity.php

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use NeedleProject\LaravelRabbitMq\AMQPConnection;
55
use NeedleProject\LaravelRabbitMq\PublisherInterface;
66
use PhpAmqpLib\Channel\AMQPChannel;
7+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
78
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
89
use PhpAmqpLib\Message\AMQPMessage;
910

@@ -15,6 +16,11 @@
1516
*/
1617
class ExchangeEntity implements PublisherInterface, AMQPEntityInterface
1718
{
19+
/**
20+
* @const int Retry count when a Channel Closed exeption is thrown
21+
*/
22+
const MAX_RETRIES = 3;
23+
1824
/**
1925
* @const array Default connections parameters
2026
*/
@@ -55,6 +61,11 @@ class ExchangeEntity implements PublisherInterface, AMQPEntityInterface
5561
*/
5662
protected $attributes;
5763

64+
/**
65+
* @var int
66+
*/
67+
protected $retryCount = 0;
68+
5869
/**
5970
* ExchangeEntity constructor.
6071
*
@@ -190,11 +201,23 @@ public function publish(string $message, string $routingKey = '')
190201
$this->create();
191202
$this->bind();
192203
}
193-
$this->getChannel()->basic_publish(
194-
new AMQPMessage($message),
195-
$this->attributes['name'],
196-
$routingKey,
197-
true
198-
);
204+
try {
205+
$this->getChannel()->basic_publish(
206+
new AMQPMessage($message),
207+
$this->attributes['name'],
208+
$routingKey,
209+
true
210+
);
211+
$this->retryCount = 0;
212+
} catch (AMQPChannelClosedException $exception) {
213+
$this->retryCount++;
214+
// Retry publishing with re-connect
215+
if ($this->retryCount < self::MAX_RETRIES) {
216+
$this->getConnection()->reconnect();
217+
$this->publish($message, $routingKey);
218+
return;
219+
}
220+
throw $exception;
221+
}
199222
}
200223
}

src/Entity/QueueEntity.php

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use PhpAmqpLib\Message\AMQPMessage;
1313
use Psr\Log\LoggerAwareInterface;
1414
use Psr\Log\LoggerAwareTrait;
15+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
1516

1617
/**
1718
* Class QueueEntity
@@ -23,6 +24,11 @@ class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityIn
2324
{
2425
use LoggerAwareTrait;
2526

27+
/**
28+
* @const int Retry count when a Channel Closed exeption is thrown
29+
*/
30+
const MAX_RETRIES = 3;
31+
2632
/**
2733
* @const array Default connections parameters
2834
*/
@@ -94,6 +100,11 @@ class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityIn
94100
*/
95101
protected $startTime = 0;
96102

103+
/**
104+
* @var int
105+
*/
106+
protected $retryCount = 0;
107+
97108
/**
98109
* @param AMQPConnection $connection
99110
* @param string $aliasName
@@ -246,13 +257,26 @@ public function publish(string $message, string $routingKey = '')
246257
$this->create();
247258
$this->bind();
248259
}
249-
$this->getChannel()
250-
->basic_publish(
251-
new AMQPMessage($message),
252-
'',
253-
$this->attributes['name'],
254-
true
255-
);
260+
261+
try {
262+
$this->getChannel()
263+
->basic_publish(
264+
new AMQPMessage($message),
265+
'',
266+
$this->attributes['name'],
267+
true
268+
);
269+
$this->retryCount = 0;
270+
} catch (AMQPChannelClosedException $exception) {
271+
$this->retryCount++;
272+
// Retry publishing with re-connect
273+
if ($this->retryCount < self::MAX_RETRIES) {
274+
$this->getConnection()->reconnect();
275+
$this->publish($message, $routingKey);
276+
return;
277+
}
278+
throw $exception;
279+
}
256280
}
257281

258282
/**

tests/Entity/ExchangeEntityTest.php

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use NeedleProject\LaravelRabbitMq\AMQPConnection;
99
use PhpAmqpLib\Channel\AMQPChannel;
1010
use Tests\NeedleProject\LaravelRabbitMq\Stubs\ExchangeEntityDetailsStub;
11+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
1112

1213
class ExchangeEntityTest extends TestCase
1314
{
@@ -354,4 +355,82 @@ public function testPublishWithAutoCreate()
354355
);
355356
$exchange->publish('a');
356357
}
358+
359+
360+
public function testPublishRetry()
361+
{
362+
$amqpConnection = $this->getMockBuilder(AMQPConnection::class)
363+
->disableOriginalConstructor()
364+
->getMock();
365+
366+
$channelMock = $this->getMockBuilder(AMQPChannel::class)
367+
->disableOriginalConstructor()
368+
->getMock();
369+
370+
$amqpConnection->expects($this->atLeastOnce())
371+
->method('getChannel')
372+
->willReturn($channelMock);
373+
374+
$amqpConnection->expects($this->atLeastOnce())
375+
->method('reconnect')
376+
->willReturn($channelMock);
377+
378+
$retries = 0;
379+
$channelMock->expects($this->exactly(2))
380+
->method('basic_publish')
381+
->will($this->returnCallback(function ($args) use (&$retries) {
382+
if (0 === $retries) {
383+
$retries++;
384+
throw new AMQPChannelClosedException("Channel is Closed");
385+
}
386+
return null;
387+
}));
388+
389+
$queue = ExchangeEntity::createExchange(
390+
$amqpConnection,
391+
'foo',
392+
[
393+
'name' => 'exchange.name.on.rabbit',
394+
'auto_create' => true,
395+
'bind' => [['queue' => 'foo', 'routing_key' => '*']]
396+
]
397+
);
398+
$queue->publish('a');
399+
$this->assertEquals(1, $retries);
400+
}
401+
402+
public function testPublishMaxRetry()
403+
{
404+
$amqpConnection = $this->getMockBuilder(AMQPConnection::class)
405+
->disableOriginalConstructor()
406+
->getMock();
407+
408+
$channelMock = $this->getMockBuilder(AMQPChannel::class)
409+
->disableOriginalConstructor()
410+
->getMock();
411+
412+
$amqpConnection->expects($this->atLeastOnce())
413+
->method('getChannel')
414+
->willReturn($channelMock);
415+
416+
$amqpConnection->expects($this->atLeastOnce())
417+
->method('reconnect')
418+
->willReturn($channelMock);
419+
420+
$channelMock->expects($this->exactly(3))
421+
->method('basic_publish')
422+
->will($this->throwException(new AMQPChannelClosedException("Channel is Closed")));
423+
424+
$queue = ExchangeEntity::createExchange(
425+
$amqpConnection,
426+
'foo',
427+
[
428+
'name' => 'exchange.name.on.rabbit',
429+
'auto_create' => true,
430+
'bind' => [['queue' => 'foo', 'routing_key' => '*']]
431+
]
432+
);
433+
$this->expectException(AMQPChannelClosedException::class);
434+
$queue->publish('a');
435+
}
357436
}

tests/Entity/QueueEntityTest.php

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use NeedleProject\LaravelRabbitMq\AMQPConnection;
55
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
66
use PhpAmqpLib\Channel\AMQPChannel;
7+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
78
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
89
use PhpAmqpLib\Message\AMQPMessage;
910
use PHPUnit\Framework\TestCase;
@@ -384,4 +385,77 @@ public function testProcessorCallback()
384385

385386
$queue->consume(new AMQPMessage('FooBar'));
386387
}
388+
389+
public function testPublishRetry()
390+
{
391+
$amqpConnection = $this->getMockBuilder(AMQPConnection::class)
392+
->disableOriginalConstructor()
393+
->getMock();
394+
395+
$channelMock = $this->getMockBuilder(AMQPChannel::class)
396+
->disableOriginalConstructor()
397+
->getMock();
398+
399+
$amqpConnection->expects($this->atLeastOnce())
400+
->method('getChannel')
401+
->willReturn($channelMock);
402+
403+
$amqpConnection->expects($this->atLeastOnce())
404+
->method('reconnect')
405+
->willReturn($channelMock);
406+
407+
$retries = 0;
408+
$channelMock->expects($this->exactly(2))
409+
->method('basic_publish')
410+
->will($this->returnCallback(function ($args) use (&$retries) {
411+
if (0 === $retries) {
412+
$retries++;
413+
throw new AMQPChannelClosedException("Channel is Closed");
414+
}
415+
return null;
416+
}));
417+
418+
$queue = QueueEntity::createQueue(
419+
$amqpConnection,
420+
'foo',
421+
[
422+
'name' => 'queue.name.on.rabbit'
423+
]
424+
);
425+
$queue->publish('a');
426+
$this->assertEquals(1, $retries);
427+
}
428+
429+
public function testPublishMaxRetry()
430+
{
431+
$amqpConnection = $this->getMockBuilder(AMQPConnection::class)
432+
->disableOriginalConstructor()
433+
->getMock();
434+
435+
$channelMock = $this->getMockBuilder(AMQPChannel::class)
436+
->disableOriginalConstructor()
437+
->getMock();
438+
439+
$amqpConnection->expects($this->atLeastOnce())
440+
->method('getChannel')
441+
->willReturn($channelMock);
442+
443+
$amqpConnection->expects($this->atLeastOnce())
444+
->method('reconnect')
445+
->willReturn($channelMock);
446+
447+
$channelMock->expects($this->exactly(3))
448+
->method('basic_publish')
449+
->will($this->throwException(new AMQPChannelClosedException("Channel is Closed")));
450+
451+
$queue = QueueEntity::createQueue(
452+
$amqpConnection,
453+
'foo',
454+
[
455+
'name' => 'queue.name.on.rabbit'
456+
]
457+
);
458+
$this->expectException(AMQPChannelClosedException::class);
459+
$queue->publish('a');
460+
}
387461
}

tests/Processor/AbstractMessageProcessorTest.php

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,67 @@ public function processMessage(AMQPMessage $message): bool
127127

128128
$this->assertEquals(3, $messageProcessor->getProcessedMessages());
129129
}
130+
131+
public function testErrorAck()
132+
{
133+
$messageProcessor = new class extends AbstractMessageProcessor {
134+
public function processMessage(AMQPMessage $message): bool
135+
{
136+
return true;
137+
}
138+
};
139+
$loggerMock = $this->getMockBuilder(LoggerInterface::class)
140+
->getMock();
141+
$messageProcessor->setLogger($loggerMock);
142+
143+
$channelMock = $this->getMockBuilder(AMQPChannel::class)
144+
->disableOriginalConstructor()
145+
->getMock();
146+
147+
$amqpMessage = $this->getMockBuilder(AMQPMessage::class)
148+
->disableOriginalConstructor()
149+
->getMock();
150+
$amqpMessage->delivery_info['channel'] = $channelMock;
151+
152+
$loggerMock->expects($this->once())
153+
->method('error')
154+
->willReturn(null);
155+
$messageProcessor->consume($amqpMessage);
156+
}
157+
158+
159+
public function testErrorNack()
160+
{
161+
$messageProcessor = new class extends AbstractMessageProcessor {
162+
public function processMessage(AMQPMessage $message): bool
163+
{
164+
// trigger nack
165+
return false;
166+
}
167+
};
168+
$loggerMock = $this->getMockBuilder(LoggerInterface::class)
169+
->getMock();
170+
$messageProcessor->setLogger($loggerMock);
171+
$channelMock = $this->getMockBuilder(AMQPChannel::class)
172+
->disableOriginalConstructor()
173+
->getMock();
174+
$channelMock->expects($this->atLeastOnce())
175+
->method('basic_nack')
176+
->will($this->throwException(new \RuntimeException('FooBar')));
177+
178+
$amqpMessage = $this->getMockBuilder(AMQPMessage::class)
179+
->disableOriginalConstructor()
180+
->getMock();
181+
$amqpMessage->delivery_info['channel'] = $channelMock;
182+
$amqpMessage->delivery_info['delivery_tag'] = 1;
183+
$amqpMessage->expects($this->atLeastOnce())
184+
->method('getBody')
185+
->willReturn('foo');
186+
187+
$loggerMock->expects($this->atLeastOnce())
188+
->method('debug')
189+
->with('Did not processed with success message foo')
190+
->willReturn(null);
191+
$messageProcessor->consume($amqpMessage);
192+
}
130193
}

0 commit comments

Comments
 (0)