Skip to content

Producer timeouts #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ static void kafka_init( rd_kafka_type_t type )
}
}

int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len)
int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout)
{
char errstr[512];
rd_kafka_topic_t *rkt = NULL;
Expand All @@ -411,7 +411,30 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le
}
return -2;
}

/* Topic configuration */
conf = rd_kafka_topic_conf_new();

rd_kafka_topic_conf_set(conf,"produce.offset.report", "true", errstr, sizeof errstr );

char timeoutStr[64];
snprintf(timeoutStr, 64, "%lu", timeout);
if (rd_kafka_topic_conf_set(conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
if (log_level)
{
openlog("phpkafka", 0, LOG_USER);
syslog(
LOG_ERR,
"Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s",
timeout,
errstr
);
}
rd_kafka_topic_conf_destroy(conf);
return -3;
}

//callback already set in kafka_set_connection
rkt = rd_kafka_topic_new(r, topic, conf);
if (!rkt)
Expand All @@ -424,6 +447,7 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le
rd_kafka_topic_conf_destroy(conf);
return -1;
}

//begin producing:
if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, msg, msg_len,NULL, 0,&pcb) == -1)
{
Expand All @@ -444,8 +468,9 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le
return 0;
}

int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report)
int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report, long timeout)
{
char errstr[512];
rd_kafka_topic_t *rkt;
struct produce_cb_params pcb = {msg_cnt, 0, 0, 0, 0, NULL};
void *opaque;
Expand All @@ -472,6 +497,24 @@ int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, in
/* Topic configuration */
topic_conf = rd_kafka_topic_conf_new();

char timeoutStr[64];
snprintf(timeoutStr, 64, "%lu", timeout);
if (rd_kafka_topic_conf_set(topic_conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
if (log_level)
{
openlog("phpkafka", 0, LOG_USER);
syslog(
LOG_ERR,
"Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s",
timeout,
errstr
);
}
rd_kafka_topic_conf_destroy(topic_conf);
return -3;
}

/* Create topic */
rkt = rd_kafka_topic_new(r, topic, topic_conf);

Expand Down Expand Up @@ -529,9 +572,10 @@ int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, in
return err_cnt;
}

int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report)
int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report, long timeout)
{

char errstr[512];
rd_kafka_topic_t *rkt;
struct produce_cb_params pcb = {1, 0, 0, 0, 0, NULL};
void *opaque;
Expand All @@ -558,6 +602,24 @@ int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report
/* Topic configuration */
topic_conf = rd_kafka_topic_conf_new();

char timeoutStr[64];
snprintf(timeoutStr, 64, "%lu", timeout);
if (rd_kafka_topic_conf_set(topic_conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
if (log_level)
{
openlog("phpkafka", 0, LOG_USER);
syslog(
LOG_ERR,
"Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s",
timeout,
errstr
);
}
rd_kafka_topic_conf_destroy(topic_conf);
return -3;
}

/* Create topic */
rkt = rd_kafka_topic_new(r, topic, topic_conf);

Expand Down
6 changes: 3 additions & 3 deletions kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ typedef struct connection_params_s {
void kafka_setup(char *brokers);
void kafka_set_log_level(int ll);
void kafka_set_partition(int partition);
int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report);
int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len);
int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report);
int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report, long timeout);
int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout);
int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report, long timeout);
rd_kafka_t *kafka_set_connection(rd_kafka_type_t type, const char *b, int report_level, const char *compression);
rd_kafka_t *kafka_get_connection(kafka_connection_params params, const char *brokers);
int kafka_consume(rd_kafka_t *r, zval* return_value, char* topic, char* offset, int item_count, int partition);
Expand Down
33 changes: 21 additions & 12 deletions php_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ ZEND_BEGIN_ARG_INFO(arginf_kafka_set_get_partition, 0)
ZEND_ARG_INFO(0, mode)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO(arginf_kafka_produce, 0)
ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_produce, 0, 0, 2)
ZEND_ARG_INFO(0, topic)
ZEND_ARG_INFO(0, message)
ZEND_ARG_INFO(0, timeout)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO(arginf_kafka_produce_batch, 0)
ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_produce_batch, 0, 0, 2)
ZEND_ARG_INFO(0, topic)
ZEND_ARG_INFO(0, messages)
ZEND_ARG_INFO(0, timeout)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_consume, 0, 0, 2)
Expand Down Expand Up @@ -1012,7 +1014,7 @@ PHP_METHOD(Kafka, disconnect)
}
/* }}} end Kafka::disconnect */

/* {{{ proto Kafka Kafka::produce( string $topic, string $message);
/* {{{ proto Kafka Kafka::produce( string $topic, string $message [, int $timeout = 60000]);
Produce a message, returns instance
or throws KafkaException in case something went wrong
*/
Expand All @@ -1023,14 +1025,16 @@ PHP_METHOD(Kafka, produce)
char *topic;
char *msg;
long reporting = connection->delivery_confirm_mode;
long timeout = 60000;
int topic_len,
msg_len,
status = 0;


if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss",
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|l",
&topic, &topic_len,
&msg, &msg_len) == FAILURE) {
&msg, &msg_len,
&timeout) == FAILURE) {
return;
}
if (!connection->producer)
Expand All @@ -1055,9 +1059,9 @@ PHP_METHOD(Kafka, produce)
(int) connection->producer_partition
);
if (connection->delivery_confirm_mode == PHP_KAFKA_CONFIRM_EXTENDED)
status = kafka_produce_report(connection->producer, topic, msg, msg_len);
status = kafka_produce_report(connection->producer, topic, msg, msg_len, timeout);
else
status = kafka_produce(connection->producer, topic, msg, msg_len, connection->delivery_confirm_mode);
status = kafka_produce(connection->producer, topic, msg, msg_len, connection->delivery_confirm_mode, timeout);
switch (status)
{
case -1:
Expand All @@ -1066,12 +1070,15 @@ PHP_METHOD(Kafka, produce)
case -2:
zend_throw_exception(kafka_exception, "Connection failure, cannot produce message", 0 TSRMLS_CC);
return;
case -3:
zend_throw_exception(kafka_exception, "Topic configuration error", 0 TSRMLS_CC);
return;
}
RETURN_ZVAL(object, 1, 0);
}
/* }}} end Kafka::produce */

/* {{{ proto Kafka Kafka::produceBatch( string $topic, array $messages);
/* {{{ proto Kafka Kafka::produceBatch( string $topic, array $messages [, int $timeout = 60000]);
Produce a batch of messages, returns instance
or throws exceptions in case of error
*/
Expand All @@ -1086,14 +1093,16 @@ PHP_METHOD(Kafka, produceBatch)
char *msg_batch[50];
int msg_batch_len[50] = {0};
long reporting = connection->delivery_confirm_mode;
long timeout = 60000;
int topic_len,
msg_len,
current_idx = 0,
status = 0;
HashPosition pos;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sa",
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sa|l",
&topic, &topic_len,
&arr) == FAILURE) {
&arr,
&timeout) == FAILURE) {
return;
}
//get producer up and running
Expand Down Expand Up @@ -1131,7 +1140,7 @@ PHP_METHOD(Kafka, produceBatch)
++current_idx;
if (current_idx == 50)
{
status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode);
status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode, timeout);
if (status)
{
if (status < 0)
Expand All @@ -1151,7 +1160,7 @@ PHP_METHOD(Kafka, produceBatch)
}
if (current_idx)
{//we still have some messages to produce...
status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode);
status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode, timeout);
if (status)
{
if (status < 0)
Expand Down
6 changes: 4 additions & 2 deletions stub/Kafka.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,11 @@ public function isConnected($mode = null)
* produce message on topic
* @param string $topic
* @param string $message
* @param int $timeout
* @return $this
* @throws \KafkaException
*/
public function produce($topic, $message)
public function produce($topic, $message, $timeout=5000)
{
$this->connected = true;
//internal call, produce message on topic
Expand All @@ -251,10 +252,11 @@ public function produce($topic, $message)
* Causing any overhead (internally, array is iterated, and produced
* @param string $topic
* @param array $messages
* @param int $timeout
* @return $this
* @throws \KafkaException
*/
public function produceBatch($topic, array $messages)
public function produceBatch($topic, array $messages, $timeout=5000)
{
foreach ($messages as $msg) {
//non-string messages are skipped silently ATM
Expand Down