Skip to content

Commit

Permalink
Add support for arguments nested values, closes php-amqp#124
Browse files Browse the repository at this point in the history
  • Loading branch information
pinepain committed Dec 19, 2014
1 parent abe068d commit 030ebbc
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 191 deletions.
103 changes: 87 additions & 16 deletions amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,20 +729,16 @@ char *stringify_bytes(amqp_bytes_t bytes)
return res;
}



amqp_table_t *convert_zval_to_arguments(zval *zvalArguments)
void internal_convert_zval_to_amqp_table(zval *zvalArguments, amqp_table_t *arguments, char allow_int_keys TSRMLS_DC)
{
HashTable *argumentHash;
HashPosition pos;
zval **data;
amqp_table_t *arguments;
char type[16];
amqp_table_t *inner_table;

argumentHash = Z_ARRVAL_P(zvalArguments);

/* In setArguments, we are overwriting all the existing values */
arguments = (amqp_table_t *)emalloc(sizeof(amqp_table_t));

/* Allocate all the memory necessary for storing the arguments */
arguments->entries = (amqp_table_entry_t *)ecalloc(zend_hash_num_elements(argumentHash), sizeof(amqp_table_entry_t));
arguments->num_entries = 0;
Expand All @@ -768,45 +764,120 @@ amqp_table_t *convert_zval_to_arguments(zval *zvalArguments)
/* Now pull the key */

if (zend_hash_get_current_key_ex(argumentHash, &key, &key_len, &index, 0, &pos) != HASH_KEY_IS_STRING) {
/* Skip things that are not strings */
continue;

if (allow_int_keys) {
/* Convert to strings non-string keys */
char str[32];

key_len = sprintf(str, "%lu", index);
key = str;
} else {
/* Skip things that are not strings */
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Ignoring non-string header field '%lu'", index);

continue;
}

}

/* Build the value */
table = &arguments->entries[arguments->num_entries++];
field = &table->value;
strKey = estrndup(key, key_len);
table->key = amqp_cstring_bytes(strKey);

switch (Z_TYPE_P(&value)) {
case IS_BOOL:
field->kind = AMQP_FIELD_KIND_BOOLEAN;
field->kind = AMQP_FIELD_KIND_BOOLEAN;
field->value.boolean = (amqp_boolean_t)Z_LVAL_P(&value);
break;
case IS_DOUBLE:
field->kind = AMQP_FIELD_KIND_F64;
field->kind = AMQP_FIELD_KIND_F64;
field->value.f64 = Z_DVAL_P(&value);
break;
case IS_LONG:
field->kind = AMQP_FIELD_KIND_I64;
field->kind = AMQP_FIELD_KIND_I64;
field->value.i64 = Z_LVAL_P(&value);
break;
case IS_STRING:
field->kind = AMQP_FIELD_KIND_UTF8;
strValue = estrndup(Z_STRVAL_P(&value), Z_STRLEN_P(&value));
field->kind = AMQP_FIELD_KIND_UTF8;
strValue = estrndup(Z_STRVAL_P(&value), Z_STRLEN_P(&value));
field->value.bytes = amqp_cstring_bytes(strValue);
break;
case IS_ARRAY:
field->kind = AMQP_FIELD_KIND_TABLE;
internal_convert_zval_to_amqp_table(&value, &field->value.table, 1 TSRMLS_CC);

break;
default:
switch(Z_TYPE_P(&value)) {
case IS_NULL: strcpy(type, "null"); break;
case IS_OBJECT: strcpy(type, "object"); break;
case IS_RESOURCE: strcpy(type, "resource"); break;
default: strcpy(type, "unknown");
}

php_error_docref(NULL TSRMLS_CC, E_WARNING, "Ignoring field '%s' due to unsupported value type (%s)", key, type);

/* Reset entries counter back */
arguments->num_entries --;
continue;
}

strKey = estrndup(key, key_len);
table->key = amqp_cstring_bytes(strKey);

/* Clean up the zval */
zval_dtor(&value);
}
}

inline amqp_table_t *convert_zval_to_amqp_table(zval *zvalArguments TSRMLS_DC)
{
amqp_table_t *arguments;
/* In setArguments, we are overwriting all the existing values */
arguments = (amqp_table_t *)emalloc(sizeof(amqp_table_t));

internal_convert_zval_to_amqp_table(zvalArguments, arguments, 0 TSRMLS_CC);

return arguments;
}




void internal_php_amqp_free_amqp_table(amqp_table_t *object, char clear_root)
{
if (!object) {
return;
}

if ((object)->entries) {
int macroEntryCounter;
for (macroEntryCounter = 0; macroEntryCounter < (object)->num_entries; macroEntryCounter++) {
efree((object)->entries[macroEntryCounter].key.bytes);

switch ((object)->entries[macroEntryCounter].value.kind) {
case AMQP_FIELD_KIND_TABLE:
internal_php_amqp_free_amqp_table(&(object)->entries[macroEntryCounter].value.value.table, 0);
break;
case AMQP_FIELD_KIND_UTF8:
efree((object)->entries[macroEntryCounter].value.value.bytes.bytes);
break;
}
}
efree((object)->entries);
}

if (clear_root) {
efree(object);
}
}

void php_amqp_free_amqp_table(amqp_table_t *object)
{
internal_php_amqp_free_amqp_table(object, 1);
}


PHP_INI_BEGIN()
PHP_INI_ENTRY("amqp.host", DEFAULT_HOST, PHP_INI_ALL, NULL)
PHP_INI_ENTRY("amqp.vhost", DEFAULT_VHOST, PHP_INI_ALL, NULL)
Expand Down
130 changes: 14 additions & 116 deletions amqp_exchange.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,30 +150,6 @@ zend_object_value amqp_exchange_ctor(zend_class_entry *ce TSRMLS_DC)
return new_value;
}

void free_field_value(struct amqp_field_value_t_ value) {
switch (value.kind) {
case AMQP_FIELD_KIND_ARRAY:
{
int i;
for (i=0; i<value.value.array.num_entries; ++i) {
free_field_value(value.value.array.entries[i]);
}
efree(value.value.array.entries);
}
break;
case AMQP_FIELD_KIND_TABLE:
{
int i;
for (i=0; i<value.value.table.num_entries; ++i) {
free_field_value(value.value.table.entries[i].value);
}
efree(value.value.table.entries);
}
break;
}
}


/* {{{ proto AMQPExchange::__construct(AMQPChannel channel);
create Exchange */
PHP_METHOD(amqp_exchange_class, __construct)
Expand Down Expand Up @@ -494,7 +470,7 @@ PHP_METHOD(amqp_exchange_class, declareExchange)
return;
}

arguments = convert_zval_to_arguments(exchange->arguments);
arguments = convert_zval_to_amqp_table(exchange->arguments TSRMLS_CC);

#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR >= 5 && AMQP_VERSION_PATCH > 2
amqp_exchange_declare(
Expand Down Expand Up @@ -522,7 +498,7 @@ PHP_METHOD(amqp_exchange_class, declareExchange)

amqp_rpc_reply_t res = amqp_get_rpc_reply(connection->connection_resource->connection_state);

AMQP_EFREE_ARGUMENTS(arguments);
php_amqp_free_amqp_table(arguments);

/* handle any errors that occured outside of signals */
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
Expand Down Expand Up @@ -636,6 +612,8 @@ PHP_METHOD(amqp_exchange_class, publish)
props.content_type = amqp_cstring_bytes("text/plain");
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;

props.headers.entries = 0;

ALLOC_ZVAL(ztmp);

{
Expand Down Expand Up @@ -755,92 +733,16 @@ PHP_METHOD(amqp_exchange_class, publish)

}

if (ini_arr && SUCCESS == zend_hash_find(HASH_OF(ini_arr), "headers", sizeof("headers"), (void*)&zdata)) {
HashTable *headers;
HashPosition pos;
zval **z;
amqp_table_t *headers = NULL;

if (ini_arr && SUCCESS == zend_hash_find(HASH_OF(ini_arr), "headers", sizeof("headers"), (void*)&zdata)) {
MAKE_COPY_ZVAL(zdata, ztmp);

convert_to_array(ztmp);

headers = HASH_OF(ztmp);
zend_hash_internal_pointer_reset_ex(headers, &pos);
headers = convert_zval_to_amqp_table(ztmp TSRMLS_CC);

props._flags |= AMQP_BASIC_HEADERS_FLAG;
props.headers.entries = emalloc(sizeof(struct amqp_table_entry_t_) * zend_hash_num_elements(headers));
props.headers.num_entries = 0;

/* TODO: merge codebase with convert_zval_to_arguments function (then also merge AMQP_EFREE_ARGUMENTS macro and free_field_value function) */
while (zend_hash_get_current_data_ex(headers, (void **)&z, &pos) == SUCCESS) {
char *string_key;
uint string_key_len;

int type;
ulong num_key;

type = zend_hash_get_current_key_ex(headers, &string_key, &string_key_len, &num_key, 0, &pos);

zend_hash_move_forward_ex(headers, &pos);

if (HASH_KEY_IS_STRING == type) {
props.headers.entries[props.headers.num_entries].key.bytes = string_key;
props.headers.entries[props.headers.num_entries].key.len = string_key_len - 1;
} else {
/* previous version ignored non-string keys, so we just make notice about it */
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Ignoring non-string header field '%lu'", num_key);

continue;
}

if (Z_TYPE_PP(z) == IS_STRING) {
props.headers.entries[props.headers.num_entries].value.kind = AMQP_FIELD_KIND_UTF8;
props.headers.entries[props.headers.num_entries].value.value.bytes.bytes = Z_STRVAL_PP(z);
props.headers.entries[props.headers.num_entries].value.value.bytes.len = Z_STRLEN_PP(z);
props.headers.num_entries++;
} else if (Z_TYPE_PP(z) == IS_LONG) {
props.headers.entries[props.headers.num_entries].value.kind = AMQP_FIELD_KIND_I64;
props.headers.entries[props.headers.num_entries].value.value.i64 = (int64_t) Z_LVAL_PP(z);
props.headers.num_entries++;
} else if (Z_TYPE_PP(z) == IS_DOUBLE) {
props.headers.entries[props.headers.num_entries].value.kind = AMQP_FIELD_KIND_F64;
props.headers.entries[props.headers.num_entries].value.value.f64 = (double)Z_DVAL_PP(z);
props.headers.num_entries++;
} else if (Z_TYPE_PP(z) == IS_BOOL) {
props.headers.entries[props.headers.num_entries].value.kind = AMQP_FIELD_KIND_BOOLEAN;
props.headers.entries[props.headers.num_entries].value.value.boolean = (amqp_boolean_t)Z_BVAL_PP(z);
props.headers.num_entries++;
} else if (Z_TYPE_PP(z) == IS_ARRAY) {
zval **arr_data;
amqp_array_t array;
HashPosition arr_pos;

array.entries = emalloc(sizeof(struct amqp_field_value_t_) * zend_hash_num_elements(Z_ARRVAL_PP(z)));
array.num_entries = 0;
for(
zend_hash_internal_pointer_reset_ex(Z_ARRVAL_PP(z), &arr_pos);
zend_hash_get_current_data_ex(Z_ARRVAL_PP(z), (void**) &arr_data, &arr_pos) == SUCCESS;
zend_hash_move_forward_ex(Z_ARRVAL_PP(z), &arr_pos)
) {
if (Z_TYPE_PP(arr_data) == IS_STRING) {
array.entries[array.num_entries].kind = AMQP_FIELD_KIND_UTF8;
array.entries[array.num_entries].value.bytes.bytes = Z_STRVAL_PP(arr_data);
array.entries[array.num_entries].value.bytes.len = Z_STRLEN_PP(arr_data);
array.num_entries ++;
} else {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Ignoring non-string header nested array member type %d for field '%s'", Z_TYPE_PP(arr_data), string_key);
}
}

props.headers.entries[props.headers.num_entries].value.kind = AMQP_FIELD_KIND_ARRAY;
props.headers.entries[props.headers.num_entries].value.value.array = array;
props.headers.num_entries++;
} else {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Ignoring header field '%s' due to unsupported value type (NULL, array or resource)", string_key);
}
}
} else {
props.headers.entries = 0;
props.headers = *headers;
}

channel = AMQP_GET_CHANNEL(exchange);
Expand All @@ -866,12 +768,8 @@ PHP_METHOD(amqp_exchange_class, publish)
(msg_len > 0 ? amqp_cstring_bytes(msg) : amqp_empty_bytes) /* message body */
);

if (props.headers.entries) {
int i;
for (i=0; i < props.headers.num_entries; ++i) {
free_field_value(props.headers.entries[i].value);
}
efree(props.headers.entries);
if (headers) {
php_amqp_free_amqp_table(headers);
}

FREE_ZVAL(ztmp);
Expand Down Expand Up @@ -935,7 +833,7 @@ PHP_METHOD(amqp_exchange_class, bind)
AMQP_VERIFY_CONNECTION(connection, "Could not bind to exchanges.");

if (zvalArguments) {
arguments = convert_zval_to_arguments(zvalArguments);
arguments = convert_zval_to_amqp_table(zvalArguments TSRMLS_CC);
}

amqp_exchange_bind(
Expand All @@ -948,7 +846,7 @@ PHP_METHOD(amqp_exchange_class, bind)
);

if (zvalArguments) {
AMQP_EFREE_ARGUMENTS(arguments);
php_amqp_free_amqp_table(arguments);
}

amqp_rpc_reply_t res = amqp_get_rpc_reply(connection->connection_resource->connection_state);
Expand Down Expand Up @@ -1003,7 +901,7 @@ PHP_METHOD(amqp_exchange_class, unbind)
AMQP_VERIFY_CONNECTION(connection, "Could not unbind from exchanges.");

if (zvalArguments) {
arguments = convert_zval_to_arguments(zvalArguments);
arguments = convert_zval_to_amqp_table(zvalArguments TSRMLS_CC);
}

amqp_exchange_unbind(
Expand All @@ -1016,7 +914,7 @@ PHP_METHOD(amqp_exchange_class, unbind)
);

if (zvalArguments) {
AMQP_EFREE_ARGUMENTS(arguments);
php_amqp_free_amqp_table(arguments);
}

amqp_rpc_reply_t res = amqp_get_rpc_reply(connection->connection_resource->connection_state);
Expand Down
Loading

0 comments on commit 030ebbc

Please sign in to comment.