Skip to content

Commit

Permalink
Merge pull request #572 from php-enqueue/bundle-multi-transport-confi…
Browse files Browse the repository at this point in the history
…guration

Bundle multi transport configuration
  • Loading branch information
makasim authored Oct 21, 2018
2 parents 682a05f + bc571c2 commit 26dae59
Show file tree
Hide file tree
Showing 15 changed files with 395 additions and 156 deletions.
53 changes: 49 additions & 4 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Enqueue\Symfony\Client\DependencyInjection\ClientFactory;
use Enqueue\Symfony\DependencyInjection\TransportFactory;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;

Expand All @@ -22,14 +23,58 @@ public function getConfigTreeBuilder(): TreeBuilder
$rootNode = $tb->root('enqueue');
$rootNode
->beforeNormalization()
->ifEmpty()->then(function () {
return ['transport' => ['dsn' => 'null:']];
});
->always(function ($value) {
if (empty($value)) {
return [
'transport' => [
'default' => [
'dsn' => 'null:',
],
],
];
}

if (is_string($value)) {
return [
'transport' => [
'default' => [
'dsn' => $value,
],
],
];
}

return $value;
})
;

$transportFactory = new TransportFactory('default');

/** @var ArrayNodeDefinition $transportNode */
$transportNode = $rootNode->children()->arrayNode('transport');
$transportFactory->addTransportConfiguration($transportNode);
$transportNode
->beforeNormalization()
->always(function ($value) {
if (empty($value)) {
return ['default' => ['dsn' => 'null:']];
}
if (is_string($value)) {
return ['default' => ['dsn' => $value]];
}

if (is_array($value) && array_key_exists('dsn', $value)) {
return ['default' => $value];
}

return $value;
});
$transportPrototypeNode = $transportNode
->requiresAtLeastOneElement()
->useAttributeAsKey('key')
->prototype('array')
;

$transportFactory->addTransportConfiguration($transportPrototypeNode);

$consumptionNode = $rootNode->children()->arrayNode('consumption');
$transportFactory->addQueueConsumerConfiguration($consumptionNode);
Expand Down
18 changes: 11 additions & 7 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ public function load(array $configs, ContainerBuilder $container): void
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yml');

$transportFactory = (new TransportFactory('default'));
$transportFactory->buildConnectionFactory($container, $config['transport']);
$transportFactory->buildContext($container, []);
$transportFactory->buildQueueConsumer($container, $config['consumption']);
$transportFactory->buildRpcClient($container, []);
foreach ($config['transport'] as $name => $transportConfig) {
$transportFactory = (new TransportFactory($name));
$transportFactory->buildConnectionFactory($container, $transportConfig);
$transportFactory->buildContext($container, []);
$transportFactory->buildQueueConsumer($container, $config['consumption']);
$transportFactory->buildRpcClient($container, []);
}

$container->setParameter('enqueue.transports', array_keys($config['transport']));

if (isset($config['client'])) {
$this->setupAutowiringForProcessors($container);
Expand All @@ -38,12 +42,12 @@ public function load(array $configs, ContainerBuilder $container): void

$clientConfig = $config['client'];
// todo
$clientConfig['transport'] = $config['transport'];
$clientConfig['transport'] = $config['transport']['default'];
$clientConfig['consumption'] = $config['consumption'];

$clientFactory = new ClientFactory('default');
$clientFactory->build($container, $clientConfig);
$clientFactory->createDriver($container, $config['transport']);
$clientFactory->createDriver($container, $config['transport']['default']);
}

if ($config['job']) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class EnqueueBundle extends Bundle
public function build(ContainerBuilder $container): void
{
//transport passes
$container->addCompilerPass(new BuildConsumptionExtensionsPass('default'));
$container->addCompilerPass(new BuildProcessorRegistryPass('default'));
$container->addCompilerPass(new BuildConsumptionExtensionsPass());
$container->addCompilerPass(new BuildProcessorRegistryPass());

//client passes
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default'));
Expand Down
134 changes: 115 additions & 19 deletions pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,133 @@ public function testCouldBeConstructedWithDebugAsArgument()
new Configuration(true);
}

public function testShouldUseDefaultConfigurationIfNothingIsConfiguredAtAll()
public function testShouldProcessNullAsDefaultNullTransport()
{
$configuration = new Configuration(true);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[]]);
$config = $processor->processConfiguration($configuration, [null]);

$this->assertEquals([
'transport' => ['dsn' => 'null:'],
'consumption' => [
'receive_timeout' => 10000,
$this->assertConfigEquals([
'transport' => [
'default' => ['dsn' => 'null:'],
],
'job' => false,
'async_events' => ['enabled' => false],
'async_commands' => ['enabled' => false],
'extensions' => [
'doctrine_ping_connection_extension' => false,
'doctrine_clear_identity_map_extension' => false,
'signal_extension' => function_exists('pcntl_signal_dispatch'),
'reply_extension' => true,
], $config);
}

public function testShouldProcessStringAsDefaultDsnTransport()
{
$configuration = new Configuration(true);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, ['foo://bar?option=val']);

$this->assertConfigEquals([
'transport' => [
'default' => ['dsn' => 'foo://bar?option=val'],
],
], $config);
}

public function testShouldProcessEmptyArrayAsDefaultNullTransport()
{
$configuration = new Configuration(true);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, ['foo://bar?option=val']);

$this->assertConfigEquals([
'transport' => [
'default' => ['dsn' => 'foo://bar?option=val'],
],
], $config);
}

public function testShouldProcessSingleTransportAsDefault()
{
$configuration = new Configuration(true);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
'transport' => 'foo://bar?option=val',
]]);

$this->assertConfigEquals([
'transport' => [
'default' => ['dsn' => 'foo://bar?option=val'],
],
], $config);
}

public function testShouldProcessTransportWithDsnKeyAsDefault()
{
$configuration = new Configuration(true);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
'transport' => [
'dsn' => 'foo://bar?option=val',
],
]]);

$this->assertConfigEquals([
'transport' => [
'default' => ['dsn' => 'foo://bar?option=val'],
],
], $config);
}

public function testShouldUseDefaultTransportIfIfTransportIsConfiguredAtAll()
public function testShouldProcessSeveralTransports()
{
$configuration = new Configuration(true);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
'transport' => null,
'transport' => [
'default' => ['dsn' => 'default:'],
'foo' => ['dsn' => 'foo:'],
'bar' => ['dsn' => 'bar:'],
],
]]);

$this->assertConfigEquals([
'transport' => [
'default' => ['dsn' => 'default:'],
'foo' => ['dsn' => 'foo:'],
'bar' => ['dsn' => 'bar:'],
],
], $config);
}

public function testTransportFactoryShouldValidateEachTransportAccordingToItsRules()
{
$configuration = new Configuration(true);

$processor = new Processor();

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('Both options factory_class and factory_service are set. Please choose one.');
$processor->processConfiguration($configuration, [
[
'transport' => [
'default' => [
'factory_class' => 'aClass',
'factory_service' => 'aService',
],
],
],
]);
}

public function testShouldUseDefaultConfigurationIfNothingIsConfiguredAtAll()
{
$configuration = new Configuration(true);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[]]);

$this->assertEquals([
'transport' => ['dsn' => 'null:'],
'transport' => ['default' => ['dsn' => 'null:']],
'consumption' => [
'receive_timeout' => 10000,
],
Expand All @@ -88,8 +180,7 @@ public function testShouldSetDefaultConfigurationForClient()
'client' => null,
]]);

$this->assertArraySubset([
'transport' => ['dsn' => 'null:'],
$this->assertConfigEquals([
'client' => [
'prefix' => 'enqueue',
'app_name' => 'app',
Expand Down Expand Up @@ -403,4 +494,9 @@ public function testShouldAllowConfigureConsumption()
],
], $config);
}

private function assertConfigEquals(array $expected, array $actual): void
{
$this->assertArraySubset($expected, $actual, false, var_export($actual, true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,24 @@ public function testShouldConfigureQueueConsumer()
$this->assertSame(456, $def->getArgument(4));
}

public function testShouldSetPropertyWithAllConfiguredTransports()
{
$container = $this->getContainerBuilder(true);

$extension = new EnqueueExtension();
$extension->load([[
'client' => [],
'transport' => [
'default' => ['dsn' => 'default:'],
'foo' => ['dsn' => 'foo:'],
'bar' => ['dsn' => 'foo:'],
],
]], $container);

$this->assertTrue($container->hasParameter('enqueue.transports'));
$this->assertEquals(['default', 'foo', 'bar'], $container->getParameter('enqueue.transports'));
}

public function testShouldLoadProcessAutoconfigureChildDefinition()
{
$container = $this->getContainerBuilder(true);
Expand Down
2 changes: 1 addition & 1 deletion pkg/enqueue/Client/DriverFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function create(ConnectionFactory $factory, string $dsn, array $config):
$dsn = new Dsn($dsn);

if ($driverInfo = $this->findDriverInfo($dsn, Resources::getAvailableDrivers())) {
$driverClass = $driverInfo['factoryClass'];
$driverClass = $driverInfo['driverClass'];

if (RabbitMqDriver::class === $driverClass) {
if (false == $factory instanceof AmqpConnectionFactory) {
Expand Down
Loading

0 comments on commit 26dae59

Please sign in to comment.