diff --git a/src/Listener.php b/src/Listener.php index 003f280..327671a 100644 --- a/src/Listener.php +++ b/src/Listener.php @@ -42,12 +42,12 @@ class Listener extends Consumer * @param string $queue * @param WorkerOptions $options * - * @return void + * @return int * * @psalm-suppress ImplementedReturnTypeMismatch * @throws Throwable */ - public function daemon($connectionName, $queue, WorkerOptions $options): void + public function daemon($connectionName, $queue, WorkerOptions $options) { if ($this->supportsAsyncSignals()) { $this->listenForSignals(); @@ -55,6 +55,8 @@ public function daemon($connectionName, $queue, WorkerOptions $options): void $lastRestart = $this->getTimestampOfLastQueueRestart(); + [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + /** @var RabbitMQQueue $connection */ $connection = $this->manager->connection($connectionName); @@ -163,35 +165,19 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. - /** @psalm-suppress PossiblyNullArgument */ - $this->stopIfNecessary($options, $lastRestart); + $status = $this->stopIfNecessary( + $options, + $lastRestart, + $startTime, + $jobsProcessed, + $this->currentJob + ); - $this->currentJob = null; - } - } + if (! is_null($status)) { + return $this->stop($status, $options); + } - /** - * Stop the process if necessary. - * - * @param \Illuminate\Queue\WorkerOptions $options - * @param int $lastRestart - * @param int $startTime - * @param int $jobsProcessed - * @param mixed $job - * - * @return void - */ - protected function stopIfNecessary( - WorkerOptions $options, - $lastRestart, - $startTime = 0, - $jobsProcessed = 0, - $job = null - ) { - if ($this->shouldQuit) { - $this->stop(); - } elseif ($options->stopWhenEmpty && is_null($job)) { - $this->stop(); + $this->currentJob = null; } } diff --git a/src/Queue/CallQueuedHandler.php b/src/Queue/CallQueuedHandler.php index 4f850a2..25bcccc 100644 --- a/src/Queue/CallQueuedHandler.php +++ b/src/Queue/CallQueuedHandler.php @@ -32,25 +32,22 @@ class CallQueuedHandler */ protected Container $container; - /** - * @var ExceptionHandler - */ - protected ExceptionHandler $exceptionHandler; - /** * Create a new handler instance. * * @param Dispatcher $dispatcher * @param Container $container - * @param ExceptionHandler $exceptionHandler */ - public function __construct(Dispatcher $dispatcher, Container $container, ExceptionHandler $exceptionHandler) + public function __construct(Dispatcher $dispatcher, Container $container) { $this->container = $container; $this->dispatcher = $dispatcher; - $this->exceptionHandler = $exceptionHandler; } + /** + * @throws BindingResolutionException + * @throws Throwable + */ public function call(Job $job, string $listener, array $data): void { try { @@ -64,12 +61,11 @@ public function call(Job $job, string $listener, array $data): void try { $this->dispatchThroughMiddleware($job, $listener, $data); } catch (Throwable $e) { - $this->exceptionHandler->report($e); - - if (method_exists($listener, 'failed')) { - $listener->failed($data, $e); + if (!method_exists($listener, 'failed')) { + throw $e; } - $job->fail($e); + + $listener->failed($data, $e); } if (!$job->hasFailed() && !$job->isReleased()) { diff --git a/src/Queue/Factory/RabbitMQFactory.php b/src/Queue/Factory/RabbitMQFactory.php index 6d9e132..299ba3e 100644 --- a/src/Queue/Factory/RabbitMQFactory.php +++ b/src/Queue/Factory/RabbitMQFactory.php @@ -48,8 +48,7 @@ public static function make( new EventRouter(), new CallQueuedHandler( $container->make(Dispatcher::class), - $container, - $container->app->make(ExceptionHandler::class), + $container ) ); } diff --git a/tests/RabbitMQListenerTest.php b/tests/RabbitMQListenerTest.php index a0aa7d0..e7b251e 100644 --- a/tests/RabbitMQListenerTest.php +++ b/tests/RabbitMQListenerTest.php @@ -63,8 +63,7 @@ public function testItFire(): void new EventRouter(), new CallQueuedHandler( $this->app->make(Dispatcher::class), - $this->app, - $this->app->make(ExceptionHandler::class), + $this->app ) ); @@ -89,8 +88,7 @@ public function testGetNameOldFormat(): void new EventRouter(), new CallQueuedHandler( $this->app->make(Dispatcher::class), - $this->app, - $this->app->make(ExceptionHandler::class), + $this->app ) ); @@ -122,8 +120,7 @@ public function testGetNameAndId(): void new EventRouter(), new CallQueuedHandler( $this->app->make(Dispatcher::class), - $this->app, - $this->app->make(ExceptionHandler::class), + $this->app ) );