Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/CloudTasksApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* @method static void deleteTask(string $taskName)
* @method static Task getTask(string $taskName)
* @method static bool exists(string $taskName)
* @method static void pause(string $queue)
* @method static void resume(string $queue)
*/
class CloudTasksApi extends Facade
{
Expand Down
12 changes: 12 additions & 0 deletions src/CloudTasksApiConcrete.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
use Google\Cloud\Tasks\V2\GetTaskRequest;
use Google\Cloud\Tasks\V2\CreateTaskRequest;
use Google\Cloud\Tasks\V2\DeleteTaskRequest;
use Google\Cloud\Tasks\V2\PauseQueueRequest;
use Google\Cloud\Tasks\V2\ResumeQueueRequest;
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;

class CloudTasksApiConcrete implements CloudTasksApiContract
Expand Down Expand Up @@ -65,4 +67,14 @@ public function exists(string $taskName): bool

return false;
}

public function pause(string $queue): void
{
$this->client->pauseQueue(PauseQueueRequest::build($queue));
}

public function resume(string $queue): void
{
$this->client->resumeQueue(ResumeQueueRequest::build($queue));
}
}
4 changes: 4 additions & 0 deletions src/CloudTasksApiContract.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ public function deleteTask(string $taskName): void;
public function getTask(string $taskName): Task;

public function exists(string $taskName): bool;

public function pause(string $queue): void;

public function resume(string $queue): void;
}
25 changes: 25 additions & 0 deletions src/CloudTasksApiFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class CloudTasksApiFake implements CloudTasksApiContract
*/
public array $deletedTasks = [];

/**
* @var array<string, true>
*/
public array $pausedQueues = [];

public function createTask(string $queueName, Task $task): Task
{
$this->createdTasks[] = compact('queueName', 'task');
Expand Down Expand Up @@ -51,6 +56,16 @@ public function exists(string $taskName): bool
return false;
}

public function pause(string $queue): void
{
$this->pausedQueues[$queue] = true;
}

public function resume(string $queue): void
{
unset($this->pausedQueues[$queue]);
}

public function assertTaskDeleted(string $taskName): void
{
Assert::assertTrue(
Expand Down Expand Up @@ -85,4 +100,14 @@ public function assertCreatedTaskCount(int $count): void
{
Assert::assertCount($count, $this->createdTasks);
}

public function assertQueuePaused(string $queue): void
{
Assert::assertTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to be paused, but is not');
}

public function assertQueueNotPaused(string $queue): void
{
Assert::assertNotTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to not be paused, but it is');
}
}
14 changes: 14 additions & 0 deletions src/CloudTasksQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -467,4 +467,18 @@ private function getCloudRunJobEnvVars(string $encodedPayload, string $taskName)

return $envVars;
}

public function pause(string $queue): void
{
$queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue);

CloudTasksApi::pause($queue);
}

public function resume(string $queue): void
{
$queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue);

CloudTasksApi::resume($queue);
}
}
27 changes: 27 additions & 0 deletions src/CloudTasksServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
use Illuminate\Routing\Router;
use Illuminate\Events\Dispatcher;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\Facades\Queue;
use Illuminate\Foundation\Application;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\QueuePaused;
use Illuminate\Queue\Events\QueueResumed;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Queue\Events\JobExceptionOccurred;
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
Expand Down Expand Up @@ -113,6 +116,30 @@ private function registerEvents(): void
return;
}
});

if (class_exists('Illuminate\Queue\Events\QueuePaused')) {
$events->listen(QueuePaused::class, function (QueuePaused $event) { // @phpstan-ignore-line
$queue = Queue::connection($event->connection); // @phpstan-ignore-line

if (! $queue instanceof CloudTasksQueue) {
return;
}

$queue->pause($event->queue); // @phpstan-ignore-line
});
}

if (class_exists('Illuminate\Queue\Events\QueueResumed')) {
$events->listen(QueueResumed::class, function (QueueResumed $event) { // @phpstan-ignore-line
$queue = Queue::connection($event->connection); // @phpstan-ignore-line

if (! $queue instanceof CloudTasksQueue) {
return;
}

$queue->resume($event->queue); // @phpstan-ignore-line
});
}
}

private function registerCommands(): void
Expand Down
86 changes: 86 additions & 0 deletions tests/CloudTasksApiTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
use Google\ApiCore\ApiException;
use Google\Cloud\Tasks\V2\HttpMethod;
use Google\Cloud\Tasks\V2\HttpRequest;
use Google\Cloud\Tasks\V2\Queue\State;
use PHPUnit\Framework\Attributes\Test;
use Google\Cloud\Tasks\V2\GetQueueRequest;
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi;

Expand Down Expand Up @@ -138,4 +140,88 @@ public function test_delete_task()
$this->expectExceptionMessage('NOT_FOUND');
CloudTasksApi::getTask($task->getName());
}

#[Test]
public function it_can_pause_queues(): void
{
$queueName = $this->client->queueName(
env('CI_CLOUD_TASKS_PROJECT_ID'),
env('CI_CLOUD_TASKS_LOCATION'),
env('CI_CLOUD_TASKS_QUEUE').'-pause'
);

$this->ensureQueueIs($queueName, State::RUNNING);

// Act
CloudTasksApi::pause($queueName);

// Assert
$this->assertEquals(State::PAUSED, $this->waitForQueueState($queueName, State::PAUSED));
}

#[Test]
public function it_can_resume_queues(): void
{
$queueName = $this->client->queueName(
env('CI_CLOUD_TASKS_PROJECT_ID'),
env('CI_CLOUD_TASKS_LOCATION'),
env('CI_CLOUD_TASKS_QUEUE').'-pause'
);

$this->ensureQueueIs($queueName, State::PAUSED);

// Act
CloudTasksApi::resume($queueName);

// Assert
$this->assertEquals(State::RUNNING, $this->waitForQueueState($queueName, State::RUNNING));
}

private function getQueueState(string $queue): int
{
return $this->client->getQueue(GetQueueRequest::build($queue))->getState();
}

private function waitForQueueState(string $queue, int $waitForState): ?int
{
$state = null;
$attempts = 0;

while ($state !== $waitForState) {
$state = $this->getQueueState($queue);

if ($state === $waitForState) {
return $state;
}

$attempts++;

if ($attempts >= 10) {
break;
}

sleep(1);
}

return $state;
}

private function ensureQueueIs(string $queue, int $desiredState): void
{
$currentState = $this->getQueueState($queue);

if ($currentState === $desiredState) {
return;
}

if ($currentState === State::RUNNING && $desiredState === State::PAUSED) {
CloudTasksApi::pause($queue);
}

if ($currentState === State::PAUSED && $desiredState === State::RUNNING) {
CloudTasksApi::resume($queue);
}

$this->assertEquals($desiredState, $this->waitForQueueState($queue, $desiredState));
}
}
46 changes: 46 additions & 0 deletions tests/PauseResumeQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Tests;

use PHPUnit\Framework\Attributes\Test;
use Illuminate\Support\Facades\Artisan;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi;

class PauseResumeQueueTest extends TestCase
{
#[Test]
public function queue_can_be_paused(): void
{
// Arrange
if (app()->version() < 12) {
$this->markTestSkipped('This feature only exists in Laravel 12 and up.');
}

CloudTasksApi::fake();

// $this->artisan('queue:pause cloudtasks:barbequeue');
Artisan::call('queue:pause my-cloudtasks-connection:barbequeue');

// Assert
CloudTasksApi::assertQueuePaused('barbequeue');
}

#[Test]
public function queue_can_be_resumed(): void
{
// Arrange
if (app()->version() < 12) {
$this->markTestSkipped('This feature only exists in Laravel 12 and up.');
}

CloudTasksApi::fake();

Artisan::call('queue:pause my-cloudtasks-connection:barbequeue');
Artisan::call('queue:continue my-cloudtasks-connection:barbequeue');

// Assert
CloudTasksApi::assertQueueNotPaused('barbequeue');
}
}