Skip to content

Commit e8e5f5a

Browse files
committed
Merge branch 'custom_worker_patch' into custom_worker_patch_6
# Conflicts: # composer.json # composer.lock
2 parents 2a0459c + ce46fb8 commit e8e5f5a

File tree

1 file changed

+45
-20
lines changed

1 file changed

+45
-20
lines changed

src/Queue/RabbitMQQueue.php

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
44

5+
use Illuminate\Support\Facades\Config;
6+
use PhpAmqpLib\Exception\AMQPChannelClosedException;
57
use RuntimeException;
68
use Illuminate\Queue\Queue;
79
use Illuminate\Support\Str;
@@ -13,6 +15,7 @@
1315
use Interop\Amqp\Impl\AmqpBind;
1416
use Illuminate\Contracts\Queue\Queue as QueueContract;
1517
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
18+
use Throwable;
1619

1720
class RabbitMQQueue extends Queue implements QueueContract
1821
{
@@ -57,13 +60,13 @@ public function size($queueName = null): int
5760
}
5861

5962
/** {@inheritdoc} */
60-
public function push($job, $data = '', $queue = null)
63+
public function push($job, $data = '', $queue = null): ?string
6164
{
6265
return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, []);
6366
}
6467

6568
/** {@inheritdoc} */
66-
public function pushRaw($payload, $queueName = null, array $options = [])
69+
public function pushRaw($payload, $queueName = null, array $options = []): ?string
6770
{
6871
try {
6972
/**
@@ -128,12 +131,12 @@ public function pushRaw($payload, $queueName = null, array $options = [])
128131
} catch (\Exception $exception) {
129132
$this->reportConnectionError('pushRaw', $exception);
130133

131-
return;
134+
return null;
132135
}
133136
}
134137

135138
/** {@inheritdoc} */
136-
public function later($delay, $job, $data = '', $queue = null)
139+
public function later($delay, $job, $data = '', $queue = null): ?string
137140
{
138141
return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, ['delay' => $this->secondsUntil($delay)]);
139142
}
@@ -206,18 +209,23 @@ public function getContext(): AmqpContext
206209
}
207210

208211
/**
209-
* @param string $queueName
212+
* @param string|null $queueName
210213
*
211214
* @return array [Interop\Amqp\AmqpQueue, Interop\Amqp\AmqpTopic]
212215
*/
213-
protected function declareEverything(string $queueName = null): array
216+
protected function declareEverything(?string $queueName = null): array
214217
{
218+
/** @var string $queueName */
215219
$queueName = $this->getQueueName($queueName);
220+
221+
/** @var string $exchangeName */
216222
$exchangeName = $this->exchangeOptions['name'] ?: $queueName;
217223

218224
$topic = $this->context->createTopic($exchangeName);
225+
219226
$topic->setType($this->exchangeOptions['type']);
220227
$topic->setArguments($this->exchangeOptions['arguments']);
228+
221229
if ($this->exchangeOptions['passive']) {
222230
$topic->addFlag(AmqpTopic::FLAG_PASSIVE);
223231
}
@@ -229,45 +237,61 @@ protected function declareEverything(string $queueName = null): array
229237
}
230238

231239
if ($this->exchangeOptions['declare'] && ! in_array($exchangeName, $this->declaredExchanges, true)) {
232-
$this->context->declareTopic($topic);
240+
try {
241+
$this->context->declareTopic($topic);
242+
} catch (AMQPChannelClosedException $e) {
243+
throw new AMQPChannelClosedException('Exchange declared with different arguments.', 0, $e);
244+
}
233245

234246
$this->declaredExchanges[] = $exchangeName;
235247
}
236248

237249
$queue = $this->context->createQueue($queueName);
238-
$queue->setArguments($this->queueOptions['arguments']);
239-
if ($this->queueOptions['passive']) {
250+
251+
/** @var array $queueOptions */
252+
$queueOptions = Config::get("queue.connections.rabbitmq.options");
253+
254+
$queueOptions = $queueOptions["queue_{$queueName}"] ?? $this->queueOptions;
255+
256+
if ($queueOptions['arguments']) {
257+
$queue->setArguments(is_string($queueOptions['arguments']) ? json_decode($queueOptions['arguments'], true) : $queueOptions['arguments']);
258+
}
259+
if ($queueOptions['passive']) {
240260
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
241261
}
242-
if ($this->queueOptions['durable']) {
262+
if ($queueOptions['durable']) {
243263
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
244264
}
245-
if ($this->queueOptions['exclusive']) {
265+
if ($queueOptions['exclusive']) {
246266
$queue->addFlag(AmqpQueue::FLAG_EXCLUSIVE);
247267
}
248-
if ($this->queueOptions['auto_delete']) {
268+
if ($queueOptions['auto_delete']) {
249269
$queue->addFlag(AmqpQueue::FLAG_AUTODELETE);
250270
}
251271

252-
if ($this->queueOptions['declare'] && ! in_array($queueName, $this->declaredQueues, true)) {
253-
$this->context->declareQueue($queue);
272+
if ($queueOptions['declare'] && ! in_array($queueName, $this->declaredQueues, true)) {
273+
try {
274+
$this->context->declareQueue($queue);
275+
} catch (AMQPChannelClosedException $e) {
276+
throw new AMQPChannelClosedException('Queue declared with different arguments.', 0, $e);
277+
}
254278

255279
$this->declaredQueues[] = $queueName;
256280
}
257281

258-
if ($this->queueOptions['bind']) {
282+
if ($queueOptions['bind']) {
259283
$this->context->bind(new AmqpBind($queue, $topic, $queue->getQueueName()));
260284
}
261285

262286
return [$queue, $topic];
263287
}
264288

265-
protected function getQueueName($queueName = null)
289+
protected function getQueueName($queueName = null): string
266290
{
267291
return $queueName ?: $this->queueName;
268292
}
269293

270-
protected function createPayloadArray($job, $queue, $data = '')
294+
protected function createPayloadArray($job, $queue, $data = ''): array
271295
{
272296
return array_merge(parent::createPayloadArray($job, $queue, $data), [
273297
'id' => $this->getRandomId(),
@@ -286,10 +310,11 @@ protected function getRandomId(): string
286310

287311
/**
288312
* @param string $action
289-
* @param \Throwable $e
290-
* @throws \Exception
313+
* @param Throwable $e
314+
*
315+
* @throws RuntimeException
291316
*/
292-
protected function reportConnectionError($action, \Throwable $e)
317+
protected function reportConnectionError(string $action, Throwable $e): void
293318
{
294319
/** @var LoggerInterface $logger */
295320
$logger = $this->container['log'];

0 commit comments

Comments
 (0)