Skip to content

Commit c76d395

Browse files
committed
POC for reconnecting octane queues
1 parent d20f624 commit c76d395

File tree

2 files changed

+231
-1
lines changed

2 files changed

+231
-1
lines changed

src/Queue/Connectors/RabbitMQConnector.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use PhpAmqpLib\Connection\AMQPLazyConnection;
1515
use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\Listeners\RabbitMQFailedEvent;
1616
use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue;
17+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\OctaneRabbitMQQueue;
1718
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1819

1920
class RabbitMQConnector implements ConnectorInterface
@@ -102,7 +103,7 @@ protected function createQueue(
102103
) {
103104
switch ($worker) {
104105
case 'default':
105-
return new RabbitMQQueue($connection, $queue, $dispatchAfterCommit, $options);
106+
return new OctaneRabbitMQQueue($connection, $queue, $dispatchAfterCommit, $options);
106107
case 'horizon':
107108
return new HorizonRabbitMQQueue($connection, $queue, $dispatchAfterCommit, $options);
108109
default:

src/Queue/OctaneRabbitMQQueue.php

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
<?php
2+
3+
/** @noinspection PhpRedundantCatchClauseInspection */
4+
5+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
6+
7+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
8+
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
9+
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
10+
use PhpAmqpLib\Exchange\AMQPExchangeType;
11+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
12+
13+
class OctaneRabbitMQQueue extends RabbitMQQueue
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*
18+
* @throws AMQPProtocolChannelException
19+
*/
20+
public function size($queue = null): int
21+
{
22+
return $this->withReconnectHandler(fn () => parent::size($queue));
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*
28+
* @throws AMQPProtocolChannelException
29+
*/
30+
public function pushRaw($payload, $queue = null, array $options = [])
31+
{
32+
$this->withReconnectHandler(fn () => parent::pushRaw($payload, $queue, $options));
33+
}
34+
35+
/**
36+
* @param $delay
37+
* @param $payload
38+
* @param null $queue
39+
* @param int $attempts
40+
* @return mixed
41+
*
42+
* @throws AMQPProtocolChannelException
43+
*/
44+
public function laterRaw($delay, $payload, $queue = null, $attempts = 0)
45+
{
46+
return $this->withReconnectHandler(parent::laterRaw($delay, $payload, $queue, $attempts));
47+
}
48+
49+
/**
50+
* {@inheritdoc}
51+
*
52+
* @throws AMQPProtocolChannelException
53+
*/
54+
public function bulk($jobs, $data = '', $queue = null): void
55+
{
56+
$this->withReconnectHandler(parent::bulk($jobs, $data, $queue));
57+
}
58+
59+
/**
60+
* @param string $payload
61+
* @param null $queue
62+
* @param array $options
63+
* @return mixed
64+
*
65+
* @throws AMQPProtocolChannelException
66+
*/
67+
public function bulkRaw(string $payload, $queue = null, array $options = [])
68+
{
69+
return $this->withReconnectHandler(parent::bulkRaw($payload, $queue, $options));
70+
}
71+
72+
/**
73+
* Checks if the given exchange already present/defined in RabbitMQ.
74+
* Returns false when when the exchange is missing.
75+
*
76+
* @param string $exchange
77+
* @return bool
78+
*
79+
* @throws AMQPProtocolChannelException
80+
*/
81+
public function isExchangeExists(string $exchange): bool
82+
{
83+
return $this->withReconnectHandler(fn () => parent::isExchangeExists($exchange));
84+
}
85+
86+
/**
87+
* Declare a exchange in rabbitMQ, when not already declared.
88+
*
89+
* @param string $name
90+
* @param string $type
91+
* @param bool $durable
92+
* @param bool $autoDelete
93+
* @param array $arguments
94+
* @return void
95+
*/
96+
public function declareExchange(
97+
string $name,
98+
string $type = AMQPExchangeType::DIRECT,
99+
bool $durable = true,
100+
bool $autoDelete = false,
101+
array $arguments = []
102+
): void {
103+
$this->withReconnectHandler(fn() => parent::declareExchange($name, $type, $durable, $autoDelete, $arguments));
104+
}
105+
106+
/**
107+
* Delete a exchange from rabbitMQ, only when present in RabbitMQ.
108+
*
109+
* @param string $name
110+
* @param bool $unused
111+
* @return void
112+
*
113+
* @throws AMQPProtocolChannelException
114+
*/
115+
public function deleteExchange(string $name, bool $unused = false): void
116+
{
117+
$this->withReconnectHandler(fn () => parent::deleteExchange($name, $unused));
118+
}
119+
120+
/**
121+
* Checks if the given queue already present/defined in RabbitMQ.
122+
* Returns false when when the queue is missing.
123+
*
124+
* @param string|null $name
125+
* @return bool
126+
*
127+
* @throws AMQPProtocolChannelException
128+
*/
129+
public function isQueueExists(string $name = null): bool
130+
{
131+
return $this->withReconnectHandler(fn () => parent::isQueueExists($name));
132+
}
133+
134+
/**
135+
* Declare a queue in rabbitMQ, when not already declared.
136+
*
137+
* @param string $name
138+
* @param bool $durable
139+
* @param bool $autoDelete
140+
* @param array $arguments
141+
* @return void
142+
*/
143+
public function declareQueue(
144+
string $name,
145+
bool $durable = true,
146+
bool $autoDelete = false,
147+
array $arguments = []
148+
): void {
149+
$this->withReconnectHandler(fn () => parent::declareQueue($name, $durable, $autoDelete, $arguments));
150+
}
151+
152+
/**
153+
* Delete a queue from rabbitMQ, only when present in RabbitMQ.
154+
*
155+
* @param string $name
156+
* @param bool $if_unused
157+
* @param bool $if_empty
158+
* @return void
159+
*
160+
* @throws AMQPProtocolChannelException
161+
*/
162+
public function deleteQueue(string $name, bool $if_unused = false, bool $if_empty = false): void
163+
{
164+
$this->withReconnectHandler(fn () => parent::deleteQueue($name, $if_unused, $if_empty));
165+
}
166+
167+
/**
168+
* Bind a queue to an exchange.
169+
*
170+
* @param string $queue
171+
* @param string $exchange
172+
* @param string $routingKey
173+
* @return void
174+
*/
175+
public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void
176+
{
177+
$this->withReconnectHandler(fn () => parent::bindQueue($queue, $exchange, $routingKey));
178+
}
179+
180+
/**
181+
* Purge the queue of messages.
182+
*
183+
* @param string|null $queue
184+
* @return void
185+
*/
186+
public function purge(string $queue = null): void
187+
{
188+
$this->withReconnectHandler(fn () => parent::purge($queue));
189+
}
190+
191+
/**
192+
* Acknowledge the message.
193+
*
194+
* @param RabbitMQJob $job
195+
* @return void
196+
*/
197+
public function ack(RabbitMQJob $job): void
198+
{
199+
$this->withReconnectHandler(fn () => parent::ack($job));
200+
}
201+
202+
/**
203+
* Reject the message.
204+
*
205+
* @param RabbitMQJob $job
206+
* @param bool $requeue
207+
* @return void
208+
*/
209+
public function reject(RabbitMQJob $job, bool $requeue = false): void
210+
{
211+
$this->withReconnectHandler(fn () => parent::reject($job, $requeue));
212+
}
213+
214+
public function reconnect()
215+
{
216+
$this->connection->reconnect();
217+
$this->channel = $this->connection->channel();
218+
}
219+
220+
public function withReconnectHandler($callback)
221+
{
222+
try {
223+
return $callback();
224+
} catch (AMQPConnectionClosedException|AMQPChannelClosedException|AMQPProtocolChannelException $e) {
225+
$this->reconnect();
226+
return $callback();
227+
}
228+
}
229+
}

0 commit comments

Comments
 (0)