Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Command/AnonConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class AnonConsumerCommand extends BaseConsumerCommand
{
protected function configure()
protected function configure(): void
{
parent::configure();

Expand All @@ -15,7 +15,7 @@ protected function configure()

}

protected function getConsumerService()
protected function getConsumerService(): string
{
return 'old_sound_rabbit_mq.%s_anon';
}
Expand Down
47 changes: 31 additions & 16 deletions Command/BaseConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

namespace OldSound\RabbitMqBundle\Command;

use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer as Consumer;
use OldSound\RabbitMqBundle\RabbitMq\AnonConsumer;
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer;
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\DynamicConsumer;
use OldSound\RabbitMqBundle\RabbitMq\MultipleConsumer;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -11,15 +15,20 @@

abstract class BaseConsumerCommand extends BaseRabbitMqCommand
{
/** @var DynamicConsumer|MultipleConsumer|AnonConsumer */
protected $consumer;

/** @var string */
protected $amount;

/**
* @return mixed
*/
abstract protected function getConsumerService();

public function stopConsumer()
public function stopConsumer(): void
{
if ($this->consumer instanceof Consumer) {
if ($this->consumer instanceof BaseConsumer) {
// Process current message, then halt consumer
$this->consumer->forceStopConsumer();

Expand All @@ -30,20 +39,29 @@ public function stopConsumer()
}
}

public function restartConsumer()
public function restartConsumer():void
{
// TODO: Implement restarting of consumer
if ($this->consumer instanceof BaseConsumer) {
// Process current message, then halt consumer
$this->consumer->forceStopConsumer();

// Halt consumer if waiting for a new message from the queue
try {
$this->consumer->start();
} catch (\ErrorException $e) {
}
}
}

protected function configure()
protected function configure(): void
{
parent::configure();

$this
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)', null)
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)')
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
;
Expand All @@ -52,15 +70,10 @@ protected function configure()
/**
* Executes the current command.
*
* @param InputInterface $input An InputInterface instance
* @param OutputInterface $output An OutputInterface instance
*
* @return integer 0 if everything went fine, or an error code
*
* @throws \InvalidArgumentException When the number of messages to consume is less than 0
* @throws \BadFunctionCallException When the pcntl is not installed and option -s is true
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (defined('AMQP_WITHOUT_SIGNALS') === false) {
define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
Expand All @@ -87,17 +100,19 @@ protected function execute(InputInterface $input, OutputInterface $output)
}
$this->initConsumer($input);

return $this->consumer->consume($this->amount);
return $this->consumer->consume((int)$this->amount);
}

protected function initConsumer($input)
protected function initConsumer(InputInterface $input): void
{
$this->consumer = $this->getContainer()
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));

if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) {
if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && (int)$input->getOption('memory-limit') > 0) {
$this->consumer->setMemoryLimit($input->getOption('memory-limit'));
}
$this->consumer->setRoutingKey($input->getOption('route'));

$this->consumer->setContext($input->getArgument('context'));
}
}
7 changes: 2 additions & 5 deletions Command/BaseRabbitMqCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@ abstract class BaseRabbitMqCommand extends Command implements ContainerAwareInte
/**
* {@inheritDoc}
*/
public function setContainer(ContainerInterface $container = null)
public function setContainer(ContainerInterface $container = null): void
{
$this->container = $container;
}

/**
* @return ContainerInterface
*/
public function getContainer()
public function getContainer(): ContainerInterface
{
return $this->container;
}
Expand Down
24 changes: 9 additions & 15 deletions Command/BatchConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final class BatchConsumerCommand extends BaseRabbitMqCommand
*/
protected $consumer;

public function stopConsumer()
public function stopConsumer(): void
{
if ($this->consumer instanceof BatchConsumer) {
// Process current message, then halt consumer
Expand All @@ -29,7 +29,7 @@ public function stopConsumer()
}
}

protected function configure()
protected function configure(): void
{
parent::configure();

Expand All @@ -38,25 +38,25 @@ protected function configure()
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
->addOption('batches', 'b', InputOption::VALUE_OPTIONAL, 'Number of batches to consume', 0)
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process')
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
->setDescription('Executes a Batch Consumer');
;
}

/**
* Executes the current command.
*

* @param InputInterface $input An InputInterface instance
* @param OutputInterface $output An OutputInterface instance
*
* @return integer 0 if everything went fine, or an error code
*
* @throws \InvalidArgumentException When the number of batches to consume is less than 0
* @throws \BadFunctionCallException When the pcntl is not installed and option -s is true
* @throws \InvalidArgumentException When the number of messages to consume is less than 0
* @throws \BadFunctionCallException|\ErrorException When the pcntl is not installed and option -s is true
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (defined('AMQP_WITHOUT_SIGNALS') === false) {
define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
Expand Down Expand Up @@ -86,10 +86,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
return $this->consumer->consume($batchAmountTarget);
}

/**
* @param InputInterface $input
*/
protected function initConsumer(InputInterface $input)
protected function initConsumer(InputInterface $input): void
{
$this->consumer = $this->getContainer()
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
Expand All @@ -103,10 +100,7 @@ protected function initConsumer(InputInterface $input)
$this->consumer->setRoutingKey($input->getOption('route'));
}

/**
* @return string
*/
protected function getConsumerService()
protected function getConsumerService(): string
{
return 'old_sound_rabbit_mq.%s_batch';
}
Expand Down
4 changes: 2 additions & 2 deletions Command/ConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

class ConsumerCommand extends BaseConsumerCommand
{
protected function configure()
protected function configure(): void
{
parent::configure();
$this->setDescription('Executes a consumer');
$this->setName('rabbitmq:consumer');
}

protected function getConsumerService()
protected function getConsumerService(): string
{
return 'old_sound_rabbit_mq.%s_consumer';
}
Expand Down
10 changes: 2 additions & 8 deletions Command/DeleteCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
class DeleteCommand extends ConsumerCommand
{
protected function configure()
protected function configure(): void
{
$this->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
->setDescription('Delete a consumer\'s queue')
Expand All @@ -22,13 +22,7 @@ protected function configure()
$this->setName('rabbitmq:delete');
}

/**
* @param InputInterface $input
* @param OutputInterface $output
*
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
$noConfirmation = (bool) $input->getOption('no-confirmation');

Expand Down
8 changes: 5 additions & 3 deletions Command/DynamicConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
*/
namespace OldSound\RabbitMqBundle\Command;

use OldSound\RabbitMqBundle\RabbitMq\DynamicConsumer;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;

class DynamicConsumerCommand extends BaseConsumerCommand
{
protected function configure()
protected function configure(): void
{
parent::configure();

Expand All @@ -25,12 +27,12 @@ protected function configure()
;
}

protected function getConsumerService()
protected function getConsumerService(): string
{
return 'old_sound_rabbit_mq.%s_dynamic';
}

protected function initConsumer($input)
protected function initConsumer(InputInterface $input): void
{
parent::initConsumer($input);
$this->consumer->setContext($input->getArgument('context'));
Expand Down
7 changes: 4 additions & 3 deletions Command/MultipleConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
namespace OldSound\RabbitMqBundle\Command;

use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;

class MultipleConsumerCommand extends BaseConsumerCommand
{
protected function configure()
protected function configure(): void
{
parent::configure();

Expand All @@ -16,12 +17,12 @@ protected function configure()
;
}

protected function getConsumerService()
protected function getConsumerService(): string
{
return 'old_sound_rabbit_mq.%s_multiple';
}

protected function initConsumer($input)
protected function initConsumer(InputInterface $input): void
{
parent::initConsumer($input);
$this->consumer->setContext($input->getArgument('context'));
Expand Down
12 changes: 3 additions & 9 deletions Command/PurgeConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
class PurgeConsumerCommand extends ConsumerCommand
{
protected function configure()
protected function configure(): void
{
$this->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
->setDescription('Purge a consumer\'s queue')
Expand All @@ -22,13 +22,7 @@ protected function configure()
$this->setName('rabbitmq:purge');
}

/**
* @param InputInterface $input
* @param OutputInterface $output
*
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
$noConfirmation = (bool) $input->getOption('no-confirmation');

Expand All @@ -50,7 +44,7 @@ protected function execute(InputInterface $input, OutputInterface $output)

$this->consumer = $this->getContainer()
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
$this->consumer->purge($input->getArgument('name'));
$this->consumer->purge();

return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions Command/RpcServerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class RpcServerCommand extends BaseRabbitMqCommand
{
protected function configure()
protected function configure(): void
{
parent::configure();

Expand All @@ -32,7 +32,7 @@ protected function configure()
*
* @throws \InvalidArgumentException When the number of messages to consume is less than 0
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
define('AMQP_DEBUG', (bool) $input->getOption('debug'));
$amount = $input->getOption('messages');
Expand Down
4 changes: 2 additions & 2 deletions Command/SetupFabricCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class SetupFabricCommand extends BaseRabbitMqCommand
{
protected function configure()
protected function configure(): void
{
$this
->setName('rabbitmq:setup-fabric')
Expand All @@ -18,7 +18,7 @@ protected function configure()
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (defined('AMQP_DEBUG') === false) {
define('AMQP_DEBUG', (bool) $input->getOption('debug'));
Expand Down
Loading