Skip to content

Fixes #743 for version 3.x #786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 60 additions & 29 deletions src/Jenssegers/Mongodb/Queue/MongoQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,75 @@

use Carbon\Carbon;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Jobs\DatabaseJob;
use MongoDB\Operation\FindOneAndUpdate;
use DB;

class MongoQueue extends DatabaseQueue
{
/**
* Get the next available job for the queue.
* Pop the next job off of the queue.
*
* @param string $queue
*
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);

if (!is_null($this->expire)) {
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
}

if ($job = $this->getNextAvailableJobAndReserve($queue)) {
return new DatabaseJob(
$this->container, $this, $job, $queue
);
}
}

/**
* Get the next available job for the queue and mark it as reserved.
*
* When using multiple daemon queue listeners to process jobs there
* is a possibility that multiple processes can end up reading the
* same record before one has flagged it as reserved.
*
* This race condition can result in random jobs being run more then
* once. To solve this we use findOneAndUpdate to lock the next jobs
* record while flagging it as reserved at the same time.
*
* @param string|null $queue
*
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
protected function getNextAvailableJobAndReserve($queue)
{
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 0)
->where('available_at', '<=', $this->getTime())
->orderBy('id', 'asc')
->first();
$job = DB::getCollection($this->table)->findOneAndUpdate(
[
'queue' => $this->getQueue($queue),
'reserved' => 0,
'available_at' => ['$lte' => $this->getTime()],

],
[
'$set' => [
'reserved' => 1,
'reserved_at' => $this->getTime(),
],
],
[
'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
'sort' => ['available_at' => 1],
]
);

if ($job) {
$job = (object) $job;
$job->id = $job->_id;
}

return $job ?: null;
return $job;
}

/**
Expand All @@ -40,16 +84,16 @@ protected function releaseJobsThatHaveBeenReservedTooLong($queue)
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();

$reserved = $this->database->collection($this->table)
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', $expired)->get();
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', $expired)->get();

foreach ($reserved as $job) {
$attempts = $job['attempts'] + 1;
$this->releaseJob($job['_id'], $attempts);
}
}

/**
* Release the given job ID from reservation.
*
Expand All @@ -66,19 +110,6 @@ protected function releaseJob($id, $attempts)
]);
}

/**
* Mark the given job ID as reserved.
*
* @param string $id
* @return void
*/
protected function markJobAsReserved($id)
{
$this->database->collection($this->table)->where('_id', $id)->update([
'reserved' => 1, 'reserved_at' => $this->getTime(),
]);
}

/**
* Delete a reserved job from the queue.
*
Expand Down
38 changes: 36 additions & 2 deletions tests/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,46 @@

class QueueTest extends TestCase
{
public function testQueue()
public function setUp()
{
$id = Queue::push('test', ['foo' => 'bar'], 'test');
parent::setUp();

// Always start with a clean slate
Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->truncate();
Queue::getDatabase()->table(Config::get('queue.failed.table'))->truncate();
}

public function testQueueJobLifeCycle()
{
$id = Queue::push('test', ['action' => 'QueueJobLifeCycle'], 'test');
$this->assertNotNull($id);

// Get and reserve the test job (next available)
$job = Queue::pop('test');
$this->assertInstanceOf('Illuminate\Queue\Jobs\DatabaseJob', $job);
$this->assertEquals(1, $job->getDatabaseJob()->reserved);
$this->assertEquals(json_encode(['job' => 'test', 'data' => ['action' => 'QueueJobLifeCycle']]), $job->getRawBody());

// Remove reserved job
$job->delete();
$this->assertEquals(0, Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->count());
}

public function testQueueJobExpired()
{
$id = Queue::push('test', ['action' => 'QueueJobExpired'], 'test');
$this->assertNotNull($id);

// Expire the test job
$expiry = \Carbon\Carbon::now()->subSeconds(Config::get('queue.connections.database.expire'))->getTimestamp();
Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->where('_id', $id)->update(['reserved' => 1, 'reserved_at' => $expiry]);

// Expect an attempted older job in the queue
$job = Queue::pop('test');
$this->assertEquals(2, $job->getDatabaseJob()->attempts);
$this->assertGreaterThan($expiry, $job->getDatabaseJob()->reserved_at);

$job->delete();
$this->assertEquals(0, Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->count());
}
}
23 changes: 23 additions & 0 deletions tests/config/queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

return [

'default' => 'database',

'connections' => [

'database' => [
'driver' => 'mongodb',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
],

],

'failed' => [
'database' => 'mongodb',
'table' => 'failed_jobs',
],

];