Skip to content

Add support for 'Max retry duration' and allow for unlimited tasks #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions src/CloudTasksJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Stackkit\LaravelGoogleCloudTasksQueue;

use Google\Cloud\Tasks\V2\CloudTasksClient;
use Illuminate\Container\Container;
use Illuminate\Queue\Jobs\Job as LaravelJob;
use Illuminate\Contracts\Queue\Job as JobContract;
Expand All @@ -11,11 +12,18 @@ class CloudTasksJob extends LaravelJob implements JobContract
private $job;
private $attempts;
private $maxTries;
public $retryUntil = null;

public function __construct($job)
/**
* @var CloudTasksQueue
*/
private $cloudTasksQueue;

public function __construct($job, CloudTasksQueue $cloudTasksQueue)
{
$this->job = $job;
$this->container = Container::getInstance();
$this->cloudTasksQueue = $cloudTasksQueue;
}

public function getJobId()
Expand All @@ -41,7 +49,7 @@ public function setAttempts($attempts)
public function setMaxTries($maxTries)
{
if ((int) $maxTries === -1) {
$maxTries = null;
$maxTries = 0;
}

$this->maxTries = $maxTries;
Expand All @@ -56,4 +64,27 @@ public function setQueue($queue)
{
$this->queue = $queue;
}

public function setRetryUntil($retryUntil)
{
$this->retryUntil = $retryUntil;
}

public function retryUntil()
{
return $this->retryUntil;
}

// timeoutAt was renamed to retryUntil in 8.x but we still support this.
public function timeoutAt()
{
return $this->retryUntil;
}

public function delete()
{
parent::delete();

$this->cloudTasksQueue->delete($this);
}
}
14 changes: 14 additions & 0 deletions src/CloudTasksQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ private function createHttpRequest()
return app(HttpRequest::class);
}

public function delete(CloudTasksJob $job)
{
$config = $this->config;

$taskName = $this->client->taskName(
$config['project'],
$config['location'],
$job->getQueue(),
request()->header('X-Cloudtasks-Taskname')
);

$this->client->deleteTask($taskName);
}

/**
* @return Task
*/
Expand Down
69 changes: 61 additions & 8 deletions src/TaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

namespace Stackkit\LaravelGoogleCloudTasksQueue;

use Google\Cloud\Tasks\V2\Attempt;
use Google\Cloud\Tasks\V2\CloudTasksClient;
use Google\Cloud\Tasks\V2\RetryConfig;
use Illuminate\Http\Request;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Worker;
Expand All @@ -14,6 +16,16 @@ class TaskHandler
private $publicKey;
private $config;

/**
* @var CloudTasksQueue
*/
private $queue;

/**
* @var RetryConfig
*/
private $retryConfig = null;

public function __construct(CloudTasksClient $client, Request $request, OpenIdVerificator $publicKey)
{
$this->client = $client;
Expand All @@ -31,6 +43,8 @@ public function handle($task = null)

$this->loadQueueConnectionConfiguration($task);

$this->setQueue();

$this->authorizeRequest();

$this->listenForEvents();
Expand All @@ -48,6 +62,11 @@ private function loadQueueConnectionConfiguration($task)
);
}

private function setQueue()
{
$this->queue = new CloudTasksQueue($this->config, $this->client);
}

/**
* @throws CloudTasksException
*/
Expand Down Expand Up @@ -122,29 +141,63 @@ private function listenForEvents()
*/
private function handleTask($task)
{
$job = new CloudTasksJob($task);
$job = new CloudTasksJob($task, $this->queue);

$this->loadQueueRetryConfig();

$job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1);
$job->setQueue(request()->header('X-Cloudtasks-Queuename'));
$job->setMaxTries($this->getQueueMaxTries($job));
$job->setMaxTries($this->retryConfig->getMaxAttempts());

// If the job is being attempted again we also check if a
// max retry duration has been set. If that duration
// has passed, it should stop trying altogether.
if ($job->attempts() > 1) {
$job->setRetryUntil($this->getRetryUntilTimestamp($job));
}

$worker = $this->getQueueWorker();

$worker->process($this->config['connection'], $job, new WorkerOptions());
}

private function getQueueMaxTries(CloudTasksJob $job)
private function loadQueueRetryConfig()
{
$queueName = $this->client->queueName(
$this->config['project'],
$this->config['location'],
$job->getQueue()
request()->header('X-Cloudtasks-Queuename')
);

return $this->client
->getQueue($queueName)
->getRetryConfig()
->getMaxAttempts();
$this->retryConfig = $this->client->getQueue($queueName)->getRetryConfig();
}

private function getRetryUntilTimestamp(CloudTasksJob $job)
{
$task = $this->client->getTask(
$this->client->taskName(
$this->config['project'],
$this->config['location'],
$job->getQueue(),
request()->header('X-Cloudtasks-Taskname')
)
);

$attempt = $task->getFirstAttempt();

if (!$attempt instanceof Attempt) {
return null;
}

if (! $this->retryConfig->hasMaxRetryDuration()) {
return null;
}

$maxDurationInSeconds = $this->retryConfig->getMaxRetryDuration()->getSeconds();

$firstAttemptTimestamp = $attempt->getDispatchTime()->toDateTime()->getTimestamp();

return $firstAttemptTimestamp + $maxDurationInSeconds;
}

/**
Expand Down
Loading