Skip to content

Commit 2b797b9

Browse files
committed
yiisoft#269: Handling of broken messages that are not unserialized correctly
1 parent 62f6a69 commit 2b797b9

File tree

8 files changed

+133
-15
lines changed

8 files changed

+133
-15
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ Yii2 Queue Extension Change Log
55
-----------------------
66

77
- Enh #248: Reduce roundtrips to beanstalk server when removing job (SamMousa)
8-
- Fix #258: Worker in isolated mode fails if PHP_BINARY contains spaces (luke-)
8+
- Bug #258: Worker in isolated mode fails if PHP_BINARY contains spaces (luke-)
9+
- Bug #269: Handling of broken messages that are not unserialized correctly (zhuravljov)
910

1011
2.1.0 May 24, 2018
1112
------------------

src/InvalidJobException.php

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
/**
3+
* @link http://www.yiiframework.com/
4+
* @copyright Copyright (c) 2008 Yii Software LLC
5+
* @license http://www.yiiframework.com/license/
6+
*/
7+
8+
namespace yii\queue;
9+
10+
use Throwable;
11+
12+
/**
13+
* Invalid Job Exception
14+
*
15+
* Throws when serialized message cannot be unserialized to a job.
16+
*
17+
* @author Roman Zhuravlev <zhuravljov@gmail.com>
18+
* @since 2.1.1
19+
*/
20+
class InvalidJobException extends \Exception
21+
{
22+
/**
23+
* @var string
24+
*/
25+
private $serialized;
26+
27+
/**
28+
* @param string $serialized
29+
* @param string $message
30+
* @param int $code
31+
* @param Throwable|null $previous
32+
*/
33+
public function __construct($serialized, $message = '', $code = 0, Throwable $previous = null)
34+
{
35+
$this->serialized = $serialized;
36+
parent::__construct($message, $code, $previous);
37+
}
38+
39+
/**
40+
* @return string of serialized message that cannot be unserialized to a job
41+
*/
42+
final public function getSerialized()
43+
{
44+
return $this->serialized;
45+
}
46+
}

src/JobEvent.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class JobEvent extends Event
2626
*/
2727
public $id;
2828
/**
29-
* @var JobInterface
29+
* @var JobInterface|null
3030
*/
3131
public $job;
3232
/**

src/LogBehavior.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public function workerStop(cli\WorkerEvent $event)
123123
*/
124124
protected function getJobTitle(JobEvent $event)
125125
{
126-
$name = $event->job instanceof JobInterface ? get_class($event->job) : 'mixed data';
126+
$name = $event->job instanceof JobInterface ? get_class($event->job) : 'unknown job';
127127
return "[$event->id] $name";
128128
}
129129

src/Queue.php

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,7 @@ public function getWorkerPid()
193193
*/
194194
protected function handleMessage($id, $message, $ttr, $attempt)
195195
{
196-
$job = $this->serializer->unserialize($message);
197-
if (!($job instanceof JobInterface)) {
198-
$dump = VarDumper::dumpAsString($job);
199-
throw new InvalidArgumentException("Job $id must be a JobInterface instance instead of $dump.");
200-
}
196+
list($job, $error) = $this->unserializeMessage($message);
201197

202198
$event = new ExecEvent([
203199
'id' => $id,
@@ -210,6 +206,10 @@ protected function handleMessage($id, $message, $ttr, $attempt)
210206
return true;
211207
}
212208

209+
if ($error) {
210+
return $this->handleError($event->id, $event->job, $event->ttr, $event->attempt, $error);
211+
}
212+
213213
try {
214214
$event->job->execute($this);
215215
} catch (\Exception $error) {
@@ -222,6 +222,31 @@ protected function handleMessage($id, $message, $ttr, $attempt)
222222
return true;
223223
}
224224

225+
/**
226+
* Unserializes
227+
*
228+
* @param string $id of the job
229+
* @param string $serialized message
230+
* @return array pair of a job and error that
231+
*/
232+
public function unserializeMessage($serialized)
233+
{
234+
try {
235+
$job = $this->serializer->unserialize($serialized);
236+
} catch (\Exception $e) {
237+
return [null, new InvalidJobException($serialized, $e->getMessage(), 0, $e)];
238+
}
239+
240+
if ($job instanceof JobInterface) {
241+
return [$job, null];
242+
}
243+
244+
return [null, new InvalidJobException($serialized, sprintf(
245+
'Job must be a JobInterface instance instead of %s.',
246+
VarDumper::dumpAsString($job)
247+
))];
248+
}
249+
225250
/**
226251
* @param string|null $id
227252
* @param JobInterface $job
@@ -233,18 +258,21 @@ protected function handleMessage($id, $message, $ttr, $attempt)
233258
*/
234259
public function handleError($id, $job, $ttr, $attempt, $error)
235260
{
261+
$retry = $attempt < $this->attempts;
262+
if ($error instanceof InvalidJobException) {
263+
$retry = false;
264+
} elseif ($job instanceof RetryableJobInterface) {
265+
$retry = $job->canRetry($attempt, $error);
266+
}
236267
$event = new ErrorEvent([
237268
'id' => $id,
238269
'job' => $job,
239270
'ttr' => $ttr,
240271
'attempt' => $attempt,
241272
'error' => $error,
242-
'retry' => $job instanceof RetryableJobInterface
243-
? $job->canRetry($attempt, $error)
244-
: $attempt < $this->attempts,
273+
'retry' => $retry,
245274
]);
246275
$this->trigger(self::EVENT_AFTER_ERROR, $event);
247-
248276
return !$event->retry;
249277
}
250278

src/cli/Command.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ protected function handleMessage($id, $message, $ttr, $attempt)
195195
}
196196
return $result === self::EXEC_DONE;
197197
} catch (ProcessRuntimeException $error) {
198-
$job = $this->queue->serializer->unserialize($message);
198+
list($job) = $this->queue->unserializeMessage($message);
199199
return $this->queue->handleError($id, $job, $ttr, $attempt, $error);
200200
}
201201
}

src/cli/VerboseBehavior.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use yii\helpers\Console;
1313
use yii\queue\ErrorEvent;
1414
use yii\queue\ExecEvent;
15+
use yii\queue\JobInterface;
1516

1617
/**
1718
* Verbose Behavior.
@@ -107,12 +108,12 @@ public function afterError(ErrorEvent $event)
107108
*/
108109
protected function jobTitle(ExecEvent $event)
109110
{
110-
$class = get_class($event->job);
111+
$name = $event->job instanceof JobInterface ? get_class($event->job) : 'unknown job';
111112
$extra = "attempt: $event->attempt";
112113
if ($pid = $event->sender->getWorkerPid()) {
113114
$extra .= ", pid: $pid";
114115
}
115-
return " [$event->id] $class ($extra)";
116+
return " [$event->id] $name ($extra)";
116117
}
117118

118119
/**

tests/JobEventTest.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
/**
3+
* @link http://www.yiiframework.com/
4+
* @copyright Copyright (c) 2008 Yii Software LLC
5+
* @license http://www.yiiframework.com/license/
6+
*/
7+
8+
namespace tests;
9+
10+
use yii\queue\ErrorEvent;
11+
use yii\queue\InvalidJobException;
12+
use yii\queue\JobEvent;
13+
use yii\queue\Queue;
14+
use yii\queue\sync\Queue as SyncQueue;
15+
16+
/**
17+
* Job Event Test
18+
*
19+
* @author Roman Zhuravlev <zhuravljov@gmail.com>
20+
*/
21+
class JobEventTest extends TestCase
22+
{
23+
public function testInvalidJob()
24+
{
25+
$eventCounter = [];
26+
$eventHandler = function (JobEvent $event) use (&$eventCounter) {
27+
$eventCounter[$event->id][$event->name] = true;
28+
};
29+
$queue = new SyncQueue(['strictJobType' => false]);
30+
$queue->on(Queue::EVENT_BEFORE_EXEC, $eventHandler);
31+
$queue->on(Queue::EVENT_AFTER_ERROR, $eventHandler);
32+
$queue->on(Queue::EVENT_AFTER_ERROR, function (ErrorEvent $event) {
33+
$this->assertTrue($event->error instanceof InvalidJobException);
34+
$this->assertFalse($event->retry);
35+
});
36+
$jobId = $queue->push('message that cannot be unserialized');
37+
$queue->run();
38+
$this->assertArrayHasKey($jobId, $eventCounter);
39+
$this->assertArrayHasKey(Queue::EVENT_BEFORE_EXEC, $eventCounter[$jobId]);
40+
$this->assertArrayHasKey(Queue::EVENT_AFTER_ERROR, $eventCounter[$jobId]);
41+
}
42+
}

0 commit comments

Comments
 (0)