Skip to content

Commit 4fa5063

Browse files
authored
Always initialize Message::$headers (#509)
Fixes #508
1 parent acec7ea commit 4fa5063

File tree

3 files changed

+123
-4
lines changed

3 files changed

+123
-4
lines changed

message.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze
4646

4747
timestamp = rd_kafka_message_timestamp(message, &tstype);
4848

49+
zval headers_array;
4950
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
5051
rd_kafka_headers_t *message_headers = NULL;
5152
rd_kafka_resp_err_t header_response;
5253
const char *header_name = NULL;
5354
const void *header_value = NULL;
5455
size_t header_size = 0;
55-
zval headers_array;
5656
size_t i;
5757
#endif /* HAVE_RD_KAFKA_MESSAGE_HEADERS */
5858

@@ -72,23 +72,23 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze
7272
}
7373
zend_update_property_long(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("offset"), message->offset);
7474

75+
array_init(&headers_array);
7576
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
7677
if (message->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
7778
rd_kafka_message_headers(message, &message_headers);
7879
if (message_headers != NULL) {
79-
array_init(&headers_array);
8080
for (i = 0; i < rd_kafka_header_cnt(message_headers); i++) {
8181
header_response = rd_kafka_header_get_all(message_headers, i, &header_name, &header_value, &header_size);
8282
if (header_response != RD_KAFKA_RESP_ERR_NO_ERROR) {
8383
break;
8484
}
8585
add_assoc_stringl(&headers_array, header_name, (const char*)header_value, header_size);
8686
}
87-
zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array);
88-
zval_ptr_dtor(&headers_array);
8987
}
9088
}
9189
#endif
90+
zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array);
91+
zval_ptr_dtor(&headers_array);
9292

9393
if (msg_opaque != NULL) {
9494
zend_update_property_str(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("opaque"), msg_opaque);

package.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
<file role="test" name="bug115.phpt"/>
123123
<file role="test" name="bug330.phpt"/>
124124
<file role="test" name="bug465.phpt"/>
125+
<file role="test" name="bug508.phpt"/>
125126
<file role="test" name="bug74.phpt"/>
126127
<file role="test" name="bug88.phpt"/>
127128
<file role="test" name="bugConfSetArgument.phpt"/>

tests/bug508.phpt

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
--TEST--
2+
Bug 508
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/integration-tests-check.php';
6+
--FILE--
7+
<?php
8+
require __DIR__ . '/integration-tests-check.php';
9+
10+
$topicName = sprintf("test_rdkafka_%s", uniqid());
11+
12+
$conf = new RdKafka\Conf();
13+
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
14+
$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
15+
if ($msg->err) {
16+
throw new Exception("Message delivery failed: " . $msg->errstr());
17+
}
18+
$delivered++;
19+
});
20+
21+
$producer = new RdKafka\Producer($conf);
22+
$topic = $producer->newTopic($topicName);
23+
24+
if (!$producer->getMetadata(false, $topic, 10*1000)) {
25+
echo "Failed to get metadata, is broker down?\n";
26+
}
27+
28+
$topic->produce(0, 0, "message");
29+
30+
while ($producer->getOutQLen()) {
31+
$producer->poll(50);
32+
}
33+
34+
printf("%d messages delivered\n", $delivered);
35+
36+
$conf = new RdKafka\Conf();
37+
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
38+
$conf->set('enable.partition.eof', 'true');
39+
40+
$consumer = new RdKafka\Consumer($conf);
41+
$topic = $consumer->newTopic($topicName);
42+
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
43+
44+
while (true) {
45+
$msg = $topic->consume(0, 1000);
46+
if (!$msg) {
47+
continue;
48+
}
49+
// All props are initialized and readable in all cases
50+
var_dump([
51+
'err' => $msg->err,
52+
'topic_name' => $msg->topic_name,
53+
'timestamp' => $msg->timestamp,
54+
'partition' => $msg->partition,
55+
'payload' => $msg->payload,
56+
'len' => $msg->len,
57+
'key' => $msg->key,
58+
'offset' => $msg->offset,
59+
'headers' => $msg->headers,
60+
'opaque' => $msg->opaque,
61+
]);
62+
echo "--------------\n";
63+
if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
64+
echo "EOF\n";
65+
break;
66+
}
67+
}
68+
--EXPECTF--
69+
1 messages delivered
70+
array(10) {
71+
["err"]=>
72+
int(0)
73+
["topic_name"]=>
74+
string(%d) "test_rdkafka_%s"
75+
["timestamp"]=>
76+
int(%d)
77+
["partition"]=>
78+
int(0)
79+
["payload"]=>
80+
string(7) "message"
81+
["len"]=>
82+
int(7)
83+
["key"]=>
84+
NULL
85+
["offset"]=>
86+
int(0)
87+
["headers"]=>
88+
array(0) {
89+
}
90+
["opaque"]=>
91+
NULL
92+
}
93+
--------------
94+
array(10) {
95+
["err"]=>
96+
int(-%d)
97+
["topic_name"]=>
98+
string(%d) "test_rdkafka_%s"
99+
["timestamp"]=>
100+
int(-1)
101+
["partition"]=>
102+
int(0)
103+
["payload"]=>
104+
string(%d) "%s"
105+
["len"]=>
106+
int(%d)
107+
["key"]=>
108+
NULL
109+
["offset"]=>
110+
int(1)
111+
["headers"]=>
112+
array(0) {
113+
}
114+
["opaque"]=>
115+
NULL
116+
}
117+
--------------
118+
EOF

0 commit comments

Comments
 (0)