Skip to content

Commit 80ecefb

Browse files
committed
Merge branch 'frans-beech-it-stop-worker-when-necessary'
2 parents 43456f9 + ecd02f2 commit 80ecefb

File tree

3 files changed

+39
-21
lines changed

3 files changed

+39
-21
lines changed

src/Consumer.php

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@
44

55
use Exception;
66
use Illuminate\Container\Container;
7-
use Illuminate\Contracts\Queue\Job;
87
use Illuminate\Queue\Worker;
98
use Illuminate\Queue\WorkerOptions;
109
use PhpAmqpLib\Channel\AMQPChannel;
1110
use PhpAmqpLib\Exception\AMQPRuntimeException;
1211
use PhpAmqpLib\Message\AMQPMessage;
13-
use Symfony\Component\Debug\Exception\FatalThrowableError;
1412
use Throwable;
1513
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
1614

@@ -31,8 +29,8 @@ class Consumer extends Worker
3129
/** @var AMQPChannel */
3230
protected $channel;
3331

34-
/** @var bool */
35-
protected $gotJob = false;
32+
/** @var object|null */
33+
protected $currentJob;
3634

3735
public function setContainer(Container $value): void
3836
{
@@ -54,14 +52,25 @@ public function setPrefetchCount(int $value): void
5452
$this->prefetchCount = $value;
5553
}
5654

57-
public function daemon($connectionName, $queue, WorkerOptions $options): void
55+
/**
56+
* Listen to the given queue in a loop.
57+
*
58+
* @param string $connectionName
59+
* @param string $queue
60+
* @param WorkerOptions $options
61+
* @return int
62+
* @throws Throwable
63+
*/
64+
public function daemon($connectionName, $queue, WorkerOptions $options)
5865
{
5966
if ($this->supportsAsyncSignals()) {
6067
$this->listenForSignals();
6168
}
6269

6370
$lastRestart = $this->getTimestampOfLastQueueRestart();
6471

72+
[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
73+
6574
/** @var RabbitMQQueue $connection */
6675
$connection = $this->manager->connection($connectionName);
6776

@@ -82,9 +91,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options): void
8291
false,
8392
false,
8493
false,
85-
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass): void {
86-
$this->gotJob = true;
87-
94+
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void {
8895
$job = new $jobClass(
8996
$this->container,
9097
$connection,
@@ -93,10 +100,14 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
93100
$queue
94101
);
95102

103+
$this->currentJob = $job;
104+
96105
if ($this->supportsAsyncSignals()) {
97106
$this->registerTimeoutHandler($job, $options);
98107
}
99108

109+
$jobsProcessed++;
110+
100111
$this->runJob($job, $connectionName, $options);
101112

102113
if ($this->supportsAsyncSignals()) {
@@ -121,27 +132,33 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
121132
$this->exceptions->report($exception);
122133

123134
$this->kill(1);
124-
} catch (Exception $exception) {
135+
} catch (Exception | Throwable $exception) {
125136
$this->exceptions->report($exception);
126137

127-
$this->stopWorkerIfLostConnection($exception);
128-
} catch (Throwable $exception) {
129-
$this->exceptions->report($exception = new FatalThrowableError($exception));
130-
131138
$this->stopWorkerIfLostConnection($exception);
132139
}
133140

134141
// If no job is got off the queue, we will need to sleep the worker.
135-
if (! $this->gotJob) {
142+
if ($this->currentJob === null) {
136143
$this->sleep($options->sleep);
137144
}
138145

139146
// Finally, we will check to see if we have exceeded our memory limits or if
140147
// the queue should restart based on other indications. If so, we'll stop
141148
// this worker and let whatever is "monitoring" it restart the process.
142-
$this->stopIfNecessary($options, $lastRestart, $this->gotJob ? true : null);
149+
$status = $this->stopIfNecessary(
150+
$options,
151+
$lastRestart,
152+
$startTime,
153+
$jobsProcessed,
154+
$this->currentJob
155+
);
156+
157+
if (! is_null($status)) {
158+
return $this->stop($status);
159+
}
143160

144-
$this->gotJob = false;
161+
$this->currentJob = null;
145162
}
146163
}
147164

@@ -162,14 +179,14 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que
162179
* Stop listening and bail out of the script.
163180
*
164181
* @param int $status
165-
* @return void
182+
* @return int
166183
*/
167-
public function stop($status = 0): void
184+
public function stop($status = 0): int
168185
{
169186
// Tell the server you are going to stop consuming.
170187
// It will finish up the last message and not send you any more.
171188
$this->channel->basic_cancel($this->consumerTag, false, true);
172189

173-
parent::stop($status);
190+
return parent::stop($status);
174191
}
175192
}

src/Queue/RabbitMQQueue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public function getChannel(): AMQPChannel
279279
* Job class to use.
280280
*
281281
* @return string
282-
* @throws \Throwable
282+
* @throws Throwable
283283
*/
284284
public function getJobClass(): string
285285
{

tests/Feature/TestCase.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Illuminate\Support\Facades\Queue;
66
use Illuminate\Support\Str;
77
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
8+
use RuntimeException;
89
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
910
use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob;
1011
use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase;
@@ -310,7 +311,7 @@ public function testFailed(): void
310311

311312
$job = Queue::pop();
312313

313-
$job->fail(new \RuntimeException($job->resolveName().' has an exception.'));
314+
$job->fail(new RuntimeException($job->resolveName().' has an exception.'));
314315

315316
sleep(1);
316317

0 commit comments

Comments
 (0)