Skip to content

Commit

Permalink
introducing timeout parameter for AMQPConnection class
Browse files Browse the repository at this point in the history
  • Loading branch information
narkq committed Nov 4, 2012
1 parent 21a6fba commit 94dc95f
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 3 deletions.
13 changes: 13 additions & 0 deletions amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_connection_class_setVhost, ZEND_SEND_BY_VAL,
ZEND_ARG_INFO(0, vhost)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_connection_class_getTimeout, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_connection_class_setTimeout, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, timeout)
ZEND_END_ARG_INFO()

/* amqp_channel_class ARG_INFO definition */
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_channel_class__construct, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, amqp_connection)
Expand Down Expand Up @@ -399,6 +406,9 @@ zend_function_entry amqp_connection_class_functions[] = {
PHP_ME(amqp_connection_class, getVhost, arginfo_amqp_connection_class_getVhost, ZEND_ACC_PUBLIC)
PHP_ME(amqp_connection_class, setVhost, arginfo_amqp_connection_class_setVhost, ZEND_ACC_PUBLIC)

PHP_ME(amqp_connection_class, getTimeout, arginfo_amqp_connection_class_getTimeout, ZEND_ACC_PUBLIC)
PHP_ME(amqp_connection_class, setTimeout, arginfo_amqp_connection_class_setTimeout, ZEND_ACC_PUBLIC)

{NULL, NULL, NULL} /* Must be the last line in amqp_functions[] */
};

Expand Down Expand Up @@ -677,6 +687,7 @@ PHP_INI_BEGIN()
PHP_INI_ENTRY("amqp.host", DEFAULT_HOST, PHP_INI_ALL, NULL)
PHP_INI_ENTRY("amqp.vhost", DEFAULT_VHOST, PHP_INI_ALL, NULL)
PHP_INI_ENTRY("amqp.port", DEFAULT_PORT, PHP_INI_ALL, NULL)
PHP_INI_ENTRY("amqp.timeout", DEFAULT_TIMEOUT, PHP_INI_ALL, NULL)
PHP_INI_ENTRY("amqp.login", DEFAULT_LOGIN, PHP_INI_ALL, NULL)
PHP_INI_ENTRY("amqp.password", DEFAULT_PASSWORD, PHP_INI_ALL, NULL)
PHP_INI_ENTRY("amqp.auto_ack", DEFAULT_AUTOACK, PHP_INI_ALL, NULL)
Expand Down Expand Up @@ -751,6 +762,8 @@ PHP_MINIT_FUNCTION(amqp)
REGISTER_STRING_CONSTANT("AMQP_EX_TYPE_TOPIC", AMQP_EX_TYPE_TOPIC, CONST_CS | CONST_PERSISTENT);
REGISTER_STRING_CONSTANT("AMQP_EX_TYPE_HEADERS",AMQP_EX_TYPE_HEADERS, CONST_CS | CONST_PERSISTENT);

REGISTER_LONG_CONSTANT("AMQP_OS_SOCKET_TIMEOUT_ERRNO", AMQP_OS_SOCKET_TIMEOUT_ERRNO, CONST_CS | CONST_PERSISTENT);

return SUCCESS;

}
Expand Down
102 changes: 101 additions & 1 deletion amqp_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ HashTable *amqp_connection_object_get_debug_info(zval *object, int *is_temp TSRM
ZVAL_LONG(value, connection->port);
zend_hash_add(debug_info, "port", sizeof("port"), &value, sizeof(zval *), NULL);

MAKE_STD_ZVAL(value);
ZVAL_DOUBLE(value, connection->timeout);
zend_hash_add(debug_info, "timeout", sizeof("timeout"), &value, sizeof(zval *), NULL);

/* Start adding values */
return debug_info;
}
Expand Down Expand Up @@ -166,6 +170,8 @@ int php_amqp_connect(amqp_connection_object *connection, int persistent TSRMLS_D

amqp_set_sockfd(connection->connection_resource->connection_state, connection->connection_resource->fd);

php_amqp_set_timeout(connection TSRMLS_CC);

x = amqp_login(
connection->connection_resource->connection_state,
connection->vhost,
Expand Down Expand Up @@ -253,6 +259,35 @@ void php_amqp_disconnect(amqp_connection_object *connection)
return;
}

int php_amqp_set_timeout(amqp_connection_object *connection TSRMLS_DC)
{
#ifdef PHP_WIN32
DWORD timeout;
/*
In Windows, setsockopt with SO_RCVTIMEO sets actual timeout
to a value that's 500ms greater than specified value.
Also, it's not possible to set timeout to any value below 500ms.
Zero timeout works like it should, however.
*/
if (connection->timeout == 0.) {
timeout = 0;
} else {
timeout = (int) (max(connection->timeout * 1.e+3 - .5e+3, 1.));
}
#else
struct timeval timeout;
timeout.tv_sec = (int) floor(connection->timeout);
timeout.tv_usec = (int) ((connection->timeout - floor(connection->timeout)) * 1.e+6);
#endif

if (0 != setsockopt(connection->connection_resource->fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout))) {
zend_throw_exception(amqp_connection_exception_class_entry, "Socket error: cannot setsockopt SO_RCVTIMEO", 0 TSRMLS_CC);
return 0;
}

return 1;
}

int get_next_available_channel(amqp_connection_object *connection, amqp_channel_object *channel)
{
int slot;
Expand Down Expand Up @@ -395,7 +430,7 @@ zend_object_value amqp_connection_ctor(zend_class_entry *ce TSRMLS_DC)


/* {{{ proto AMQPConnection::__construct([array optional])
* The array can contain 'host', 'port', 'login', 'password', 'vhost' indexes
* The array can contain 'host', 'port', 'login', 'password', 'vhost', 'timeout' indexes
*/
PHP_METHOD(amqp_connection_class, __construct)
{
Expand Down Expand Up @@ -488,6 +523,17 @@ PHP_METHOD(amqp_connection_class, __construct)
convert_to_long(*zdata);
connection->port = (size_t)Z_LVAL_PP(zdata);
}

connection->timeout = INI_FLT("amqp.timeout");

if (iniArr && SUCCESS == zend_hash_find(HASH_OF (iniArr), "timeout", sizeof("timeout"), (void*)&zdata)) {
convert_to_double(*zdata);
if (Z_DVAL_PP(zdata) < 0) {
zend_throw_exception(amqp_connection_exception_class_entry, "Parameter 'timeout' must be greater than or equal to zero.", 0 TSRMLS_CC);
} else {
connection->timeout = Z_DVAL_PP(zdata);
}
}
}
/* }}} */

Expand Down Expand Up @@ -963,6 +1009,60 @@ PHP_METHOD(amqp_connection_class, setVhost)
}
/* }}} */

/* {{{ proto amqp::getTimeout()
get the timeout */
PHP_METHOD(amqp_connection_class, getTimeout)
{
zval *id;
amqp_connection_object *connection;

/* Get the timeout from the method params */
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O", &id, amqp_connection_class_entry) == FAILURE) {
return;
}

/* Get the connection object out of the store */
connection = (amqp_connection_object *)zend_object_store_get_object(id TSRMLS_CC);

/* Copy the timeout to the amqp object */
RETURN_DOUBLE(connection->timeout);
}
/* }}} */

/* {{{ proto amqp::setTimeout(double timeout)
set the timeout */
PHP_METHOD(amqp_connection_class, setTimeout)
{
zval *id;
amqp_connection_object *connection;
double timeout;

/* Get the timeout from the method params */
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Od", &id, amqp_connection_class_entry, &timeout) == FAILURE) {
return;
}

/* Validate timeout */
if (timeout < 0) {
zend_throw_exception(amqp_connection_exception_class_entry, "Parameter 'timeout' must be greater than or equal to zero.", 0 TSRMLS_CC);
return;
}

/* Get the connection object out of the store */
connection = (amqp_connection_object *)zend_object_store_get_object(id TSRMLS_CC);

/* Copy the timeout to the amqp object */
connection->timeout = timeout;

if (connection->is_connected == '\1') {
if (php_amqp_set_timeout(connection TSRMLS_CC) == 0) {
RETURN_FALSE;
}
}

RETURN_TRUE;
}
/* }}} */

/*
*Local variables:
Expand Down
2 changes: 2 additions & 0 deletions amqp_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ PHP_METHOD(amqp_connection_class, setPort);
PHP_METHOD(amqp_connection_class, getVhost);
PHP_METHOD(amqp_connection_class, setVhost);

PHP_METHOD(amqp_connection_class, getTimeout);
PHP_METHOD(amqp_connection_class, setTimeout);

/*
*Local variables:
Expand Down
3 changes: 3 additions & 0 deletions amqp_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ int read_message_from_channel(amqp_connection_state_t connection, zval *envelope

/* Check that the basic read from frame did not fail */
if (result < 0) {
zend_throw_exception(amqp_connection_exception_class_entry, amqp_error_string(-result), AQMP_GET_REAL_OS_ERRNO(-result) TSRMLS_CC);
return AMQP_READ_ERROR;
}

Expand Down Expand Up @@ -233,6 +234,7 @@ int read_message_from_channel(amqp_connection_state_t connection, zval *envelope
/* Read in the next frame */
result = amqp_simple_wait_frame(connection, &frame);
if (result < 0) {
zend_throw_exception(amqp_connection_exception_class_entry, amqp_error_string(-result), AQMP_GET_REAL_OS_ERRNO(-result) TSRMLS_CC);
return AMQP_READ_ERROR;
}

Expand Down Expand Up @@ -395,6 +397,7 @@ int read_message_from_channel(amqp_connection_state_t connection, zval *envelope
/* Read in the next frame */
result = amqp_simple_wait_frame(connection, &frame);
if (result < 0) {
zend_throw_exception(amqp_connection_exception_class_entry, amqp_error_string(-result), AQMP_GET_REAL_OS_ERRNO(-result) TSRMLS_CC);
return AMQP_READ_ERROR;
}

Expand Down
10 changes: 10 additions & 0 deletions php_amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ extern zend_class_entry *amqp_exception_class_entry,

#define DEFAULT_PORT "5672" /* default AMQP port */
#define DEFAULT_HOST "localhost"
#define DEFAULT_TIMEOUT "0"
#define DEFAULT_VHOST "/"
#define DEFAULT_LOGIN "guest"
#define DEFAULT_PASSWORD "guest"
Expand Down Expand Up @@ -287,6 +288,8 @@ extern zend_class_entry *amqp_exception_class_entry,
} while (0);
#endif

#define AMQP_ERROR_CATEGORY_MASK (1 << 29)
#define AQMP_GET_REAL_OS_ERRNO(err) (err) & ~AMQP_ERROR_CATEGORY_MASK

extern int le_amqp_connection_resource;
// ZEND_DECLARE_MODULE_GLOBALS(amqp)
Expand Down Expand Up @@ -320,6 +323,7 @@ typedef struct _amqp_connection_object {
char *vhost;
int vhost_len;
int port;
double timeout;
amqp_connection_resource *connection_resource;
} amqp_connection_object;

Expand Down Expand Up @@ -389,6 +393,12 @@ typedef struct _amqp_envelope_object {
# define AMQP_CLOSE_SOCKET(fd) close(fd);
#endif

#ifdef PHP_WIN32
# define AMQP_OS_SOCKET_TIMEOUT_ERRNO WSAETIMEDOUT
#else
# define AMQP_OS_SOCKET_TIMEOUT_ERRNO EAGAIN
#endif


#ifdef ZTS
#define AMQP_G(v) TSRMG(amqp_globals_id, zend_amqp_globals *, v)
Expand Down
5 changes: 3 additions & 2 deletions tests/amqpconnection_var_dump.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ $cnn = new AMQPConnection();
var_dump($cnn);
?>
--EXPECT--
object(AMQPConnection)#1 (5) {
object(AMQPConnection)#1 (6) {
["login"]=>
string(5) "guest"
["password"]=>
Expand All @@ -19,5 +19,6 @@ object(AMQPConnection)#1 (5) {
string(1) "/"
["port"]=>
int(5672)
["timeout"]=>
float(0)
}

0 comments on commit 94dc95f

Please sign in to comment.