Skip to content

Feature/support-reconnect #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0e9c529
added: OctaneRabbitMQQueue::class
adm-bome Mar 15, 2023
99aa452
Merge branch 'fix/refactor-creation-of-connections-and-queues' into f…
adm-bome Mar 15, 2023
1a17d4c
added: reconnect() method
adm-bome Mar 15, 2023
722bc83
cleanup
adm-bome Mar 15, 2023
5edfec5
moved (reconnect) methods to a trait
adm-bome Mar 15, 2023
0b66991
added tests
adm-bome Mar 15, 2023
326cb51
added tests
adm-bome Mar 15, 2023
c3d53db
added tests
adm-bome Mar 16, 2023
947ba6a
throw test
adm-bome Mar 16, 2023
bdbdc42
added tests
adm-bome Mar 15, 2023
c4797f0
Merge branch 'fix/refactor-creation-of-connections-and-queues' into f…
adm-bome Mar 16, 2023
1a6009d
Merge remote-tracking branch 'origin/feature/support-octane-reconnect…
adm-bome Mar 16, 2023
7897f77
removed unnecessary connection config
adm-bome Mar 16, 2023
89f646a
Merge branch 'fix/refactor-creation-of-connections-and-queues' into f…
adm-bome Mar 16, 2023
19b5cbb
updated ReadMe
adm-bome Mar 16, 2023
ce16e57
Revert "removed unnecessary connection config"
adm-bome Mar 16, 2023
eefd94b
really... bump
adm-bome Mar 16, 2023
adab85a
Merge branch 'fix/refactor-creation-of-connections-and-queues' into f…
adm-bome Mar 17, 2023
514d6dc
Merge branch 'fix/refactor-creation-of-connections-and-queues' into f…
adm-bome Mar 28, 2023
14e740f
Merge branch 'fix/refactor-creation-of-connections-and-queues' into f…
adm-bome Mar 28, 2023
8da5b9d
Some rework based on PR #528
adm-bome Mar 28, 2023
7690c9c
Merge branch 'master' into feature/support-octane-reconnect
adm-bome Apr 25, 2023
0d34ba7
rework
adm-bome Apr 25, 2023
dd6ad60
rework 2
adm-bome Apr 25, 2023
ee24fb1
rework
adm-bome Apr 25, 2023
0ca514f
Merge remote-tracking branch 'origin/feature/support-octane-reconnect…
adm-bome Apr 25, 2023
86a1594
update: warm rabbitmq connection reference
adm-bome Apr 26, 2023
826ed95
update: incorrect use statements in examples
adm-bome Apr 26, 2023
e67d4fc
update: reconnect tests
adm-bome Apr 26, 2023
c1e6d2d
update: reconnect tests
adm-bome Apr 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 101 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ by adding extra options.
```

### Horizon support
Starting with 8.0, this package supports [Laravel Horizon](http://horizon.laravel.com) out of the box. Firstly, install
Horizon and then set `RABBITMQ_WORKER` to `horizon`.

Starting with 8.0, this package supports [Laravel Horizon](https://laravel.com/docs/horizon) out of the box. Firstly,
install Horizon and then set `RABBITMQ_WORKER` to `horizon`.

Horizon is depending on events dispatched by the worker.
These events inform Horizon what was done with the message/job.
Expand Down Expand Up @@ -262,7 +263,9 @@ class RabbitMQJob extends BaseJob
}
```

If you want to handle raw message, not in JSON format or without 'job' key in JSON, you should add stub for `getName` method:
If you want to handle raw message, not in JSON format or without 'job' key in JSON,
you should add stub for `getName` method:

```php
<?php

Expand Down Expand Up @@ -310,6 +313,93 @@ An example for the config:
],
```

### Use your own Worker class

If you want to use your own `RabbitMQQueue::class` this is possible by
extending `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue`.
and inform laravel to use your class by setting `RABBITMQ_WORKER` to `\App\Queue\RabbitMQQueue::class`.

> Note: Worker classes **must** extend `VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue`

```php
'connections' => [
// ...

'rabbitmq' => [
// ...

/* Set to a class if you wish to use your own. */
'worker' => \App\Queue\RabbitMQQueue::class,
],

// ...
],
```

```php
<?php

namespace App\Queue;

use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;

class RabbitMQQueue extends BaseRabbitMQQueue
{
// ...
}
```

**For Example: A reconnect implementation.**

If you want to reconnect to RabbitMQ, if the connection is dead.
You can override the publishing and the createChannel methods.

> Note: this is not best practice, it is an example.

```php
<?php

namespace App\Queue;

use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;

class RabbitMQQueue extends BaseRabbitMQQueue
{

protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void
{
try {
parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
} catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
$this->reconnect();
parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
}
}

protected function publishBatch($jobs, $data = '', $queue = null): void
{
try {
parent::publishBatch($jobs, $data, $queue);
} catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
$this->reconnect();
parent::publishBatch($jobs, $data, $queue);
}
}

protected function createChannel(): AMQPChannel
{
try {
return parent::createChannel();
} catch (AMQPConnectionClosedException) {
$this->reconnect();
return parent::createChannel();
}
}
}
```

### Default Queue

The connection does use a default queue with value 'default', when no queue is provided by laravel.
Expand Down Expand Up @@ -419,10 +509,16 @@ If for some reason you don't want the connection lazy you can turn it off by set
],
```

### Octane support

Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box.
Firstly, install Octane and don't forget to warm 'rabbitmq' connection in the octane config.
> See: https://github.com/vyuldashev/laravel-queue-rabbitmq/issues/460#issuecomment-1469851667

## Laravel Usage

Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not need to
change anything else. If you do not know how to use the Queue API, please refer to the official Laravel
Once you completed the configuration you can use the Laravel Queue API. If you used other queue drivers you do not
need to change anything else. If you do not know how to use the Queue API, please refer to the official Laravel
documentation: http://laravel.com/docs/queues

## Lumen Usage
Expand Down
43 changes: 31 additions & 12 deletions src/Queue/RabbitMQQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,19 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts =
*/
public function bulk($jobs, $data = '', $queue = null): void
{
foreach ((array) $jobs as $job) {
$this->publishBatch($jobs, $queue, $data);
}

/**
* @throws AMQPProtocolChannelException
*/
protected function publishBatch($jobs, $data = '', $queue = null): void
{
foreach ($jobs as $job) {
$this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]);
}

$this->publishBatch();
$this->batchPublish();
}

/**
Expand Down Expand Up @@ -274,15 +282,6 @@ public function setConnection(AbstractConnection $connection): RabbitMQQueue
return $this;
}

public function getChannel($forceNew = false): AMQPChannel
{
if (! $this->channel || $forceNew) {
$this->channel = $this->createChannel();
}

return $this->channel;
}

/**
* Job class to use.
*
Expand Down Expand Up @@ -740,13 +739,33 @@ protected function publishBasic($msg, $exchange = '', $destination = '', $mandat
$this->getChannel()->basic_publish($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
}

protected function publishBatch(): void
protected function batchPublish(): void
{
$this->getChannel()->publish_batch();
}

public function getChannel($forceNew = false): AMQPChannel
{
if (! $this->channel || $forceNew) {
$this->channel = $this->createChannel();
}

return $this->channel;
}

protected function createChannel(): AMQPChannel
{
return $this->getConnection()->channel();
}

/**
* @throws Exception
*/
protected function reconnect(): void
{
// Reconnects using the original connection settings.
$this->getConnection()->reconnect();
// Create a new main channel because all old channels are removed.
$this->getChannel(true);
}
}
30 changes: 30 additions & 0 deletions tests/Feature/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,41 @@
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Feature;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob;

class QueueTest extends TestCase
{
public function setUp(): void
{
parent::setUp();

$this->withoutExceptionHandling([
AMQPChannelClosedException::class, AMQPConnectionClosedException::class,
AMQPProtocolChannelException::class,
]);
}

public function testConnection(): void
{
$this->assertInstanceOf(AMQPStreamConnection::class, $this->connection()->getChannel()->getConnection());
}

public function testWithoutReconnect(): void
{
$queue = $this->connection('rabbitmq');

$queue->push(new TestJob());
sleep(1);
$this->assertSame(1, $queue->size());

// close connection
$queue->getConnection()->close();
$this->assertFalse($queue->getConnection()->isConnected());

$this->expectException(AMQPChannelClosedException::class);
$queue->push(new TestJob());
}
}
33 changes: 33 additions & 0 deletions tests/Functional/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional;

use Exception;
use PhpAmqpLib\Channel\AMQPChannel;
use ReflectionClass;
use ReflectionException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase;
Expand Down Expand Up @@ -234,4 +235,36 @@ protected function callProperty($object, string $property): mixed

return $property->getValue($object);
}

public function testConnectChannel(): void
{
$queue = $this->connection();
$this->assertFalse($queue->getConnection()->isConnected());

/** @var AMQPChannel $channel */
$channel = $this->callMethod($queue, 'getChannel');
$this->assertTrue($queue->getConnection()->isConnected());
$this->assertSame($channel, $this->callProperty($queue, 'channel'));
$this->assertTrue($channel->is_open());
}

public function testReconnect(): void
{
$queue = $this->connection();
$this->assertFalse($queue->getConnection()->isConnected());

// connect
$channel = $this->callMethod($queue, 'getChannel');
$this->assertTrue($queue->getConnection()->isConnected());
$this->assertSame($channel, $this->callProperty($queue, 'channel'));

// close
$queue->getConnection()->close();
$this->assertFalse($queue->getConnection()->isConnected());

// reconnect
$this->callMethod($queue, 'reconnect');
$this->assertTrue($queue->getConnection()->isConnected());
$this->assertTrue($queue->getChannel()->is_open());
}
}