Skip to content

Commit 7099c33

Browse files
authored
Merge pull request vyuldashev#231 from adm-bome/fix-for-horizon-failed-rabbitmq-jobs
Added extra listener when horizon is used as worker.
2 parents 17a889c + b6a5ab6 commit 7099c33

File tree

2 files changed

+50
-0
lines changed

2 files changed

+50
-0
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\Listeners;
4+
5+
use Illuminate\Contracts\Events\Dispatcher;
6+
use Laravel\Horizon\Events\JobFailed as HorizonJobFailed;
7+
use Illuminate\Queue\Events\JobFailed as LaravelJobFailed;
8+
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
9+
10+
class RabbitMQFailedEvent
11+
{
12+
/**
13+
* The event dispatcher implementation.
14+
*
15+
* @var \Illuminate\Contracts\Events\Dispatcher
16+
*/
17+
public $events;
18+
19+
/**
20+
* Create a new listener instance.
21+
*
22+
* @param \Illuminate\Contracts\Events\Dispatcher $events
23+
* @return void
24+
*/
25+
public function __construct(Dispatcher $events)
26+
{
27+
$this->events = $events;
28+
}
29+
30+
/**
31+
* Handle the event.
32+
*
33+
* @param \Illuminate\Queue\Events\JobFailed $event
34+
* @return void
35+
*/
36+
public function handle(LaravelJobFailed $event)
37+
{
38+
if (! $event->job instanceof RabbitMQJob) {
39+
return;
40+
}
41+
42+
$this->events->dispatch((new HorizonJobFailed(
43+
$event->exception, $event->job, $event->job->getRawBody()
44+
))->connection($event->connectionName)->queue($event->job->getQueue()));
45+
}
46+
}

src/Queue/Connectors/RabbitMQConnector.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Interop\Amqp\AmqpContext;
77
use InvalidArgumentException;
88
use Illuminate\Contracts\Queue\Queue;
9+
use Illuminate\Queue\Events\JobFailed;
910
use Interop\Amqp\AmqpConnectionFactory;
1011
use Enqueue\AmqpTools\DelayStrategyAware;
1112
use Illuminate\Contracts\Events\Dispatcher;
@@ -15,6 +16,7 @@
1516
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1617
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
1718
use Enqueue\AmqpLib\AmqpConnectionFactory as EnqueueAmqpConnectionFactory;
19+
use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\Listeners\RabbitMQFailedEvent;
1820
use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue;
1921

2022
class RabbitMQConnector implements ConnectorInterface
@@ -79,6 +81,8 @@ public function connect(array $config): Queue
7981
}
8082

8183
if ($worker === 'horizon') {
84+
$this->dispatcher->listen(JobFailed::class, RabbitMQFailedEvent::class);
85+
8286
return new HorizonRabbitMQQueue($context, $config);
8387
}
8488

0 commit comments

Comments
 (0)