22
33namespace VladimirYuldashev \LaravelQueueRabbitMQ \Queue ;
44
5- use Exception ;
65use Illuminate \Contracts \Queue \Queue as QueueContract ;
76use Illuminate \Queue \Queue ;
87use Illuminate \Support \Facades \Config as FacadeConfig ;
@@ -23,25 +22,29 @@ class RabbitMQQueue extends Queue implements QueueContract
2322{
2423 private const SLEEP_ON_ERROR = 5 ;
2524
26- /** @var int */
25+ /** @var int|bool */
2726 protected $ sleepOnError ;
2827
2928 /** @var string */
3029 protected $ queueName ;
3130
31+ /** @var array<string, mixed> */
3232 protected $ queueOptions ;
3333
34+ /** @var array<string, mixed> */
3435 protected $ exchangeOptions ;
3536
3637 /**
3738 * Alternative exchange options to use by producer.
3839 *
39- * @var array|null
40+ * @var array<string, mixed> |null
4041 */
4142 protected $ altExchangeOptions ;
4243
44+ /** @var array<string> */
4345 protected $ declaredExchanges = [];
4446
47+ /** @var array<string> */
4548 protected $ declaredQueues = [];
4649
4750 /**
@@ -52,6 +55,10 @@ class RabbitMQQueue extends Queue implements QueueContract
5255 /** @var string */
5356 protected $ correlationId ;
5457
58+ /**
59+ * @param AmqpContext $context
60+ * @param array<string, mixed> $config
61+ */
5562 public function __construct (AmqpContext $ context , array $ config )
5663 {
5764 $ this ->context = $ context ;
@@ -69,29 +76,33 @@ public function __construct(AmqpContext $context, array $config)
6976 }
7077
7178 /** {@inheritdoc} */
72- public function size ($ queueName = null ): int
79+ public function size ($ queue = null ): int
7380 {
74- /** @var AmqpQueue $queue */
75- [$ queue ] = $ this ->declareEverything ($ queueName );
81+ /** @var AmqpQueue $declaredQueue */
82+ [$ declaredQueue ] = $ this ->declareEverything ($ queue );
7683
77- return $ this ->context ->declareQueue ($ queue );
84+ return $ this ->context ->declareQueue ($ declaredQueue );
7885 }
7986
8087 /** {@inheritdoc} */
8188 public function push ($ job , $ data = '' , $ queue = null ): ?string
8289 {
83- return $ this ->pushRaw ($ this ->createPayload ($ job , $ queue , $ data ), $ queue , []);
90+ return $ this ->pushRaw ($ this ->createPayload ($ job , ( string ) $ queue , $ data ), $ queue , []);
8491 }
8592
86- /** {@inheritdoc} */
87- public function pushRaw ($ payload , $ queueName = null , array $ options = []): ?string
93+ /**
94+ * {@inheritdoc}
95+ *
96+ * @param array<mixed> $options
97+ */
98+ public function pushRaw ($ payload , $ queue = null , array $ options = []): ?string
8899 {
89100 try {
90101 /**
91- * @var AmqpQueue $queue
92- * @var AmqpTopic $topic
102+ * @var AmqpQueue $declaredQueue
103+ * @var AmqpTopic $declaredTopic
93104 */
94- [$ queue , $ topic ] = $ this ->declareEverything ($ queueName );
105+ [$ declaredQueue , $ declaredTopic ] = $ this ->declareEverything ($ queue );
95106
96107 /** @var AmqpMessage $message */
97108 $ message = $ this ->context ->createMessage ($ payload );
@@ -107,7 +118,7 @@ public function pushRaw($payload, $queueName = null, array $options = []): ?stri
107118 if (isset ($ options ['routing_key ' ])) {
108119 $ message ->setRoutingKey ($ options ['routing_key ' ]);
109120 } else {
110- $ message ->setRoutingKey ($ queue ->getQueueName ());
121+ $ message ->setRoutingKey ($ declaredQueue ->getQueueName ());
111122 }
112123
113124 if (isset ($ options ['priority ' ])) {
@@ -147,10 +158,10 @@ public function pushRaw($payload, $queueName = null, array $options = []): ?stri
147158 $ producer ->setDeliveryDelay ($ options ['delay ' ] * 1000 );
148159 }
149160
150- $ producer ->send ($ topic , $ message );
161+ $ producer ->send ($ declaredTopic , $ message );
151162
152163 return $ message ->getCorrelationId ();
153- } catch (Exception $ exception ) {
164+ } catch (Throwable $ exception ) {
154165 $ this ->reportConnectionError ('pushRaw ' , $ exception );
155166
156167 return null ;
@@ -160,7 +171,7 @@ public function pushRaw($payload, $queueName = null, array $options = []): ?stri
160171 /** {@inheritdoc} */
161172 public function later ($ delay , $ job , $ data = '' , $ queue = null ): ?string
162173 {
163- return $ this ->pushRaw ($ this ->createPayload ($ job , $ queue , $ data ), $ queue , ['delay ' => $ this ->secondsUntil ($ delay )]);
174+ return $ this ->pushRaw ($ this ->createPayload ($ job , ( string ) $ queue , $ data ), $ queue , ['delay ' => $ this ->secondsUntil ($ delay )]);
164175 }
165176
166177 /**
@@ -235,11 +246,11 @@ public function getContext(): AmqpContext
235246 /**
236247 * @param string|null $queueName
237248 *
238- * @return array [Interop\Amqp\AmqpQueue, Interop\Amqp\AmqpTopic]
249+ * @return array<AmqpQueue|AmqpTopic> [Interop\Amqp\AmqpQueue, Interop\Amqp\AmqpTopic]
239250 */
240251 protected function declareEverything (?string $ queueName = null ): array
241252 {
242- /** @var array $exchangeOptions */
253+ /** @var array<string, mixed> $exchangeOptions */
243254 $ exchangeOptions = $ this ->altExchangeOptions ?? $ this ->exchangeOptions ;
244255
245256 /** @var string $queueName */
@@ -248,16 +259,16 @@ protected function declareEverything(?string $queueName = null): array
248259 /** @var string $exchangeName */
249260 $ exchangeName = $ exchangeOptions ['name ' ] ?: $ queueName ;
250261
251- /** @var AmqpTopic $topic */
252- $ topic = $ this ->context ->createTopic ($ exchangeName );
262+ /** @var AmqpTopic $declaredTopic */
263+ $ declaredTopic = $ this ->context ->createTopic ($ exchangeName );
253264
254- /** @var AmqpQueue $queue */
255- $ queue = $ this ->context ->createQueue ($ queueName );
265+ /** @var AmqpQueue $declaredQueue */
266+ $ declaredQueue = $ this ->context ->createQueue ($ queueName );
256267
257- $ this ->declareExchange ($ topic , $ queueName , $ exchangeOptions );
258- $ this ->declareQueue ($ queue , $ topic , $ queueName );
268+ $ this ->declareExchange ($ declaredTopic , $ queueName , $ exchangeOptions );
269+ $ this ->declareQueue ($ declaredQueue , $ declaredTopic , $ queueName );
259270
260- return [$ queue , $ topic ];
271+ return [$ declaredQueue , $ declaredTopic ];
261272 }
262273
263274 /**
@@ -271,7 +282,7 @@ protected function declareEverything(?string $queueName = null): array
271282 */
272283 protected function declareQueue (AmqpQueue $ queue , AmqpTopic $ topic , string $ queueName ): void
273284 {
274- /** @var array $queueOptions */
285+ /** @var array<string, mixed> $queueOptions */
275286 $ queueOptions = FacadeConfig::get ('queue.connections.rabbitmq.options ' );
276287
277288 $ queueOptions = $ queueOptions ["queue_ {$ queueName }" ] ?? $ this ->queueOptions ;
@@ -316,7 +327,7 @@ protected function declareQueue(AmqpQueue $queue, AmqpTopic $topic, string $queu
316327 *
317328 * @param AmqpTopic $topic
318329 * @param string $exchangeName
319- * @param array $exchangeOptions
330+ * @param array<string, mixed> $exchangeOptions
320331 *
321332 * @return void
322333 */
@@ -348,11 +359,23 @@ protected function declareExchange(AmqpTopic $topic, string $exchangeName, array
348359 }
349360 }
350361
351- protected function getQueueName ($ queueName = null ): string
362+ /**
363+ * Get queue name.
364+ *
365+ * @param ?string $queueName
366+ *
367+ * @return string
368+ */
369+ protected function getQueueName (?string $ queueName = null ): string
352370 {
353371 return $ queueName ?: $ this ->queueName ;
354372 }
355373
374+ /**
375+ * {@inheritdoc}
376+ *
377+ * @return array<string, mixed>
378+ */
356379 protected function createPayloadArray ($ job , $ queue , $ data = '' ): array
357380 {
358381 return array_merge (parent ::createPayloadArray ($ job , $ queue , $ data ), [
@@ -389,6 +412,6 @@ protected function reportConnectionError(string $action, Throwable $e): void
389412 }
390413
391414 // Sleep so that we don't flood the log file
392- sleep ($ this ->sleepOnError );
415+ sleep (( int ) $ this ->sleepOnError );
393416 }
394417}
0 commit comments