Skip to content

Commit 8eaec03

Browse files
vdauchytaylorotwell
authored andcommitted
Add JobQueued event
(Follows: #32894)
1 parent d7f27d5 commit 8eaec03

File tree

7 files changed

+191
-30
lines changed

7 files changed

+191
-30
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace Illuminate\Queue\Events;
4+
5+
class JobQueued
6+
{
7+
/**
8+
* @var string|int|null
9+
*/
10+
public $jobId;
11+
12+
/**
13+
* @var string|object
14+
*/
15+
public $job;
16+
17+
/**
18+
* JobQueued constructor.
19+
*
20+
* @param string|int|null $jobId
21+
* @param \Closure|string|object $job
22+
* @return void
23+
*/
24+
public function __construct($jobId, $job)
25+
{
26+
$this->jobId = $jobId;
27+
$this->job = $job;
28+
}
29+
}

src/Illuminate/Queue/Queue.php

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Illuminate\Container\Container;
88
use Illuminate\Contracts\Encryption\Encrypter;
99
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
10+
use Illuminate\Queue\Events\JobQueued;
1011
use Illuminate\Support\Arr;
1112
use Illuminate\Support\InteractsWithTime;
1213
use Illuminate\Support\Str;
@@ -284,13 +285,17 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
284285
if ($this->shouldDispatchAfterCommit($job) &&
285286
$this->container->bound('db.transactions')) {
286287
return $this->container->make('db.transactions')->addCallback(
287-
function () use ($payload, $queue, $delay, $callback) {
288-
return $callback($payload, $queue, $delay);
288+
function () use ($payload, $queue, $delay, $callback, $job) {
289+
return tap($callback($payload, $queue, $delay), function ($jobId) use ($job) {
290+
$this->raiseJobQueuedEvent($jobId, $job);
291+
});
289292
}
290293
);
291294
}
292295

293-
return $callback($payload, $queue, $delay);
296+
return tap($callback($payload, $queue, $delay), function ($jobId) use ($job) {
297+
$this->raiseJobQueuedEvent($jobId, $job);
298+
});
294299
}
295300

296301
/**
@@ -345,4 +350,18 @@ public function setContainer(Container $container)
345350
{
346351
$this->container = $container;
347352
}
353+
354+
/**
355+
* Raise the job queued event.
356+
*
357+
* @param string|int|null $jobId
358+
* @param \Closure|string|object $job
359+
* @return void
360+
*/
361+
protected function raiseJobQueuedEvent($jobId, $job)
362+
{
363+
if ($this->container->bound('events')) {
364+
$this->container['events']->dispatch(new JobQueued($jobId, $job));
365+
}
366+
}
348367
}

tests/Queue/QueueBeanstalkdQueueTest.php

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@
1313

1414
class QueueBeanstalkdQueueTest extends TestCase
1515
{
16+
/**
17+
* @var BeanstalkdQueue
18+
*/
19+
private $queue;
20+
21+
/**
22+
* @var Container|m\LegacyMockInterface|m\MockInterface
23+
*/
24+
private $container;
25+
1626
protected function tearDown(): void
1727
{
1828
m::close();
@@ -26,14 +36,16 @@ public function testPushProperlyPushesJobOntoBeanstalkd()
2636
return $uuid;
2737
});
2838

29-
$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60);
30-
$pheanstalk = $queue->getPheanstalk();
39+
$this->setQueue('default', 60);
40+
$pheanstalk = $this->queue->getPheanstalk();
3141
$pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk);
3242
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
3343
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data']]), 1024, 0, 60);
3444

35-
$queue->push('foo', ['data'], 'stack');
36-
$queue->push('foo', ['data']);
45+
$this->queue->push('foo', ['data'], 'stack');
46+
$this->queue->push('foo', ['data']);
47+
48+
$this->container->shouldHaveReceived('bound')->with('events')->times(2);
3749

3850
Str::createUuidsNormally();
3951
}
@@ -46,53 +58,68 @@ public function testDelayedPushProperlyPushesJobOntoBeanstalkd()
4658
return $uuid;
4759
});
4860

49-
$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60);
50-
$pheanstalk = $queue->getPheanstalk();
61+
$this->setQueue('default', 60);
62+
$pheanstalk = $this->queue->getPheanstalk();
5163
$pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk);
5264
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
5365
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data']]), Pheanstalk::DEFAULT_PRIORITY, 5, Pheanstalk::DEFAULT_TTR);
5466

55-
$queue->later(5, 'foo', ['data'], 'stack');
56-
$queue->later(5, 'foo', ['data']);
67+
$this->queue->later(5, 'foo', ['data'], 'stack');
68+
$this->queue->later(5, 'foo', ['data']);
69+
70+
$this->container->shouldHaveReceived('bound')->with('events')->times(2);
5771

5872
Str::createUuidsNormally();
5973
}
6074

6175
public function testPopProperlyPopsJobOffOfBeanstalkd()
6276
{
63-
$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60);
64-
$queue->setContainer(m::mock(Container::class));
65-
$pheanstalk = $queue->getPheanstalk();
77+
$this->setQueue('default', 60);
78+
79+
$pheanstalk = $this->queue->getPheanstalk();
6680
$pheanstalk->shouldReceive('watchOnly')->once()->with('default')->andReturn($pheanstalk);
6781
$job = m::mock(Job::class);
6882
$pheanstalk->shouldReceive('reserveWithTimeout')->once()->with(0)->andReturn($job);
6983

70-
$result = $queue->pop();
84+
$result = $this->queue->pop();
7185

7286
$this->assertInstanceOf(BeanstalkdJob::class, $result);
7387
}
7488

7589
public function testBlockingPopProperlyPopsJobOffOfBeanstalkd()
7690
{
77-
$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60, 60);
78-
$queue->setContainer(m::mock(Container::class));
79-
$pheanstalk = $queue->getPheanstalk();
91+
$this->setQueue('default', 60, 60);
92+
93+
$pheanstalk = $this->queue->getPheanstalk();
8094
$pheanstalk->shouldReceive('watchOnly')->once()->with('default')->andReturn($pheanstalk);
8195
$job = m::mock(Job::class);
8296
$pheanstalk->shouldReceive('reserveWithTimeout')->once()->with(60)->andReturn($job);
8397

84-
$result = $queue->pop();
98+
$result = $this->queue->pop();
8599

86100
$this->assertInstanceOf(BeanstalkdJob::class, $result);
87101
}
88102

89103
public function testDeleteProperlyRemoveJobsOffBeanstalkd()
90104
{
91-
$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60);
92-
$pheanstalk = $queue->getPheanstalk();
105+
$this->setQueue('default', 60);
106+
107+
$pheanstalk = $this->queue->getPheanstalk();
93108
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
94109
$pheanstalk->shouldReceive('delete')->once()->with(m::type(Job::class));
95110

96-
$queue->deleteMessage('default', 1);
111+
$this->queue->deleteMessage('default', 1);
112+
}
113+
114+
/**
115+
* @param string $default
116+
* @param int $timeToRun
117+
* @param int $blockFor
118+
*/
119+
private function setQueue($default, $timeToRun, $blockFor = 0)
120+
{
121+
$this->queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), $default, $timeToRun, $blockFor);
122+
$this->container = m::spy(Container::class);
123+
$this->queue->setContainer($this->container);
97124
}
98125
}

tests/Queue/QueueDatabaseQueueUnitTest.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Illuminate\Tests\Queue;
44

5+
use Illuminate\Container\Container;
56
use Illuminate\Database\Connection;
67
use Illuminate\Queue\DatabaseQueue;
78
use Illuminate\Queue\Queue;
@@ -28,6 +29,7 @@ public function testPushProperlyPushesJobOntoDatabase()
2829

2930
$queue = $this->getMockBuilder(DatabaseQueue::class)->onlyMethods(['currentTime'])->setConstructorArgs([$database = m::mock(Connection::class), 'table', 'default'])->getMock();
3031
$queue->expects($this->any())->method('currentTime')->willReturn('time');
32+
$queue->setContainer($container = m::spy(Container::class));
3133
$database->shouldReceive('table')->with('table')->andReturn($query = m::mock(stdClass::class));
3234
$query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) use ($uuid) {
3335
$this->assertSame('default', $array['queue']);
@@ -39,6 +41,8 @@ public function testPushProperlyPushesJobOntoDatabase()
3941

4042
$queue->push('foo', ['data']);
4143

44+
$container->shouldHaveReceived('bound')->with('events')->once();
45+
4246
Str::createUuidsNormally();
4347
}
4448

@@ -56,6 +60,7 @@ public function testDelayedPushProperlyPushesJobOntoDatabase()
5660
[$database = m::mock(Connection::class), 'table', 'default']
5761
)->getMock();
5862
$queue->expects($this->any())->method('currentTime')->willReturn('time');
63+
$queue->setContainer($container = m::spy(Container::class));
5964
$database->shouldReceive('table')->with('table')->andReturn($query = m::mock(stdClass::class));
6065
$query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) use ($uuid) {
6166
$this->assertSame('default', $array['queue']);
@@ -67,6 +72,8 @@ public function testDelayedPushProperlyPushesJobOntoDatabase()
6772

6873
$queue->later(10, 'foo', ['data']);
6974

75+
$container->shouldHaveReceived('bound')->with('events')->once();
76+
7077
Str::createUuidsNormally();
7178
}
7279

tests/Queue/QueueRedisQueueTest.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Illuminate\Tests\Queue;
44

5+
use Illuminate\Container\Container;
56
use Illuminate\Contracts\Redis\Factory;
67
use Illuminate\Queue\LuaScripts;
78
use Illuminate\Queue\Queue;
@@ -28,11 +29,13 @@ public function testPushProperlyPushesJobOntoRedis()
2829

2930
$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
3031
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
32+
$queue->setContainer($container = m::spy(Container::class));
3133
$redis->shouldReceive('connection')->once()->andReturn($redis);
3234
$redis->shouldReceive('eval')->once()->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]));
3335

3436
$id = $queue->push('foo', ['data']);
3537
$this->assertSame('foo', $id);
38+
$container->shouldHaveReceived('bound')->with('events')->once();
3639

3740
Str::createUuidsNormally();
3841
}
@@ -47,6 +50,7 @@ public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()
4750

4851
$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
4952
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
53+
$queue->setContainer($container = m::spy(Container::class));
5054
$redis->shouldReceive('connection')->once()->andReturn($redis);
5155
$redis->shouldReceive('eval')->once()->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'id' => 'foo', 'attempts' => 0]));
5256

@@ -56,6 +60,7 @@ public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()
5660

5761
$id = $queue->push('foo', ['data']);
5862
$this->assertSame('foo', $id);
63+
$container->shouldHaveReceived('bound')->with('events')->once();
5964

6065
Queue::createPayloadUsing(null);
6166

@@ -72,6 +77,7 @@ public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook()
7277

7378
$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
7479
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
80+
$queue->setContainer($container = m::spy(Container::class));
7581
$redis->shouldReceive('connection')->once()->andReturn($redis);
7682
$redis->shouldReceive('eval')->once()->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'bar' => 'foo', 'id' => 'foo', 'attempts' => 0]));
7783

@@ -85,6 +91,7 @@ public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook()
8591

8692
$id = $queue->push('foo', ['data']);
8793
$this->assertSame('foo', $id);
94+
$container->shouldHaveReceived('bound')->with('events')->once();
8895

8996
Queue::createPayloadUsing(null);
9097

@@ -100,6 +107,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis()
100107
});
101108

102109
$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['availableAt', 'getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
110+
$queue->setContainer($container = m::spy(Container::class));
103111
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
104112
$queue->expects($this->once())->method('availableAt')->with(1)->willReturn(2);
105113

@@ -112,6 +120,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis()
112120

113121
$id = $queue->later(1, 'foo', ['data']);
114122
$this->assertSame('foo', $id);
123+
$container->shouldHaveReceived('bound')->with('events')->once();
115124

116125
Str::createUuidsNormally();
117126
}
@@ -126,6 +135,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
126135

127136
$date = Carbon::now();
128137
$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['availableAt', 'getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
138+
$queue->setContainer($container = m::spy(Container::class));
129139
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
130140
$queue->expects($this->once())->method('availableAt')->with($date)->willReturn(2);
131141

@@ -137,6 +147,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
137147
);
138148

139149
$queue->later($date, 'foo', ['data']);
150+
$container->shouldHaveReceived('bound')->with('events')->once();
140151

141152
Str::createUuidsNormally();
142153
}

tests/Queue/QueueSqsQueueTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,33 +92,39 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs()
9292
{
9393
$now = Carbon::now();
9494
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
95+
$queue->setContainer($container = m::spy(Container::class));
9596
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->willReturn($this->mockedPayload);
9697
$queue->expects($this->once())->method('secondsUntil')->with($now)->willReturn(5);
9798
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl);
9899
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => 5])->andReturn($this->mockedSendMessageResponseModel);
99100
$id = $queue->later($now->addSeconds(5), $this->mockedJob, $this->mockedData, $this->queueName);
100101
$this->assertEquals($this->mockedMessageId, $id);
102+
$container->shouldHaveReceived('bound')->with('events')->once();
101103
}
102104

103105
public function testDelayedPushProperlyPushesJobOntoSqs()
104106
{
105107
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
108+
$queue->setContainer($container = m::spy(Container::class));
106109
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->willReturn($this->mockedPayload);
107110
$queue->expects($this->once())->method('secondsUntil')->with($this->mockedDelay)->willReturn($this->mockedDelay);
108111
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl);
109112
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => $this->mockedDelay])->andReturn($this->mockedSendMessageResponseModel);
110113
$id = $queue->later($this->mockedDelay, $this->mockedJob, $this->mockedData, $this->queueName);
111114
$this->assertEquals($this->mockedMessageId, $id);
115+
$container->shouldHaveReceived('bound')->with('events')->once();
112116
}
113117

114118
public function testPushProperlyPushesJobOntoSqs()
115119
{
116120
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
121+
$queue->setContainer($container = m::spy(Container::class));
117122
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->willReturn($this->mockedPayload);
118123
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl);
119124
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel);
120125
$id = $queue->push($this->mockedJob, $this->mockedData, $this->queueName);
121126
$this->assertEquals($this->mockedMessageId, $id);
127+
$container->shouldHaveReceived('bound')->with('events')->once();
122128
}
123129

124130
public function testSizeProperlyReadsSqsQueueSize()

0 commit comments

Comments
 (0)