Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ protected function addConsumers(ArrayNodeDefinition $node)
->scalarNode('callback')->isRequired()->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('timeout_wait')->end()
->arrayNode('graceful_max_execution')
->canBeUnset()
->children()
Expand Down Expand Up @@ -193,6 +194,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('timeout_wait')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('graceful_max_execution')
->canBeUnset()
Expand All @@ -217,7 +219,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
->end()
;
}

protected function addDynamicConsumers(ArrayNodeDefinition $node)
{
$node
Expand All @@ -233,6 +235,7 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
->scalarNode('callback')->isRequired()->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('timeout_wait')->end()
->arrayNode('graceful_max_execution')
->canBeUnset()
->children()
Expand Down
11 changes: 10 additions & 1 deletion DependencyInjection/OldSoundRabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ protected function loadConsumers()
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (isset($consumer['timeout_wait'])) {
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
}
if (isset($consumer['graceful_max_execution'])) {
$definition->addMethodCall(
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
Expand Down Expand Up @@ -290,6 +293,9 @@ protected function loadMultipleConsumers()
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (isset($consumer['timeout_wait'])) {
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
}
if (isset($consumer['graceful_max_execution'])) {
$definition->addMethodCall(
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
Expand Down Expand Up @@ -363,6 +369,9 @@ protected function loadDynamicConsumers()
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (isset($consumer['timeout_wait'])) {
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
}
if (isset($consumer['graceful_max_execution'])) {
$definition->addMethodCall(
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
Expand Down Expand Up @@ -457,7 +466,7 @@ protected function loadBatchConsumers()
protected function loadAnonConsumers()
{
foreach ($this->config['anon_consumers'] as $key => $anon) {
$definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
$definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
$definition
->setPublic(true)
->addTag('old_sound_rabbit_mq.base_amqp')
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,23 @@ consumers:
idle_timeout_exit_code: 0
```

#### Timeout wait ####

Set the `timeout_wait` in seconds.
The `timeout_wait` specifies how long the consumer will wait without receiving a new message before ensuring the current connection is still valid.

```yaml
consumers:
upload_picture:
connection: default
exchange_options: {name: 'upload-picture', type: direct}
queue_options: {name: 'upload-picture'}
callback: upload_picture_service
idle_timeout: 60
idle_timeout_exit_code: 0
timeout_wait: 10
```

#### Graceful max execution timeout ####

If you'd like your consumer to be running up to certain time and then gracefully exit, then set the `graceful_max_execution.timeout` in seconds.
Expand Down
107 changes: 65 additions & 42 deletions RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@

class Consumer extends BaseConsumer
{
const TIMEOUT_TYPE_IDLE = 'idle';
const TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION = 'graceful-max-execution';

/**
* @var int|null $memoryLimit
*/
Expand All @@ -32,6 +29,16 @@ class Consumer extends BaseConsumer
*/
protected $gracefulMaxExecutionTimeoutExitCode = 0;

/**
* @var int|null
*/
protected $timeoutWait;

/**
* @var \DateTime|null
*/
protected $lastActivityDateTime;

/**
* Set the memory limit
*
Expand Down Expand Up @@ -67,6 +74,7 @@ public function consume($msgAmount)

$this->setupConsumer();

$this->setLastActivityDateTime(new \DateTime());
while (count($this->getChannel()->callbacks)) {
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
$this->maybeStopConsumer();
Expand All @@ -76,29 +84,35 @@ public function consume($msgAmount)
* graceful max execution timeout is being used.
*/
$waitTimeout = $this->chooseWaitTimeout();
if (
$waitTimeout['timeoutType'] === self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION
&& $waitTimeout['seconds'] < 1
if ($this->gracefulMaxExecutionDateTime
&& $waitTimeout < 1
) {
return $this->gracefulMaxExecutionTimeoutExitCode;
}

if (!$this->forceStop) {
try {
$this->getChannel()->wait(null, false, $waitTimeout['seconds']);
$this->getChannel()->wait(null, false, $waitTimeout);
$this->setLastActivityDateTime(new \DateTime());
} catch (AMQPTimeoutException $e) {
if (self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION === $waitTimeout['timeoutType']) {
return $this->gracefulMaxExecutionTimeoutExitCode;
}
$now = time();

$idleEvent = new OnIdleEvent($this);
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

if ($idleEvent->isForceStop()) {
if (null !== $this->getIdleTimeoutExitCode()) {
return $this->getIdleTimeoutExitCode();
} else {
throw $e;
if ($this->gracefulMaxExecutionDateTime
&& $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
) {
return $this->gracefulMaxExecutionTimeoutExitCode;
} elseif ($this->getIdleTimeout()
&& ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() <= $now)
) {
$idleEvent = new OnIdleEvent($this);
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

if ($idleEvent->isForceStop()) {
if (null !== $this->getIdleTimeoutExitCode()) {
return $this->getIdleTimeoutExitCode();
} else {
throw $e;
}
}
}
}
Expand All @@ -115,7 +129,7 @@ public function purge()
{
$this->getChannel()->queue_purge($this->queueOptions['name'], true);
}

/**
* Delete the queue
*/
Expand Down Expand Up @@ -239,6 +253,11 @@ public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
$this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
}

public function setTimeoutWait(int $timeoutWait): void
{
$this->timeoutWait = $timeoutWait;
}

/**
* @return \DateTime|null
*/
Expand All @@ -255,20 +274,19 @@ public function getGracefulMaxExecutionTimeoutExitCode()
return $this->gracefulMaxExecutionTimeoutExitCode;
}

public function getTimeoutWait(): ?int
{
return $this->timeoutWait;
}

/**
* Choose the timeout to use for the $this->getChannel()->wait() method.
*
* @return array Of structure
* {
* timeoutType: string; // one of self::TIMEOUT_TYPE_*
* seconds: int;
* }
* Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
*/
private function chooseWaitTimeout()
private function chooseWaitTimeout(): int
{
if ($this->gracefulMaxExecutionDateTime) {
$allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
$allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
$allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
+ $allowedExecutionDateInterval->h * 3600
+ $allowedExecutionDateInterval->i * 60
+ $allowedExecutionDateInterval->s;
Expand All @@ -281,25 +299,30 @@ private function chooseWaitTimeout()
* Respect the idle timeout if it's set and if it's less than
* the remaining allowed execution.
*/
if (
$this->getIdleTimeout()
if ($this->getIdleTimeout()
&& $this->getIdleTimeout() < $allowedExecutionSeconds
) {
return array(
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
'seconds' => $this->getIdleTimeout(),
);
$waitTimeout = $this->getIdleTimeout();
} else {
$waitTimeout = $allowedExecutionSeconds;
}
} else {
$waitTimeout = $this->getIdleTimeout();
}

return array(
'timeoutType' => self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION,
'seconds' => $allowedExecutionSeconds,
);
if (!is_null($this->getTimeoutWait()) && $this->getTimeoutWait() > 0) {
$waitTimeout = min($waitTimeout, $this->getTimeoutWait());
}
return $waitTimeout;
}

return array(
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
'seconds' => $this->getIdleTimeout(),
);
public function setLastActivityDateTime(\DateTime $dateTime)
{
$this->lastActivityDateTime = $dateTime;
}

protected function getLastActivityDateTime(): ?\DateTime
{
return $this->lastActivityDateTime;
}
}
4 changes: 3 additions & 1 deletion Tests/DependencyInjection/Fixtures/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ old_sound_rabbit_mq:
consumers:
foo_consumer:
connection: foo_connection
timeout_wait: 3
exchange_options:
name: foo_exchange
type: direct
Expand Down Expand Up @@ -137,6 +138,7 @@ old_sound_rabbit_mq:
multiple_consumers:
multi_test_consumer:
connection: foo_connection
timeout_wait: 3
exchange_options:
name: foo_multiple_exchange
type: direct
Expand All @@ -158,7 +160,7 @@ old_sound_rabbit_mq:
- 'iphone.upload'
callback: foo.multiple_test2.callback
queues_provider: foo.queues_provider

dynamic_consumers:
foo_dyn_consumer:
connection: foo_default
Expand Down
8 changes: 8 additions & 0 deletions Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,10 @@ public function testFooConsumerDefinition()
array(
'setCallback',
array(array(new Reference('foo.callback'), 'execute'))
),
array(
'setTimeoutWait',
array(3)
)
),
$definition->getMethodCalls()
Expand Down Expand Up @@ -518,6 +522,10 @@ public function testMultipleConsumerDefinition()
array(
new Reference('foo.queues_provider')
)
),
array(
'setTimeoutWait',
array(3)
)
),
$definition->getMethodCalls()
Expand Down
Loading