Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[amqp] One single transport factory for all supported amqp implementa… #233

Merged
merged 1 commit into from
Oct 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 4 additions & 200 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ enqueue:

# The option tells whether RabbitMQ broker has delay plugin installed or not
delay_plugin_installed: false
amqp_ext:
amqp:
driver: ~ # One of "ext"; "lib"; "bunny"

# The connection to AMQP broker set as a string. Other parameters could be used as defaults
dsn: ~
Expand Down Expand Up @@ -86,106 +87,8 @@ enqueue:

# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
driver_options: ~
rabbitmq_amqp_ext:

# The connection to AMQP broker set as a string. Other parameters could be used as defaults
dsn: ~

# The host to connect too. Note: Max 1024 characters
host: ~

# Port on the host.
port: ~

# The user name to use. Note: Max 128 characters.
user: ~

# Password. Note: Max 128 characters.
pass: ~

# The virtual host on the host. Note: Max 128 characters.
vhost: ~

# Connection timeout. Note: 0 or greater seconds. May be fractional.
connection_timeout: ~

# Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
read_timeout: ~

# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
write_timeout: ~

# How often to send heartbeat. 0 means off.
heartbeat: ~
persisted: ~
lazy: ~

# The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher
receive_method: ~ # One of "basic_get"; "basic_consume"

# The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"
qos_prefetch_size: ~

# Specifies a prefetch window in terms of whole messages
qos_prefetch_count: ~

# If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.
qos_global: ~

# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
driver_options: ~

# The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id
delay_strategy: dlx
amqp_lib:

# The connection to AMQP broker set as a string. Other parameters could be used as defaults
dsn: ~

# The host to connect too. Note: Max 1024 characters
host: ~

# Port on the host.
port: ~

# The user name to use. Note: Max 128 characters.
user: ~

# Password. Note: Max 128 characters.
pass: ~

# The virtual host on the host. Note: Max 128 characters.
vhost: ~

# Connection timeout. Note: 0 or greater seconds. May be fractional.
connection_timeout: ~

# Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
read_timeout: ~

# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
write_timeout: ~

# How often to send heartbeat. 0 means off.
heartbeat: ~
persisted: ~
lazy: ~

# The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher
receive_method: ~ # One of "basic_get"; "basic_consume"

# The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"
qos_prefetch_size: ~

# Specifies a prefetch window in terms of whole messages
qos_prefetch_count: ~

# If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.
qos_global: ~

# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
driver_options: ~
rabbitmq_amqp_lib:
rabbitmq_amqp:
driver: ~ # One of "ext"; "lib"; "bunny"

# The connection to AMQP broker set as a string. Other parameters could be used as defaults
dsn: ~
Expand Down Expand Up @@ -293,105 +196,6 @@ enqueue:

# the connection will be performed as later as possible, if the option set to true
lazy: true
amqp_bunny:

# The connection to AMQP broker set as a string. Other parameters could be used as defaults
dsn: ~

# The host to connect too. Note: Max 1024 characters
host: ~

# Port on the host.
port: ~

# The user name to use. Note: Max 128 characters.
user: ~

# Password. Note: Max 128 characters.
pass: ~

# The virtual host on the host. Note: Max 128 characters.
vhost: ~

# Connection timeout. Note: 0 or greater seconds. May be fractional.
connection_timeout: ~

# Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
read_timeout: ~

# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
write_timeout: ~

# How often to send heartbeat. 0 means off.
heartbeat: ~
persisted: ~
lazy: ~

# The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher
receive_method: ~ # One of "basic_get"; "basic_consume"

# The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"
qos_prefetch_size: ~

# Specifies a prefetch window in terms of whole messages
qos_prefetch_count: ~

# If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.
qos_global: ~

# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
driver_options: ~
rabbitmq_amqp_bunny:

# The connection to AMQP broker set as a string. Other parameters could be used as defaults
dsn: ~

# The host to connect too. Note: Max 1024 characters
host: ~

# Port on the host.
port: ~

# The user name to use. Note: Max 128 characters.
user: ~

# Password. Note: Max 128 characters.
pass: ~

# The virtual host on the host. Note: Max 128 characters.
vhost: ~

# Connection timeout. Note: 0 or greater seconds. May be fractional.
connection_timeout: ~

# Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
read_timeout: ~

# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
write_timeout: ~

# How often to send heartbeat. 0 means off.
heartbeat: ~
persisted: ~
lazy: ~

# The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher
receive_method: ~ # One of "basic_get"; "basic_consume"

# The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"
qos_prefetch_size: ~

# Specifies a prefetch window in terms of whole messages
qos_prefetch_count: ~

# If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.
qos_global: ~

# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
driver_options: ~

# The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id
delay_strategy: dlx
client:
traceable_producer: false
prefix: enqueue
Expand Down
6 changes: 3 additions & 3 deletions pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ public function registerDelayStrategy(ContainerBuilder $container, array $config
if ($config['delay_strategy']) {
$factory = $container->getDefinition($factoryId);

if (false == is_a($factory->getClass(), DelayStrategyAware::class, true)) {
if (false == (is_a($factory->getClass(), DelayStrategyAware::class, true) || $factory->getFactory())) {
throw new \LogicException('Connection factory does not support delays');
}

if (strtolower($config['delay_strategy']) === 'dlx') {
if ('dlx' === strtolower($config['delay_strategy'])) {
$delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName);
$container->register($delayId, RabbitMqDlxDelayStrategy::class);

$factory->addMethodCall('setDelayStrategy', [new Reference($delayId)]);
} elseif (strtolower($config['delay_strategy']) === 'delayed_message_plugin') {
} elseif ('delayed_message_plugin' === strtolower($config['delay_strategy'])) {
$delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName);
$container->register($delayId, RabbitMqDelayPluginDelayStrategy::class);

Expand Down
19 changes: 2 additions & 17 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

namespace Enqueue\Bundle;

use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory;
use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory;
use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory;
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventsPass;
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncTransformersPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
Expand Down Expand Up @@ -55,15 +52,8 @@ public function build(ContainerBuilder $container)
$extension->addTransportFactory(new RabbitMqStompTransportFactory());
}

if (class_exists(AmqpExtConnectionFactory::class)) {
$extension->addTransportFactory(new AmqpTransportFactory(AmqpExtConnectionFactory::class, 'amqp_ext'));
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpExtConnectionFactory::class, 'rabbitmq_amqp_ext'));
}

if (class_exists(AmqpLibConnectionFactory::class)) {
$extension->addTransportFactory(new AmqpTransportFactory(AmqpLibConnectionFactory::class, 'amqp_lib'));
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpLibConnectionFactory::class, 'rabbitmq_amqp_lib'));
}
$extension->addTransportFactory(new AmqpTransportFactory('amqp'));
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory('rabbitmq_amqp'));

if (class_exists(FsConnectionFactory::class)) {
$extension->addTransportFactory(new FsTransportFactory());
Expand All @@ -81,11 +71,6 @@ public function build(ContainerBuilder $container)
$extension->addTransportFactory(new SqsTransportFactory());
}

if (class_exists(AmqpBunnyConnectionFactory::class)) {
$extension->addTransportFactory(new AmqpTransportFactory(AmqpBunnyConnectionFactory::class, 'amqp_bunny'));
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpBunnyConnectionFactory::class, 'rabbitmq_amqp_bunny'));
}

$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ public function setUp()

public function provideEnqueueConfigs()
{
yield 'amqp_ext' => [[
yield 'amqp' => [[
'transport' => [
'default' => 'amqp_ext',
'amqp_ext' => [
'default' => 'amqp',
'amqp' => [
'driver' => 'ext',
'host' => getenv('SYMFONY__RABBITMQ__HOST'),
'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
'user' => getenv('SYMFONY__RABBITMQ__USER'),
Expand All @@ -39,8 +40,8 @@ public function provideEnqueueConfigs()

yield 'amqp_dsn' => [[
'transport' => [
'default' => 'amqp_ext',
'amqp_ext' => getenv('AMQP_DSN'),
'default' => 'amqp',
'amqp' => getenv('AMQP_DSN'),
],
]];

Expand Down
Loading