Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ protected function addConsumers(ArrayNodeDefinition $node)
->scalarNode('callback')->isRequired()->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('timeout_wait')->end()
->arrayNode('graceful_max_execution')
->canBeUnset()
->children()
Expand Down Expand Up @@ -193,6 +194,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('timeout_wait')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('graceful_max_execution')
->canBeUnset()
Expand Down Expand Up @@ -233,6 +235,7 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
->scalarNode('callback')->isRequired()->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('timeout_wait')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('qos_options')
->canBeUnset()
Expand Down
9 changes: 9 additions & 0 deletions DependencyInjection/OldSoundRabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ protected function loadConsumers()
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (isset($consumer['timeout_wait'])) {
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
}
if (isset($consumer['graceful_max_execution'])) {
$definition->addMethodCall(
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
Expand Down Expand Up @@ -290,6 +293,9 @@ protected function loadMultipleConsumers()
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (isset($consumer['timeout_wait'])) {
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
}
if (isset($consumer['graceful_max_execution'])) {
$definition->addMethodCall(
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
Expand Down Expand Up @@ -363,6 +369,9 @@ protected function loadDynamicConsumers()
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (isset($consumer['timeout_wait'])) {
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
}
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,23 @@ consumers:
idle_timeout_exit_code: 0
```

#### Timeout wait ####

Set the `timeout_wait` in seconds.
The `timeout_wait` specifies how long the consumer will wait without receiving a new message before ensuring the current connection is still valid.

```yaml
consumers:
upload_picture:
connection: default
exchange_options: {name: 'upload-picture', type: direct}
queue_options: {name: 'upload-picture'}
callback: upload_picture_service
idle_timeout: 60
idle_timeout_exit_code: 0
timeout_wait: 10
```

#### Graceful max execution timeout ####

If you'd like your consumer to be running up to certain time and then gracefully exit, then set the `graceful_max_execution.timeout` in seconds.
Expand Down
115 changes: 76 additions & 39 deletions RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@

class Consumer extends BaseConsumer
{
const TIMEOUT_TYPE_IDLE = 'idle';
const TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION = 'graceful-max-execution';

/**
* @var int $memoryLimit
*/
Expand All @@ -32,6 +29,16 @@ class Consumer extends BaseConsumer
*/
protected $gracefulMaxExecutionTimeoutExitCode = 0;

/**
* @var int|null
*/
protected $timeoutWait;

/**
* @var \DateTime|null
*/
protected $lastActivityDateTime;

/**
* Set the memory limit
*
Expand Down Expand Up @@ -67,6 +74,7 @@ public function consume($msgAmount)

$this->setupConsumer();

$this->setLastActivityDateTime(new \DateTime());
while (count($this->getChannel()->callbacks)) {
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
$this->maybeStopConsumer();
Expand All @@ -76,29 +84,35 @@ public function consume($msgAmount)
* graceful max execution timeout is being used.
*/
$waitTimeout = $this->chooseWaitTimeout();
if (
$waitTimeout['timeoutType'] === self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION
&& $waitTimeout['seconds'] < 1
if ($this->gracefulMaxExecutionDateTime
&& $waitTimeout < 1
) {
return $this->gracefulMaxExecutionTimeoutExitCode;
}

if (!$this->forceStop) {
try {
$this->getChannel()->wait(null, false, $waitTimeout['seconds']);
$this->getChannel()->wait(null, false, $waitTimeout);
$this->setLastActivityDateTime(new \DateTime());
} catch (AMQPTimeoutException $e) {
if (self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION === $waitTimeout['timeoutType']) {
return $this->gracefulMaxExecutionTimeoutExitCode;
}

$idleEvent = new OnIdleEvent($this);
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
$now = time();

if ($idleEvent->isForceStop()) {
if (null !== $this->getIdleTimeoutExitCode()) {
return $this->getIdleTimeoutExitCode();
} else {
throw $e;
if ($this->gracefulMaxExecutionDateTime
&& $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use timestamps everywhere? I mean there is no point to use \DateTime here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agreed that timestamps would be preferred. I introduced $lastActivityDateTime as \DateTime to match $gracefulMaxExecutionDateTime which was already in place.

) {
return $this->gracefulMaxExecutionTimeoutExitCode;
} elseif ($this->getIdleTimeout()
&& ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() <= $now)
) {
$idleEvent = new OnIdleEvent($this);
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

if ($idleEvent->isForceStop()) {
if (null !== $this->getIdleTimeoutExitCode()) {
return $this->getIdleTimeoutExitCode();
} else {
throw $e;
}
}
}
}
Expand All @@ -115,7 +129,7 @@ public function purge()
{
$this->getChannel()->queue_purge($this->queueOptions['name'], true);
}

/**
* Delete the queue
*/
Expand Down Expand Up @@ -239,6 +253,14 @@ public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
$this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
}

/**
* @param int $timeoutWait
*/
public function setTimeoutWait($timeoutWait)
{
$this->timeoutWait = $timeoutWait;
}

/**
* @return \DateTime|null
*/
Expand All @@ -256,13 +278,17 @@ public function getGracefulMaxExecutionTimeoutExitCode()
}

/**
* Choose the timeout to use for the $this->getChannel()->wait() method.
* @return int
*/
public function getTimeoutWait()
{
return $this->timeoutWait;
}

/**
* Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
*
* @return array Of structure
* {
* timeoutType: string; // one of self::TIMEOUT_TYPE_*
* seconds: int;
* }
* @return int
*/
private function chooseWaitTimeout()
{
Expand All @@ -281,25 +307,36 @@ private function chooseWaitTimeout()
* Respect the idle timeout if it's set and if it's less than
* the remaining allowed execution.
*/
if (
$this->getIdleTimeout()
if ($this->getIdleTimeout()
&& $this->getIdleTimeout() < $allowedExecutionSeconds
) {
return array(
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
'seconds' => $this->getIdleTimeout(),
);
$waitTimeout = $this->getIdleTimeout();
} else {
$waitTimeout = $allowedExecutionSeconds;
}
} else {
$waitTimeout = $this->getIdleTimeout();
}

return array(
'timeoutType' => self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION,
'seconds' => $allowedExecutionSeconds,
);
if ($this->getTimeoutWait()) {
$waitTimeout = min($waitTimeout, $this->getTimeoutWait());
}
return $waitTimeout;
}

return array(
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
'seconds' => $this->getIdleTimeout(),
);
/**
* @param \DateTime $dateTime
*/
public function setLastActivityDateTime(\DateTime $dateTime)
{
$this->lastActivityDateTime = $dateTime;
}

/**
* @return \DateTime|null
*/
protected function getLastActivityDateTime()
{
return $this->lastActivityDateTime;
}
}
2 changes: 2 additions & 0 deletions Tests/DependencyInjection/Fixtures/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ old_sound_rabbit_mq:
consumers:
foo_consumer:
connection: foo_connection
timeout_wait: 3
exchange_options:
name: foo_exchange
type: direct
Expand Down Expand Up @@ -134,6 +135,7 @@ old_sound_rabbit_mq:
multiple_consumers:
multi_test_consumer:
connection: foo_connection
timeout_wait: 3
exchange_options:
name: foo_multiple_exchange
type: direct
Expand Down
8 changes: 8 additions & 0 deletions Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,10 @@ public function testFooConsumerDefinition()
array(
'setCallback',
array(array(new Reference('foo.callback'), 'execute'))
),
array(
'setTimeoutWait',
array(3)
)
),
$definition->getMethodCalls()
Expand Down Expand Up @@ -518,6 +522,10 @@ public function testMultipleConsumerDefinition()
array(
new Reference('foo.queues_provider')
)
),
array(
'setTimeoutWait',
array(3)
)
),
$definition->getMethodCalls()
Expand Down
Loading