Skip to content

Commit

Permalink
Merge pull request #45 from chocofamilyme/restart-worker
Browse files Browse the repository at this point in the history
Сделал остановку слушателя, если нужно остановить его
  • Loading branch information
Vadim89 authored Sep 6, 2024
2 parents 4daa0b3 + 5420817 commit e2d29bd
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 50 deletions.
44 changes: 15 additions & 29 deletions src/Listener.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,21 @@ class Listener extends Consumer
* @param string $queue
* @param WorkerOptions $options
*
* @return void
* @return int
*
* @psalm-suppress ImplementedReturnTypeMismatch
* @throws Throwable
*/
public function daemon($connectionName, $queue, WorkerOptions $options): void
public function daemon($connectionName, $queue, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
$this->listenForSignals();
}

$lastRestart = $this->getTimestampOfLastQueueRestart();

[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];

/** @var RabbitMQQueue $connection */
$connection = $this->manager->connection($connectionName);

Expand Down Expand Up @@ -163,35 +165,19 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
/** @psalm-suppress PossiblyNullArgument */
$this->stopIfNecessary($options, $lastRestart);
$status = $this->stopIfNecessary(
$options,
$lastRestart,
$startTime,
$jobsProcessed,
$this->currentJob
);

$this->currentJob = null;
}
}
if (! is_null($status)) {
return $this->stop($status, $options);
}

/**
* Stop the process if necessary.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
* @param int $startTime
* @param int $jobsProcessed
* @param mixed $job
*
* @return void
*/
protected function stopIfNecessary(
WorkerOptions $options,
$lastRestart,
$startTime = 0,
$jobsProcessed = 0,
$job = null
) {
if ($this->shouldQuit) {
$this->stop();
} elseif ($options->stopWhenEmpty && is_null($job)) {
$this->stop();
$this->currentJob = null;
}
}

Expand Down
22 changes: 9 additions & 13 deletions src/Queue/CallQueuedHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,22 @@ class CallQueuedHandler
*/
protected Container $container;

/**
* @var ExceptionHandler
*/
protected ExceptionHandler $exceptionHandler;

/**
* Create a new handler instance.
*
* @param Dispatcher $dispatcher
* @param Container $container
* @param ExceptionHandler $exceptionHandler
*/
public function __construct(Dispatcher $dispatcher, Container $container, ExceptionHandler $exceptionHandler)
public function __construct(Dispatcher $dispatcher, Container $container)
{
$this->container = $container;
$this->dispatcher = $dispatcher;
$this->exceptionHandler = $exceptionHandler;
}

/**
* @throws BindingResolutionException
* @throws Throwable
*/
public function call(Job $job, string $listener, array $data): void
{
try {
Expand All @@ -64,12 +61,11 @@ public function call(Job $job, string $listener, array $data): void
try {
$this->dispatchThroughMiddleware($job, $listener, $data);
} catch (Throwable $e) {
$this->exceptionHandler->report($e);

if (method_exists($listener, 'failed')) {
$listener->failed($data, $e);
if (!method_exists($listener, 'failed')) {
throw $e;
}
$job->fail($e);

$listener->failed($data, $e);
}

if (!$job->hasFailed() && !$job->isReleased()) {
Expand Down
3 changes: 1 addition & 2 deletions src/Queue/Factory/RabbitMQFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public static function make(
new EventRouter(),
new CallQueuedHandler(
$container->make(Dispatcher::class),
$container,
$container->app->make(ExceptionHandler::class),
$container
)
);
}
Expand Down
9 changes: 3 additions & 6 deletions tests/RabbitMQListenerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public function testItFire(): void
new EventRouter(),
new CallQueuedHandler(
$this->app->make(Dispatcher::class),
$this->app,
$this->app->make(ExceptionHandler::class),
$this->app
)
);

Expand All @@ -89,8 +88,7 @@ public function testGetNameOldFormat(): void
new EventRouter(),
new CallQueuedHandler(
$this->app->make(Dispatcher::class),
$this->app,
$this->app->make(ExceptionHandler::class),
$this->app
)
);

Expand Down Expand Up @@ -122,8 +120,7 @@ public function testGetNameAndId(): void
new EventRouter(),
new CallQueuedHandler(
$this->app->make(Dispatcher::class),
$this->app,
$this->app->make(ExceptionHandler::class),
$this->app
)
);

Expand Down

0 comments on commit e2d29bd

Please sign in to comment.