Skip to content

[bundle] Multi Client Configuration #628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
multi client configuration
  • Loading branch information
ASKozienko committed Nov 15, 2018
commit 3e4d486e217860235bcc721be2f7522ff1727e07
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ private function loadMessageQueueCollector(array $config, ContainerBuilder $cont
$service = $container->register('enqueue.profiler.message_queue_collector', MessageQueueCollector::class);
$service->addTag('data_collector', [
'template' => '@Enqueue/Profiler/panel.html.twig',
'id' => 'enqueue.message_queue'
'id' => 'enqueue.message_queue',
]);

foreach ($configNames as $configName) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/enqueue-bundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ services:
class: 'Enqueue\Symfony\Consumption\ConfigurableConsumeCommand'
arguments:
- '@enqueue.locator'
- '%enqueue.default_transport%'
- 'enqueue.transport.%s.queue_consumer'
- 'enqueue.transport.%s.processor_registry'
tags:
Expand All @@ -18,6 +19,7 @@ services:
class: 'Enqueue\Symfony\Client\ConsumeCommand'
arguments:
- '@enqueue.locator'
- '%enqueue.default_client%'
- 'enqueue.client.%s.queue_consumer'
- 'enqueue.client.%s.driver'
- 'enqueue.client.%s.delegate_processor'
Expand All @@ -28,6 +30,7 @@ services:
class: 'Enqueue\Symfony\Client\ProduceCommand'
arguments:
- '@enqueue.locator'
- '%enqueue.default_client%'
- 'enqueue.client.%s.producer'
tags:
- { name: 'console.command' }
Expand All @@ -36,6 +39,7 @@ services:
class: 'Enqueue\Symfony\Client\SetupBrokerCommand'
arguments:
- '@enqueue.locator'
- '%enqueue.default_client%'
- 'enqueue.client.%s.driver'
tags:
- { name: 'console.command' }
Expand All @@ -44,6 +48,7 @@ services:
class: 'Enqueue\Symfony\Client\RoutesCommand'
arguments:
- '@enqueue.locator'
- '%enqueue.default_client%'
- 'enqueue.client.%s.driver'
tags:
- { name: 'console.command' }
13 changes: 10 additions & 3 deletions pkg/enqueue/Symfony/Client/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ class ConsumeCommand extends Command
*/
private $container;

/**
* @var string
*/
private $defaultClient;

/**
* @var string
*/
Expand All @@ -51,16 +56,18 @@ class ConsumeCommand extends Command

public function __construct(
ContainerInterface $container,
string $defaultClient,
string $queueConsumerIdPattern = 'enqueue.client.%s.queue_consumer',
string $driverIdPattern = 'enqueue.client.%s.driver',
string $processorIdPatter = 'enqueue.client.%s.delegate_processor'
) {
parent::__construct(self::$defaultName);

$this->container = $container;
$this->defaultClient = $defaultClient;
$this->queueConsumerIdPattern = $queueConsumerIdPattern;
$this->driverIdPattern = $driverIdPattern;
$this->processorIdPattern = $processorIdPatter;

parent::__construct(self::$defaultName);
}

protected function configure(): void
Expand All @@ -77,7 +84,7 @@ protected function configure(): void
'It select an appropriate message processor based on a message headers')
->addArgument('client-queue-names', InputArgument::IS_ARRAY, 'Queues to consume messages from')
->addOption('skip', null, InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, 'Queues to skip consumption of messages from', [])
->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', 'default')
->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', $this->defaultClient)
;
}

Expand Down
14 changes: 10 additions & 4 deletions pkg/enqueue/Symfony/Client/ProduceCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,31 @@ class ProduceCommand extends Command
*/
private $container;

/**
* @var string
*/
private $defaultClient;

/**
* @var string
*/
private $producerIdPattern;

public function __construct(ContainerInterface $container, string $producerIdPattern = 'enqueue.client.%s.producer')
public function __construct(ContainerInterface $container, string $defaultClient, string $producerIdPattern = 'enqueue.client.%s.producer')
{
parent::__construct(static::$defaultName);

$this->container = $container;
$this->defaultClient = $defaultClient;
$this->producerIdPattern = $producerIdPattern;

parent::__construct(static::$defaultName);
}

protected function configure(): void
{
$this
->setDescription('Sends an event to the topic')
->addArgument('message', InputArgument::REQUIRED, 'A message')
->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', 'default')
->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', $this->defaultClient)
->addOption('topic', null, InputOption::VALUE_OPTIONAL, 'The topic to send a message to')
->addOption('command', null, InputOption::VALUE_OPTIONAL, 'The command to send a message to')
;
Expand Down
14 changes: 10 additions & 4 deletions pkg/enqueue/Symfony/Client/RoutesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class RoutesCommand extends Command
*/
private $container;

/**
* @var string
*/
private $defaultClient;

/**
* @var string
*/
Expand All @@ -32,12 +37,13 @@ class RoutesCommand extends Command
*/
private $driver;

public function __construct(ContainerInterface $container, string $driverIdPatter = 'enqueue.client.%s.driver')
public function __construct(ContainerInterface $container, string $defaultClient, string $driverIdPatter = 'enqueue.client.%s.driver')
{
parent::__construct(static::$defaultName);

$this->container = $container;
$this->defaultClient = $defaultClient;
$this->driverIdPatter = $driverIdPatter;

parent::__construct(static::$defaultName);
}

protected function configure(): void
Expand All @@ -46,7 +52,7 @@ protected function configure(): void
->setAliases(['debug:enqueue:routes'])
->setDescription('A command lists all registered routes.')
->addOption('show-route-options', null, InputOption::VALUE_NONE, 'Adds ability to hide options.')
->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', 'default')
->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', $this->defaultClient)
;

$this->driver = null;
Expand Down
14 changes: 10 additions & 4 deletions pkg/enqueue/Symfony/Client/SetupBrokerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,31 @@ class SetupBrokerCommand extends Command
*/
private $container;

/**
* @var string
*/
private $defaultClient;

/**
* @var string
*/
private $driverIdPattern;

public function __construct(ContainerInterface $container, string $driverIdPattern = 'enqueue.client.%s.driver')
public function __construct(ContainerInterface $container, string $defaultClient, string $driverIdPattern = 'enqueue.client.%s.driver')
{
parent::__construct(static::$defaultName);

$this->container = $container;
$this->defaultClient = $defaultClient;
$this->driverIdPattern = $driverIdPattern;

parent::__construct(static::$defaultName);
}

protected function configure(): void
{
$this
->setAliases(['enq:sb'])
->setDescription('Setup broker. Configure the broker, creates queues, topics and so on.')
->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', 'default')
->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', $this->defaultClient)
;
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/enqueue/Symfony/Consumption/ConfigurableConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ class ConfigurableConsumeCommand extends Command
*/
private $container;

/**
* @var string
*/
private $defaultTransport;

/**
* @var string
*/
Expand All @@ -39,14 +44,16 @@ class ConfigurableConsumeCommand extends Command

public function __construct(
ContainerInterface $container,
string $defaultTransport,
string $queueConsumerIdPattern = 'enqueue.transport.%s.queue_consumer',
string $processorRegistryIdPattern = 'enqueue.transport.%s.processor_registry'
) {
parent::__construct(static::$defaultName);

$this->container = $container;
$this->defaultTransport = $defaultTransport;
$this->queueConsumerIdPattern = $queueConsumerIdPattern;
$this->processorRegistryIdPattern = $processorRegistryIdPattern;

parent::__construct(static::$defaultName);
}

protected function configure(): void
Expand All @@ -61,7 +68,7 @@ protected function configure(): void
'and a message processor service')
->addArgument('processor', InputArgument::REQUIRED, 'A message processor.')
->addArgument('queues', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, 'A queue to consume from', [])
->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', 'default')
->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', $this->defaultTransport)
;
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/enqueue/Tests/Client/TraceableProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public function testShouldCollectInfoIfStringGivenAsEventMessage()
'messageId' => null,
],
], $producer->getTraces());

$this->assertArrayHasKey('sentAt', $producer->getTraces()[0]);
}

Expand Down
28 changes: 14 additions & 14 deletions pkg/enqueue/Tests/Symfony/Client/ConsumeCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ public function testShouldNotBeFinal()

public function testCouldBeConstructedWithRequiredAttributes()
{
new ConsumeCommand($this->createMock(ContainerInterface::class));
new ConsumeCommand($this->createMock(ContainerInterface::class), 'default');
}

public function testShouldHaveCommandName()
{
$command = new ConsumeCommand($this->createMock(ContainerInterface::class));
$command = new ConsumeCommand($this->createMock(ContainerInterface::class), 'default');

$this->assertEquals('enqueue:consume', $command->getName());
}

public function testShouldHaveExpectedOptions()
{
$command = new ConsumeCommand($this->createMock(ContainerInterface::class));
$command = new ConsumeCommand($this->createMock(ContainerInterface::class), 'default');

$options = $command->getDefinition()->getOptions();

Expand All @@ -64,7 +64,7 @@ public function testShouldHaveExpectedOptions()

public function testShouldHaveExpectedAttributes()
{
$command = new ConsumeCommand($this->createMock(ContainerInterface::class));
$command = new ConsumeCommand($this->createMock(ContainerInterface::class), 'default');

$arguments = $command->getDefinition()->getArguments();

Expand Down Expand Up @@ -104,7 +104,7 @@ public function testShouldBindDefaultQueueOnly()
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([]);
Expand Down Expand Up @@ -164,7 +164,7 @@ public function testShouldUseRequestedClient()
'enqueue.client.foo.queue_consumer' => $fooConsumer,
'enqueue.client.foo.driver' => $fooDriver,
'enqueue.client.foo.delegate_processor' => $fooProcessor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([
Expand Down Expand Up @@ -198,7 +198,7 @@ public function testThrowIfNotDefinedClientRequested()
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);

Expand Down Expand Up @@ -243,7 +243,7 @@ public function testShouldBindDefaultQueueIfRouteUseDifferentQueue()
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([]);
Expand Down Expand Up @@ -295,7 +295,7 @@ public function testShouldBindCustomExecuteConsumptionAndUseCustomClientDestinat
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([]);
Expand Down Expand Up @@ -336,7 +336,7 @@ public function testShouldBindUserProvidedQueues()
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([
Expand Down Expand Up @@ -378,7 +378,7 @@ public function testShouldBindNotPrefixedQueue()
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([
Expand Down Expand Up @@ -434,7 +434,7 @@ public function testShouldBindQueuesOnlyOnce()
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([]);
Expand Down Expand Up @@ -475,7 +475,7 @@ public function testShouldNotBindExternalRoutes()
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([]);
Expand Down Expand Up @@ -528,7 +528,7 @@ public function testShouldSkipQueueConsumptionAndUseCustomClientDestinationName(
'enqueue.client.default.queue_consumer' => $consumer,
'enqueue.client.default.driver' => $driver,
'enqueue.client.default.delegate_processor' => $processor,
]));
]), 'default');

$tester = new CommandTester($command);
$tester->execute([
Expand Down
Loading