Skip to content

Commit

Permalink
Implement basic.recover method
Browse files Browse the repository at this point in the history
  • Loading branch information
pinepain committed Feb 8, 2015
1 parent 43763d5 commit e361360
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ services:
- rabbitmq

install:
- sudo apt-get -qq install valgrind
- sudo apt-get update
- sudo apt-get -qq --fix-missing install valgrind

env:
global:
Expand Down
6 changes: 6 additions & 0 deletions amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getConnection, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_basicRecover, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_ARG_INFO(0, requeue)
ZEND_END_ARG_INFO()

/* amqp_queue_class ARG_INFO definition */
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_queue_class__construct, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, amqp_channel)
Expand Down Expand Up @@ -502,6 +506,8 @@ zend_function_entry amqp_channel_class_functions[] = {

PHP_ME(amqp_channel_class, getConnection, arginfo_amqp_channel_class_getConnection, ZEND_ACC_PUBLIC)

PHP_ME(amqp_channel_class, basicRecover, arginfo_amqp_channel_class_basicRecover, ZEND_ACC_PUBLIC)

{NULL, NULL, NULL} /* Must be the last line in amqp_functions[] */
};

Expand Down
50 changes: 50 additions & 0 deletions amqp_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,56 @@ PHP_METHOD(amqp_channel_class, getConnection)
}
/* }}} */

/* {{{ proto bool amqp::basicRecover([bool requeue=TRUE])
Redeliver unacknowledged messages */
PHP_METHOD(amqp_channel_class, basicRecover)
{
zval *id;
amqp_channel_object *channel;
amqp_connection_object *connection;

amqp_rpc_reply_t res;

zend_bool requeue = 1;

/* Get the vhost from the method params */
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O|b", &id, amqp_channel_class_entry, &requeue) == FAILURE) {
return;
}

/* Get the channel object out of the store */
channel = (amqp_channel_object *)zend_object_store_get_object(id TSRMLS_CC);

connection = AMQP_GET_CONNECTION(channel);
AMQP_VERIFY_CONNECTION(connection, "Could not redeliver unacknowledged messages.");

amqp_basic_recover(
connection->connection_resource->connection_state,
channel->channel_id,
(amqp_boolean_t) requeue
);

res = amqp_get_rpc_reply(connection->connection_resource->connection_state);

if (res.reply_type != AMQP_RESPONSE_NORMAL) {
PHP_AMQP_INIT_ERROR_MESSAGE();

php_amqp_error(res, PHP_AMQP_ERROR_MESSAGE_PTR, connection, channel TSRMLS_CC);

php_amqp_zend_throw_exception(res, amqp_channel_exception_class_entry, PHP_AMQP_ERROR_MESSAGE, 0 TSRMLS_CC);
php_amqp_maybe_release_buffers_on_channel(connection, channel);

PHP_AMQP_DESTROY_ERROR_MESSAGE();
return;
}

php_amqp_maybe_release_buffers_on_channel(connection, channel);

RETURN_TRUE;

}
/* }}} */

/*
*Local variables:
*tab-width: 4
Expand Down
2 changes: 2 additions & 0 deletions amqp_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ PHP_METHOD(amqp_channel_class, rollbackTransaction);

PHP_METHOD(amqp_channel_class, getConnection);

PHP_METHOD(amqp_channel_class, basicRecover);

/*
*Local variables:
*tab-width: 4
Expand Down
3 changes: 3 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ http://pear.php.net/dtd/package-2.0.xsd">
</stability>
<license uri="http://www.php.net/license">PHP License</license>
<notes>
X.Y.Z Release:
* Add basic.recover AMQP method support (see AMQPChannel::basicRecover() method) (Bogdan Padalko)

1.6.0beta3 Release:
* Fix building on OS X (Bogdan Padalko)

Expand Down
9 changes: 9 additions & 0 deletions stubs/AMQPChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,13 @@ public function startTransaction()
public function getConnection()
{
}

/**
* Redeliver unacknowledged messages.
*
* @param bool $requeue
*/
public function basicRecover($requeue = true)
{
}
}
112 changes: 112 additions & 0 deletions tests/amqpchannel_basicRecover.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
--TEST--
AMQPChannel::basicRecover
--SKIPIF--
<?php
if (!extension_loaded("amqp") || version_compare(PHP_VERSION, '5.3', '<')) {
print "skip";
}
?>
--FILE--
<?php
$time = microtime(true);

$connection_1 = new AMQPConnection();
$connection_1->connect();

$channel_1 = new AMQPChannel($connection_1);
$channel_1->setPrefetchCount(5);

$exchange_1 = new AMQPExchange($channel_1);
$exchange_1->setType(AMQP_EX_TYPE_TOPIC);
$exchange_1->setName('test_' . $time);
$exchange_1->setFlags(AMQP_AUTODELETE);
$exchange_1->declareExchange();

$queue_1 = new AMQPQueue($channel_1);
$queue_1->setName('test_' . $time);
$queue_1->setFlags(AMQP_DURABLE);
$queue_1->declareQueue();

$queue_1->bind($exchange_1->getName(), 'test');

$messages_count = 0;
while ($messages_count++ < 10) {
$exchange_1->publish('test message #' . $messages_count, 'test');
//echo 'published test message #' . $messages_count, PHP_EOL;
}

$consume = 2; // NOTE: by default prefetch-count=3, so in consumer below we will ignore prefetched messages 3-5,
// and they will not seen by other consumers until we redeliver it.
$queue_1->consume(function(AMQPEnvelope $e, AMQPQueue $q) use (&$consume) {
echo 'consumed ', $e->getBody(), ' ', ($e->isRedelivery() ? '(redelivered)' : '(original)'), PHP_EOL;
$q->ack($e->getDeliveryTag());

return (-- $consume > 0);
});
$queue_1->cancel(); // we have to do that to prevent redelivering to the same consumer

$connection_2 = new AMQPConnection();
$connection_2->setReadTimeout(1);

$connection_2->connect();
$channel_2 = new AMQPChannel($connection_2);
$channel_2->setPrefetchCount(8);


$queue_2 = new AMQPQueue($channel_2);
$queue_2->setName('test_' . $time);

$consume = 10;
try {

$queue_2->consume(function (AMQPEnvelope $e, AMQPQueue $q) use (&$consume) {
echo 'consumed ' . $e->getBody(), ' ', ($e->isRedelivery() ? '(redelivered)' : '(original)'), PHP_EOL;
$q->ack($e->getDeliveryTag());

return (--$consume > 0);
});

} catch (AMQPException $e) {
echo get_class($e), ': ', $e->getMessage(), PHP_EOL;
}
$queue_2->cancel();
//var_dump($connection_2, $channel_2);die;


// yes, we do it repeatedly, basic.recover works in a slightly different way than it looks like. As it said,
// it "asks the server to redeliver all unacknowledged messages on a specified channel.
// ZERO OR MORE messages MAY BE redelivered"
$channel_1->basicRecover();

echo 'redelivered', PHP_EOL;

$consume = 10;
try {

$queue_2->consume(function (AMQPEnvelope $e, AMQPQueue $q) use (&$consume) {
echo 'consumed ' . $e->getBody(), ' ', ($e->isRedelivery() ? '(redelivered)' : '(original)'), PHP_EOL;
$q->ack($e->getDeliveryTag());

return (--$consume > 0);
});

} catch (AMQPException $e) {
echo get_class($e), ': ', $e->getMessage(), PHP_EOL;
}


?>
--EXPECT--
consumed test message #1 (original)
consumed test message #2 (original)
consumed test message #8 (original)
consumed test message #9 (original)
consumed test message #10 (original)
AMQPQueueException: Consumer timeout exceed
redelivered
consumed test message #3 (redelivered)
consumed test message #4 (redelivered)
consumed test message #5 (redelivered)
consumed test message #6 (redelivered)
consumed test message #7 (redelivered)
AMQPQueueException: Consumer timeout exceed

0 comments on commit e361360

Please sign in to comment.