Skip to content

Commit 3c86da6

Browse files
authored
Merge pull request vyuldashev#531 from vyuldashev/feature/support-octane-reconnect
Feature/support-reconnect
2 parents d174b95 + c1e6d2d commit 3c86da6

File tree

4 files changed

+195
-17
lines changed

4 files changed

+195
-17
lines changed

README.md

Lines changed: 101 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,9 @@ by adding extra options.
149149
```
150150

151151
### Horizon support
152-
Starting with 8.0, this package supports [Laravel Horizon](http://horizon.laravel.com) out of the box. Firstly, install
153-
Horizon and then set `RABBITMQ_WORKER` to `horizon`.
152+
153+
Starting with 8.0, this package supports [Laravel Horizon](https://laravel.com/docs/horizon) out of the box. Firstly,
154+
install Horizon and then set `RABBITMQ_WORKER` to `horizon`.
154155

155156
Horizon is depending on events dispatched by the worker.
156157
These events inform Horizon what was done with the message/job.
@@ -262,7 +263,9 @@ class RabbitMQJob extends BaseJob
262263
}
263264
```
264265

265-
If you want to handle raw message, not in JSON format or without 'job' key in JSON, you should add stub for `getName` method:
266+
If you want to handle raw message, not in JSON format or without 'job' key in JSON,
267+
you should add stub for `getName` method:
268+
266269
```php
267270
<?php
268271

@@ -310,6 +313,93 @@ An example for the config:
310313
],
311314
```
312315

316+
### Use your own Worker class
317+
318+
If you want to use your own `RabbitMQQueue::class` this is possible by
319+
extending `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue`.
320+
and inform laravel to use your class by setting `RABBITMQ_WORKER` to `\App\Queue\RabbitMQQueue::class`.
321+
322+
> Note: Worker classes **must** extend `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue`
323+
324+
```php
325+
'connections' => [
326+
// ...
327+
328+
'rabbitmq' => [
329+
// ...
330+
331+
/* Set to a class if you wish to use your own. */
332+
'worker' => \App\Queue\RabbitMQQueue::class,
333+
],
334+
335+
// ...
336+
],
337+
```
338+
339+
```php
340+
<?php
341+
342+
namespace App\Queue;
343+
344+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;
345+
346+
class RabbitMQQueue extends BaseRabbitMQQueue
347+
{
348+
// ...
349+
}
350+
```
351+
352+
**For Example: A reconnect implementation.**
353+
354+
If you want to reconnect to RabbitMQ, if the connection is dead.
355+
You can override the publishing and the createChannel methods.
356+
357+
> Note: this is not best practice, it is an example.
358+
359+
```php
360+
<?php
361+
362+
namespace App\Queue;
363+
364+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
365+
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
366+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;
367+
368+
class RabbitMQQueue extends BaseRabbitMQQueue
369+
{
370+
371+
protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void
372+
{
373+
try {
374+
parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
375+
} catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
376+
$this->reconnect();
377+
parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
378+
}
379+
}
380+
381+
protected function publishBatch($jobs, $data = '', $queue = null): void
382+
{
383+
try {
384+
parent::publishBatch($jobs, $data, $queue);
385+
} catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
386+
$this->reconnect();
387+
parent::publishBatch($jobs, $data, $queue);
388+
}
389+
}
390+
391+
protected function createChannel(): AMQPChannel
392+
{
393+
try {
394+
return parent::createChannel();
395+
} catch (AMQPConnectionClosedException) {
396+
$this->reconnect();
397+
return parent::createChannel();
398+
}
399+
}
400+
}
401+
```
402+
313403
### Default Queue
314404

315405
The connection does use a default queue with value 'default', when no queue is provided by laravel.
@@ -419,10 +509,16 @@ If for some reason you don't want the connection lazy you can turn it off by set
419509
],
420510
```
421511

512+
### Octane support
513+
514+
Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box.
515+
Firstly, install Octane and don't forget to warm 'rabbitmq' connection in the octane config.
516+
> See: https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/460#issuecomment-1469851667
517+
422518
## Laravel Usage
423519

424-
Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not need to
425-
change anything else. If you do not know how to use the Queue API, please refer to the official Laravel
520+
Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not
521+
need to change anything else. If you do not know how to use the Queue API, please refer to the official Laravel
426522
documentation: http://laravel.com/docs/queues
427523

428524
## Lumen Usage

src/Queue/RabbitMQQueue.php

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,19 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts =
183183
*/
184184
public function bulk($jobs, $data = '', $queue = null): void
185185
{
186-
foreach ((array) $jobs as $job) {
186+
$this->publishBatch($jobs, $queue, $data);
187+
}
188+
189+
/**
190+
* @throws AMQPProtocolChannelException
191+
*/
192+
protected function publishBatch($jobs, $data = '', $queue = null): void
193+
{
194+
foreach ($jobs as $job) {
187195
$this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]);
188196
}
189197

190-
$this->publishBatch();
198+
$this->batchPublish();
191199
}
192200

193201
/**
@@ -274,15 +282,6 @@ public function setConnection(AbstractConnection $connection): RabbitMQQueue
274282
return $this;
275283
}
276284

277-
public function getChannel($forceNew = false): AMQPChannel
278-
{
279-
if (! $this->channel || $forceNew) {
280-
$this->channel = $this->createChannel();
281-
}
282-
283-
return $this->channel;
284-
}
285-
286285
/**
287286
* Job class to use.
288287
*
@@ -740,13 +739,33 @@ protected function publishBasic($msg, $exchange = '', $destination = '', $mandat
740739
$this->getChannel()->basic_publish($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
741740
}
742741

743-
protected function publishBatch(): void
742+
protected function batchPublish(): void
744743
{
745744
$this->getChannel()->publish_batch();
746745
}
747746

747+
public function getChannel($forceNew = false): AMQPChannel
748+
{
749+
if (! $this->channel || $forceNew) {
750+
$this->channel = $this->createChannel();
751+
}
752+
753+
return $this->channel;
754+
}
755+
748756
protected function createChannel(): AMQPChannel
749757
{
750758
return $this->getConnection()->channel();
751759
}
760+
761+
/**
762+
* @throws Exception
763+
*/
764+
protected function reconnect(): void
765+
{
766+
// Reconnects using the original connection settings.
767+
$this->getConnection()->reconnect();
768+
// Create a new main channel because all old channels are removed.
769+
$this->getChannel(true);
770+
}
752771
}

tests/Feature/QueueTest.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,41 @@
33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature;
44

55
use PhpAmqpLib\Connection\AMQPStreamConnection;
6+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
7+
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
8+
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
9+
use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob;
610

711
class QueueTest extends TestCase
812
{
13+
public function setUp(): void
14+
{
15+
parent::setUp();
16+
17+
$this->withoutExceptionHandling([
18+
AMQPChannelClosedException::class, AMQPConnectionClosedException::class,
19+
AMQPProtocolChannelException::class,
20+
]);
21+
}
22+
923
public function testConnection(): void
1024
{
1125
$this->assertInstanceOf(AMQPStreamConnection::class, $this->connection()->getChannel()->getConnection());
1226
}
27+
28+
public function testWithoutReconnect(): void
29+
{
30+
$queue = $this->connection('rabbitmq');
31+
32+
$queue->push(new TestJob());
33+
sleep(1);
34+
$this->assertSame(1, $queue->size());
35+
36+
// close connection
37+
$queue->getConnection()->close();
38+
$this->assertFalse($queue->getConnection()->isConnected());
39+
40+
$this->expectException(AMQPChannelClosedException::class);
41+
$queue->push(new TestJob());
42+
}
1343
}

tests/Functional/TestCase.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional;
44

55
use Exception;
6+
use PhpAmqpLib\Channel\AMQPChannel;
67
use ReflectionClass;
78
use ReflectionException;
89
use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase;
@@ -234,4 +235,36 @@ protected function callProperty($object, string $property): mixed
234235

235236
return $property->getValue($object);
236237
}
238+
239+
public function testConnectChannel(): void
240+
{
241+
$queue = $this->connection();
242+
$this->assertFalse($queue->getConnection()->isConnected());
243+
244+
/** @var AMQPChannel $channel */
245+
$channel = $this->callMethod($queue, 'getChannel');
246+
$this->assertTrue($queue->getConnection()->isConnected());
247+
$this->assertSame($channel, $this->callProperty($queue, 'channel'));
248+
$this->assertTrue($channel->is_open());
249+
}
250+
251+
public function testReconnect(): void
252+
{
253+
$queue = $this->connection();
254+
$this->assertFalse($queue->getConnection()->isConnected());
255+
256+
// connect
257+
$channel = $this->callMethod($queue, 'getChannel');
258+
$this->assertTrue($queue->getConnection()->isConnected());
259+
$this->assertSame($channel, $this->callProperty($queue, 'channel'));
260+
261+
// close
262+
$queue->getConnection()->close();
263+
$this->assertFalse($queue->getConnection()->isConnected());
264+
265+
// reconnect
266+
$this->callMethod($queue, 'reconnect');
267+
$this->assertTrue($queue->getConnection()->isConnected());
268+
$this->assertTrue($queue->getChannel()->is_open());
269+
}
237270
}

0 commit comments

Comments
 (0)