Skip to content

Commit

Permalink
Merge pull request #29 from daniellienert/feature/extend-interface
Browse files Browse the repository at this point in the history
!!! FEATURE: Extend the QueueInterface with countReserved and countFailed
  • Loading branch information
daniellienert authored May 30, 2018
2 parents db672b1 + bdffb4a commit acb68e9
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 18 deletions.
5 changes: 4 additions & 1 deletion Classes/Command/JobCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Neos\Cache\Frontend\VariableFrontend;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;
use Neos\Flow\Mvc\Exception\StopActionException;

/**
* Job command controller
Expand Down Expand Up @@ -62,6 +63,7 @@ class JobCommandController extends CommandController
* @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
* @param bool $verbose Output debugging information
* @return void
* @throws StopActionException
*/
public function workCommand($queue, $exitAfter = null, $limit = null, $verbose = false)
{
Expand Down Expand Up @@ -123,11 +125,12 @@ public function workCommand($queue, $exitAfter = null, $limit = null, $verbose =
* @param string $queue The name of the queue
* @param integer $limit Number of jobs to list (some queues only support a limit of 1)
* @return void
* @throws JobQueueException
*/
public function listCommand($queue, $limit = 1)
{
$jobs = $this->jobManager->peek($queue, $limit);
$totalCount = $this->queueManager->getQueue($queue)->count();
$totalCount = $this->queueManager->getQueue($queue)->countReady();
foreach ($jobs as $job) {
$this->outputLine('<b>%s</b>', [$job->getLabel()]);
}
Expand Down
11 changes: 10 additions & 1 deletion Classes/Command/QueueCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
* source code.
*/

use Flowpack\JobQueue\Common\Exception;
use Flowpack\JobQueue\Common\Queue\QueueManager;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;
use Neos\Flow\Mvc\Exception\StopActionException;
use Neos\Utility\TypeHandling;

/**
Expand All @@ -40,14 +42,15 @@ class QueueCommandController extends CommandController
* Displays all configured queues, their type and the number of messages that are ready to be processed.
*
* @return void
* @throws Exception
*/
public function listCommand()
{
$rows = [];
foreach ($this->queueConfigurations as $queueName => $queueConfiguration) {
$queue = $this->queueManager->getQueue($queueName);
try {
$numberOfMessages = $queue->count();
$numberOfMessages = $queue->countReady();
} catch (\Exception $e) {
$numberOfMessages = '-';
}
Expand All @@ -63,6 +66,7 @@ public function listCommand()
*
* @param string $queue Name of the queue to describe (e.g. "some-queue")
* @return void
* @throws Exception
*/
public function describeCommand($queue)
{
Expand All @@ -83,6 +87,8 @@ public function describeCommand($queue)
*
* @param string $queue Name of the queue to initialize (e.g. "some-queue")
* @return void
* @throws Exception
* @throws StopActionException
*/
public function setupCommand($queue)
{
Expand All @@ -106,6 +112,8 @@ public function setupCommand($queue)
* @param string $queue Name of the queue to flush (e.g. "some-queue")
* @param bool $force This flag is required in order to avoid accidental flushes
* @return void
* @throws Exception
* @throws StopActionException
*/
public function flushCommand($queue, $force = false)
{
Expand Down Expand Up @@ -134,6 +142,7 @@ public function flushCommand($queue, $force = false)
* @param string $payload Arbitrary payload, for example a serialized instance of a class implementing JobInterface
* @param string $options JSON encoded, for example '{"some-option": "some-value"}'
* @return void
* @throws Exception
*/
public function submitCommand($queue, $payload, $options = null)
{
Expand Down
18 changes: 17 additions & 1 deletion Classes/Queue/FakeQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,23 @@ public function peek($limit = 1)
/**
* @inheritdoc
*/
public function count()
public function countReady(): int
{
return 0;
}

/**
* @inheritdoc
*/
public function countReserved(): int
{
return 0;
}

/**
* @inheritdoc
*/
public function countFailed(): int
{
return 0;
}
Expand Down
22 changes: 17 additions & 5 deletions Classes/Queue/QueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,25 @@ public function finish($messageId);
public function peek($limit = 1);

/**
* Count ready messages in the queue
* Get a count of ready messages currently in the queue.
*
* Get a count of messages currently in the queue.
* @return int The number of ready messages in the queue
*/
public function countReady(): int;

/**
* Get a count of reserved messages currently in the queue.
*
* @return int The number of reserved messages in the queue
*/
public function countReserved(): int;

/**
* Get a count of failed messages currently in the queue.
*
* @return integer The number of messages in the queue
* @return int The number of failed messages in the queue
*/
public function count();
public function countFailed(): int;

/**
* Removes all messages from this queue
Expand All @@ -119,4 +131,4 @@ public function count();
* @return void
*/
public function flush();
}
}
62 changes: 54 additions & 8 deletions Tests/Functional/AbstractQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ public function releasePutsMessageBackToQueue()
$messageId = $this->queue->submit('A message');

$this->queue->waitAndReserve(1);
$this->assertSame(0, $this->queue->count());
$this->assertSame(0, $this->queue->countReady());

$this->queue->release($messageId);
$this->assertSame(1, $this->queue->count());
$this->assertSame(1, $this->queue->countReady());
}

/**
Expand All @@ -167,6 +167,8 @@ public function releaseIncreasesNumberOfReleases()
$this->queue->release($messageId);
$message = $this->queue->waitAndReserve(1);
$this->assertSame(2, $message->getNumberOfReleases());

$this->queue->abort($messageId);
}

/**
Expand All @@ -179,26 +181,70 @@ public function abortRemovesMessageFromActiveQueue()
$this->queue->waitAndReserve(1);

$this->queue->abort($messageId);
$this->assertSame(0, $this->queue->count());
$this->assertSame(0, $this->queue->countReady());
$this->assertNull($this->queue->waitAndTake(1));
}

/**
* @test
*/
public function countReturnsZeroByDefault()
public function countReadyReturnsZeroByDefault()
{
$this->assertSame(0, $this->queue->count());
$this->assertSame(0, $this->queue->countReady());
}

/**
* @test
*/
public function countReturnsNumberOfReadyJobs()
public function countReadyReturnsNumberOfReadyJobs()
{
$this->queue->submit('First message');
$this->queue->submit('Second message');

$this->assertSame(2, $this->queue->count());
$this->assertSame(2, $this->queue->countReady());
}

/**
* @test
*/
public function countFailedReturnsZeroByDefault()
{
$this->assertSame(0, $this->queue->countFailed());
}

/**
* @test
*/
public function countFailedReturnsNumberOfFailedMessages()
{
$messageId = $this->queue->submit('A message');

$this->queue->waitAndReserve(1);
$this->assertSame(0, $this->queue->countFailed());

$this->queue->abort($messageId);
$this->assertSame(1, $this->queue->countFailed());
}

/**
* @test
*/
public function countReservedReturnsZeroByDefault()
{
$this->assertSame(0, $this->queue->countReserved());
}

/**
* @test
*/
public function countReservedReturnsNumberOfReservedMessages()
{
$messageId = $this->queue->submit('A message');

$this->queue->waitAndReserve(1);
$this->assertSame(1, $this->queue->countReserved());

$this->queue->abort($messageId);
$this->assertSame(0, $this->queue->countReserved());
}
}
}
25 changes: 23 additions & 2 deletions Tests/Unit/Fixtures/TestQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,18 @@ class TestQueue implements QueueInterface
/**
* @var string[]
*/
protected $processingMessages = [];
protected $reservedMessages = [];

/**
* @var string[]
*/
protected $failedMessages = [];

/**
* @var string[]
*/
protected $processingMessages = [];

/**
* @var int[]
*/
Expand Down Expand Up @@ -205,11 +210,27 @@ public function peek($limit = 1)
/**
* @inheritdoc
*/
public function count()
public function countReady():int
{
return count($this->readyMessages);
}

/**
* @inheritdoc
*/
public function countReserved(): int
{
return count($this->reservedMessages);
}

/**
* @inheritdoc
*/
public function countFailed(): int
{
return count($this->failedMessages);
}

/**
* @inheritdoc
*/
Expand Down

0 comments on commit acb68e9

Please sign in to comment.