Skip to content

Commit

Permalink
Refactor database and beanstalk queues.
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Dec 29, 2016
1 parent 101791e commit a041fb5
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 129 deletions.
13 changes: 13 additions & 0 deletions src/Illuminate/Queue/CalculatesDelays.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ protected function secondsUntil($delay)
: (int) $delay;
}

/**
* Get the "available at" UNIX timestamp.
*
* @param \DateTimeInterface|int $delay
* @return int
*/
protected function availableAt($delay = 0)
{
return $delay instanceof DateTimeInterface
? $delay->getTimestamp()
: Carbon::now()->addSeconds($delay)->getTimestamp();
}

/**
* Get the current system time as a UNIX timestamp.
*
Expand Down
145 changes: 69 additions & 76 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Carbon\Carbon;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use Illuminate\Contracts\Queue\Queue as QueueContract;

class DatabaseQueue extends Queue implements QueueContract
Expand Down Expand Up @@ -36,23 +37,23 @@ class DatabaseQueue extends Queue implements QueueContract
*
* @var int|null
*/
protected $expire = 60;
protected $retryAfter = 60;

/**
* Create a new database queue instance.
*
* @param \Illuminate\Database\Connection $database
* @param string $table
* @param string $default
* @param int $expire
* @param int $retryAfter
* @return void
*/
public function __construct(Connection $database, $table, $default = 'default', $expire = 60)
public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60)
{
$this->table = $table;
$this->expire = $expire;
$this->default = $default;
$this->database = $database;
$this->retryAfter = $retryAfter;
}

/**
Expand All @@ -78,7 +79,7 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
return $this->pushToDatabase($queue, $this->createPayload($job, $data));
}

/**
Expand All @@ -91,7 +92,7 @@ public function push($job, $data = '', $queue = null)
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->pushToDatabase(0, $queue, $payload);
return $this->pushToDatabase($queue, $payload);
}

/**
Expand All @@ -105,7 +106,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
return $this->pushToDatabase($queue, $this->createPayload($job, $data), $delay);
}

/**
Expand All @@ -120,46 +121,63 @@ public function bulk($jobs, $data = '', $queue = null)
{
$queue = $this->getQueue($queue);

$availableAt = $this->getAvailableAt(0);
$availableAt = $this->availableAt();

$records = array_map(function ($job) use ($queue, $data, $availableAt) {
return $this->buildDatabaseRecord(
$queue, $this->createPayload($job, $data), $availableAt
);
}, (array) $jobs);

return $this->database->table($this->table)->insert($records);
return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
function ($job) use ($queue, $data, $availableAt) {
return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
}
)->all());
}

/**
* Release a reserved job back onto the queue.
*
* @param string $queue
* @param \StdClass $job
* @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
* @param int $delay
* @return mixed
*/
public function release($queue, $job, $delay)
{
return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts);
return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts);
}

/**
* Push a raw payload to the database with a given delay.
*
* @param \DateTime|int $delay
* @param string|null $queue
* @param string $payload
* @param \DateTime|int $delay
* @param int $attempts
* @return mixed
*/
protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
{
$attributes = $this->buildDatabaseRecord(
$this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts
);
return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
$this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
));
}

return $this->database->table($this->table)->insertGetId($attributes);
/**
* Create an array to insert for the given job.
*
* @param string|null $queue
* @param string $payload
* @param int $availableAt
* @param int $attempts
* @return array
*/
protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
{
return [
'queue' => $queue,
'payload' => $payload,
'attempts' => $attempts,
'reserved_at' => null,
'available_at' => $availableAt,
'created_at' => $this->currentTime(),
];
}

/**
Expand All @@ -175,13 +193,7 @@ public function pop($queue = null)
$this->database->beginTransaction();

if ($job = $this->getNextAvailableJob($queue)) {
$job = $this->markJobAsReserved($job);

$this->database->commit();

return new DatabaseJob(
$this->container, $this, $job, $queue
);
return $this->marshalJob($queue, $job);
}

$this->database->commit();
Expand All @@ -191,7 +203,7 @@ public function pop($queue = null)
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \StdClass|null
* @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null
*/
protected function getNextAvailableJob($queue)
{
Expand All @@ -205,7 +217,7 @@ protected function getNextAvailableJob($queue)
->orderBy('id', 'asc')
->first();

return $job ? (object) $job : null;
return $job ? new DatabaseJobRecord((object) $job) : null;
}

/**
Expand All @@ -217,8 +229,8 @@ protected function getNextAvailableJob($queue)
protected function isAvailable($query)
{
$query->where(function ($query) {
$query->whereNull('reserved_at');
$query->where('available_at', '<=', $this->currentTime());
$query->whereNull('reserved_at')
->where('available_at', '<=', $this->currentTime());
});
}

Expand All @@ -230,27 +242,42 @@ protected function isAvailable($query)
*/
protected function isReservedButExpired($query)
{
$expiration = Carbon::now()->subSeconds($this->expire)->getTimestamp();
$expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();

$query->orWhere(function ($query) use ($expiration) {
$query->where('reserved_at', '<=', $expiration);
});
}

/**
* Marshal the reserved job into a DatabaseJob instance.
*
* @param string $queue
* @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
* @return \Illuminate\Queue\Jobs\DatabaseJob
*/
protected function marshalJob($queue, $job)
{
$job = $this->markJobAsReserved($job);

$this->database->commit();

return new DatabaseJob(
$this->container, $this, $job, $queue
);
}

/**
* Mark the given job ID as reserved.
*
* @param \stdClass $job
* @return \stdClass
* @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
* @return \Illuminate\Queue\Jobs\DatabaseJobRecord
*/
protected function markJobAsReserved($job)
{
$job->attempts = $job->attempts + 1;
$job->reserved_at = $this->currentTime();

$this->database->table($this->table)->where('id', $job->id)->update([
'reserved_at' => $job->reserved_at,
'attempts' => $job->attempts,
'reserved_at' => $job->touch(),
'attempts' => $job->increment(),
]);

return $job;
Expand All @@ -274,40 +301,6 @@ public function deleteReserved($queue, $id)
$this->database->commit();
}

/**
* Get the "available at" UNIX timestamp.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getAvailableAt($delay)
{
$availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);

return $availableAt->getTimestamp();
}

/**
* Create an array to insert for the given job.
*
* @param string|null $queue
* @param string $payload
* @param int $availableAt
* @param int $attempts
* @return array
*/
protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
{
return [
'queue' => $queue,
'attempts' => $attempts,
'reserved_at' => null,
'available_at' => $availableAt,
'created_at' => $this->currentTime(),
'payload' => $payload,
];
}

/**
* Get the queue or return the default.
*
Expand Down
20 changes: 10 additions & 10 deletions src/Illuminate/Queue/Jobs/BeanstalkdJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,6 @@ public function __construct(Container $container, Pheanstalk $pheanstalk, Pheans
$this->pheanstalk = $pheanstalk;
}

/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job->getData();
}

/**
* Release the job back into the queue.
*
Expand Down Expand Up @@ -111,6 +101,16 @@ public function getJobId()
return $this->job->getId();
}

/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job->getData();
}

/**
* Get the underlying Pheanstalk instance.
*
Expand Down
Loading

0 comments on commit a041fb5

Please sign in to comment.