Description
The following code:
// $quit and $process are defined by the surrounding worker code (not shown).
$conf = new \RdKafka\Conf();
$settings = [
    'socket.keepalive.enable' => true,
    'log_level' => LOG_WARNING,
    'enable.auto.offset.store' => 'true',
    'auto.offset.reset' => 'earliest',
    'enable.partition.eof' => 'false',
    'enable.auto.commit' => 'false',
    'max.poll.interval.ms' => 300000,
    'session.timeout.ms' => 45000,
    'group.id' => 'test-group',
    'group.instance.id' => uniqid('', true),
    'metadata.broker.list' => 'kafka1:9092,kafka2:9092,kafka3:9092',
];
foreach ($settings as $key => $value) {
    $conf->set($key, $value);
}

$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['dave-test1']);

while (!$quit) {
    $message = $consumer->consume(60 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo "consume: {$message->payload}\n";
            // Non-blocking commit: a failure is not reported here.
            $consumer->commitAsync($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "consume: no more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "consume: timed out\n";
            break;
        default:
            throw new \RuntimeException($message->errstr(), $message->err);
    }
}

try {
    // Is a final synchronous commit the best practice for making sure the
    // offsets are committed, e.g. when an earlier commitAsync() call failed?
    $consumer->commit(null);
} catch (\Exception $e) {
    // NO_OFFSET only means there was nothing left to commit.
    if ($e->getCode() !== RD_KAFKA_RESP_ERR__NO_OFFSET) {
        echo 'commit fail: ' . $e->getMessage();
    }
}
echo "consumer process end PID {$process->pid}\n";
Resulted in this output:
It is unclear whether all offsets have been committed.
But I expected this output instead:
All offsets are committed, even if an earlier commitAsync() call failed.
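For illustration, here is a minimal sketch of one way a failed commitAsync() could be detected and retried. It assumes the offset commit callback (Conf::setOffsetCommitCb) is the appropriate hook; php-rdkafka documents that commit results are delivered to this callback while consume() runs. The $commitFailed flag, the single broker address and the fixed iteration count are hypothetical and only there to keep the example short.

```php
<?php
// Sketch only: needs the rdkafka extension and a reachable broker.
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'test-group');
$conf->set('metadata.broker.list', 'kafka1:9092');
$conf->set('enable.auto.commit', 'false');

// Hypothetical flag, set when an asynchronous commit reports an error.
$commitFailed = false;

$conf->setOffsetCommitCb(
    function (\RdKafka\KafkaConsumer $kafka, int $err, array $partitions) use (&$commitFailed) {
        // NO_OFFSET means there was nothing to commit; it is not a failure.
        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR && $err !== RD_KAFKA_RESP_ERR__NO_OFFSET) {
            $commitFailed = true;
            echo 'async commit failed: ' . rd_kafka_err2str($err) . "\n";
        }
    }
);

$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['dave-test1']);

for ($i = 0; $i < 10; $i++) {
    // Results of earlier commits are served to the callback during consume().
    $message = $consumer->consume(10 * 1000);
    if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        continue;
    }
    echo "consume: {$message->payload}\n";

    if ($commitFailed) {
        // Blocking commit of the current offsets; throws \RdKafka\Exception on failure.
        $consumer->commit(null);
        $commitFailed = false;
    } else {
        $consumer->commitAsync($message);
    }
}
```

With this approach the final commit(null) after the loop would still act as a safety net, while failures during normal operation become visible instead of being silently dropped.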
php-rdkafka Version
6.0.1
librdkafka Version
1.8.2
PHP Version
7.4.33
Operating System
Amazon Linux 2
Kafka Version
2.1.1