Skip to content

Commit

Permalink
Fix for create channel race condition by marking slots as used (fixes p…
Browse files Browse the repository at this point in the history
  • Loading branch information
lstrojny committed May 14, 2013
1 parent fd32c5b commit b00b676
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 11 deletions.
10 changes: 8 additions & 2 deletions amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,21 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_isConnected, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getChannelId, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_setPrefetchSize, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, size)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getPrefetchSize, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getPrefetchSize, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_setPrefetchCount, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, count)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getPrefetchCount, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_getPrefetchCount, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class_qos, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 2)
Expand Down Expand Up @@ -445,6 +448,9 @@ zend_function_entry amqp_connection_class_functions[] = {
zend_function_entry amqp_channel_class_functions[] = {
PHP_ME(amqp_channel_class, __construct, arginfo_amqp_channel_class__construct, ZEND_ACC_PUBLIC)
PHP_ME(amqp_channel_class, isConnected, arginfo_amqp_channel_class_isConnected, ZEND_ACC_PUBLIC)

PHP_ME(amqp_channel_class, getChannelId, arginfo_amqp_channel_class_getChannelId, ZEND_ACC_PUBLIC)

PHP_ME(amqp_channel_class, setPrefetchSize, arginfo_amqp_channel_class_setPrefetchSize, ZEND_ACC_PUBLIC)
PHP_ME(amqp_channel_class, getPrefetchSize, arginfo_amqp_channel_class_getPrefetchSize, ZEND_ACC_PUBLIC)
PHP_ME(amqp_channel_class, setPrefetchCount,arginfo_amqp_channel_class_setPrefetchCount,ZEND_ACC_PUBLIC)
Expand Down
18 changes: 18 additions & 0 deletions amqp_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,24 @@ PHP_METHOD(amqp_channel_class, isConnected)
}
/* }}} */

/* {{{ proto bool amqp::getChannelId()
get amqp channel ID */
PHP_METHOD(amqp_channel_class, getChannelId)
{
zval *id;
amqp_channel_object *channel;

/* Try to pull amqp object out of method params */
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O", &id, amqp_channel_class_entry) == FAILURE) {
return;
}

/* Get the channel object out of the store */
channel = (amqp_channel_object *)zend_object_store_get_object(id TSRMLS_CC);

RETURN_LONG(channel->channel_id);
}
/* }}} */

/* {{{ proto bool amqp::setPrefetchCount(long count)
set the number of prefetches */
Expand Down
1 change: 1 addition & 0 deletions amqp_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ zend_object_value amqp_channel_ctor(zend_class_entry *ce TSRMLS_DC);

PHP_METHOD(amqp_channel_class, __construct);
PHP_METHOD(amqp_channel_class, isConnected);
PHP_METHOD(amqp_channel_class, getChannelId);
PHP_METHOD(amqp_channel_class, setPrefetchSize);
PHP_METHOD(amqp_channel_class, getPrefetchSize);
PHP_METHOD(amqp_channel_class, setPrefetchCount);
Expand Down
24 changes: 15 additions & 9 deletions amqp_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ int php_amqp_connect(amqp_connection_object *connection, int persistent TSRMLS_D
int slot;
for (slot = 1; slot < DEFAULT_CHANNELS_PER_CONNECTION; slot++) {
if (connection->connection_resource->slots[slot] != 0) {
/* We found the channel, disconnect it: */
amqp_channel_close(connection->connection_resource->connection_state, slot, AMQP_REPLY_SUCCESS);

if ((long) connection->connection_resource->slots[slot] != -1) {
/* We found the channel, disconnect it: */
amqp_channel_close(connection->connection_resource->connection_state, slot, AMQP_REPLY_SUCCESS);
}

/* Clean up our local storage */
connection->connection_resource->slots[slot] = 0;
connection->connection_resource->used_slots--;
Expand Down Expand Up @@ -237,8 +239,10 @@ void php_amqp_disconnect(amqp_connection_object *connection)
/* Close all open channels */
for (slot = 1; slot < DEFAULT_CHANNELS_PER_CONNECTION; slot++) {
if (resource->slots[slot] != 0) {
/* We found the channel, disconnect it: */
amqp_channel_close(connection->connection_resource->connection_state, slot, AMQP_REPLY_SUCCESS);
if ((long) resource->slots[slot] != -1) {
/* We found the channel, disconnect it: */
amqp_channel_close(connection->connection_resource->connection_state, slot, AMQP_REPLY_SUCCESS);
}

/* Clean up our local storage */
resource->slots[slot] = 0;
Expand Down Expand Up @@ -366,9 +370,8 @@ void remove_channel_from_connection(amqp_connection_object *connection, amqp_cha
/* We found the channel, disconnect it: */
amqp_channel_close(connection->connection_resource->connection_state, channel->channel_id, AMQP_REPLY_SUCCESS);

/* Clean up our local storage */
resource->slots[slot] = 0;
resource->used_slots--;
/* Mark slot as used */
resource->slots[slot] = (amqp_channel_object *) -1;

return;
}
Expand Down Expand Up @@ -417,7 +420,10 @@ void amqp_connection_dtor(void *object TSRMLS_DC)
if (!connection->connection_resource->slots[slot]) {
continue;
}
amqp_channel_close(connection->connection_resource->connection_state, connection->connection_resource->slots[slot]->channel_id, AMQP_REPLY_SUCCESS);
/* Close channel if it has not been closed and marked as used */
if ((long) connection->connection_resource->slots[slot] != -1) {
amqp_channel_close(connection->connection_resource->connection_state, connection->connection_resource->slots[slot]->channel_id, AMQP_REPLY_SUCCESS);
}
/* Clean up our local storage */
connection->connection_resource->slots[slot] = 0;
connection->connection_resource->used_slots--;
Expand Down
9 changes: 9 additions & 0 deletions stubs/AMQPChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ public function isConnected ()
{
}

/**
* Return internal channel ID
*
* @return integer
*/
public function getChannelId ()
{
}

/**
* Set the Quality Of Service settings for the given channel.
*
Expand Down
27 changes: 27 additions & 0 deletions tests/bug_gh50-1.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
--TEST--
Channel creation race condition (https://github.com/pdezwart/php-amqp/issues/50) (1)
--SKIPIF--
<?php if (!extension_loaded("amqp")) print "skip"; ?>
--FILE--
<?php
$connection = new AMQPConnection();
$connection->connect();

for ($i = 0; $i < 3; $i++) {

$channel = new AMQPChannel($connection);
var_dump($channel->getChannelId());

$queue = new AMQPQueue($channel);
$queue->setName('test' . $i);

$queue->declare();
$queue->delete();
}
?>
==DONE==
--EXPECT--
int(1)
int(2)
int(3)
==DONE==
30 changes: 30 additions & 0 deletions tests/bug_gh50-2.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
--TEST--
Channel creation race condition (https://github.com/pdezwart/php-amqp/issues/50) (2)
--SKIPIF--
<?php if (!extension_loaded("amqp")) print "skip"; ?>
--FILE--
<?php
$connection = new AMQPConnection();
$connection->connect();

for ($i = 0; $i < 3; $i++) {

$channel = new AMQPChannel($connection);
var_dump($channel->getChannelId());

$queue = new AMQPQueue($channel);
$queue->setName('test' . $i);

$queue->declare();
$queue->delete();

unset($queue);
unset($channel);
}
?>
==DONE==
--EXPECT--
int(1)
int(2)
int(3)
==DONE==
47 changes: 47 additions & 0 deletions tests/bug_gh50-3.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
--TEST--
Channel creation race condition (https://github.com/pdezwart/php-amqp/issues/50) (3)
--SKIPIF--
<?php if (!extension_loaded("amqp")) print "skip"; ?>
--FILE--
<?php
$connection = new AMQPConnection();
$connection->connect();

for ($i = 0; $i < 3; $i++) {

$channel = new AMQPChannel($connection);
var_dump($channel->getChannelId());

$queue = new AMQPQueue($channel);
$queue->setName('test' . $i);

$queue->declare();
$queue->delete();
}

$connection = new AMQPConnection();
$connection->connect();

for ($i = 0; $i < 3; $i++) {

$channel = new AMQPChannel($connection);
var_dump($channel->getChannelId());

$queue = new AMQPQueue($channel);
$queue->setName('test' . $i);

$queue->declare();
$queue->delete();
}


?>
==DONE==
--EXPECT--
int(1)
int(2)
int(3)
int(1)
int(2)
int(3)
==DONE==
49 changes: 49 additions & 0 deletions tests/bug_gh50-4.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
--TEST--
Channel creation race condition (https://github.com/pdezwart/php-amqp/issues/50) (4)
--SKIPIF--
<?php if (!extension_loaded("amqp")) print "skip"; ?>
--FILE--
<?php
$connection = new AMQPConnection();
$connection->connect();

$channels = array();

for ($i = 0; $i < 3; $i++) {

$channel = $channels[] = new AMQPChannel($connection);
var_dump($channel->getChannelId());

$queue = new AMQPQueue($channel);
$queue->setName('test' . $i);

$queue->declare();
$queue->delete();
}

$connection = new AMQPConnection();
$connection->connect();

for ($i = 0; $i < 3; $i++) {

$channel = $channels[] = new AMQPChannel($connection);
var_dump($channel->getChannelId());

$queue = new AMQPQueue($channel);
$queue->setName('test' . $i);

$queue->declare();
$queue->delete();
}


?>
==DONE==
--EXPECT--
int(1)
int(2)
int(3)
int(1)
int(2)
int(3)
==DONE==

0 comments on commit b00b676

Please sign in to comment.