Skip to content

Support producer transactions #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ can help out to understand the internals of this library.
- ext-rdkafka: >=4.0.0
- librdkafka: >=0.11.6 (if you use `<librdkafka:1.x` please define your own error callback)

:warning: To use the transactional producer you'll need:
- ext-rdkafka: >=4.1.0
- librdkafka: >=1.4

## Installation
```
composer require jobcloud/php-kafka-lib "~1.0"
Expand Down Expand Up @@ -57,6 +61,41 @@ $producer->produce($message);
// Shutdown producer, flush messages that are in queue. Give up after 20s
$result = $producer->flush(20000);
```

##### Transactional producer (needs >=php-rdkafka:4.1 and >=librdkafka:1.4)
```php
<?php

use Jobcloud\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException;

$producer = KafkaProducerBuilder::create()
->withAdditionalBroker('localhost:9092')
->build();

$message = KafkaProducerMessage::create('test-topic', 0)
->withKey('asdf-asdf-asfd-asdf')
->withBody('some test message payload')
->withHeaders([ 'key' => 'value' ]);
try {
$producer->beginTransaction(10000);
$producer->produce($message);
$producer->commitTransaction(10000);
} catch (KafkaProducerTransactionRetryException $e) {
// something went wrong but you can retry the failed call (either beginTransaction or commitTransaction)
} catch (KafkaProducerTransactionAbortException $e) {
// you need to call $producer->abortTransaction(10000); and try again
} catch (KafkaProducerTransactionFatalException $e) {
// something went very wrong, re-create your producer, otherwise you could jeopardize the idempotency guarantees
}

// Shutdown producer, flush messages that are in queue. Give up after 20s
$result = $producer->flush(20000);
```

##### Avro Producer
To create an avro prodcuer add the avro encoder.

Expand Down
11 changes: 11 additions & 0 deletions src/Exception/KafkaProducerTransactionAbortException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Jobcloud\Kafka\Exception;

class KafkaProducerTransactionAbortException extends \Exception
{
public const TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE =
'Produce failed. You need to abort your current transaction and start a new one';
}
11 changes: 11 additions & 0 deletions src/Exception/KafkaProducerTransactionFatalException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Jobcloud\Kafka\Exception;

class KafkaProducerTransactionFatalException extends \Exception
{
public const FATAL_TRANSACTION_EXCEPTION_MESSAGE =
'Produce failed with a fatal error. This producer instance cannot be used anymore.';
}
10 changes: 10 additions & 0 deletions src/Exception/KafkaProducerTransactionRetryException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Jobcloud\Kafka\Exception;

class KafkaProducerTransactionRetryException extends \Exception
{
public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried';
}
98 changes: 98 additions & 0 deletions src/Producer/KafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@

namespace Jobcloud\Kafka\Producer;

use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;
use Jobcloud\Kafka\Message\KafkaProducerMessageInterface;
use Jobcloud\Kafka\Message\Encoder\EncoderInterface;
use Jobcloud\Kafka\Conf\KafkaConfiguration;
use RdKafka\Producer as RdKafkaProducer;
use RdKafka\ProducerTopic as RdKafkaProducerTopic;
use RdKafka\Metadata\Topic as RdKafkaMetadataTopic;
use RdKafka\Exception as RdKafkaException;
use RdKafka\KafkaErrorException as RdKafkaErrorException;

final class KafkaProducer implements KafkaProducerInterface
{
Expand All @@ -35,6 +39,11 @@ final class KafkaProducer implements KafkaProducerInterface
*/
protected $encoder;

/**
* @var bool
*/
private $transactionInitialized = false;

/**
* KafkaProducer constructor.
* @param RdKafkaProducer $producer
Expand Down Expand Up @@ -160,6 +169,68 @@ public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000):
->current();
}

/**
* Start a producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function beginTransaction(int $timeoutMs): void
{
try {
if (false === $this->transactionInitialized) {
$this->producer->initTransactions($timeoutMs);
$this->transactionInitialized = true;
}

$this->producer->beginTransaction();
} catch (RdKafkaErrorException $e) {
$this->handleTransactionError($e);
}
}

/**
* Commit the current producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function commitTransaction(int $timeoutMs): void
{
try {
$this->producer->commitTransaction($timeoutMs);
} catch (RdKafkaErrorException $e) {
$this->handleTransactionError($e);
}
}

/**
* Abort the current producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function abortTransaction(int $timeoutMs): void
{
try {
$this->producer->abortTransaction($timeoutMs);
} catch (RdKafkaErrorException $e) {
$this->handleTransactionError($e);
}
}

/**
* @param string $topic
* @return RdKafkaProducerTopic
Expand All @@ -172,4 +243,31 @@ private function getProducerTopicForTopic(string $topic): RdKafkaProducerTopic

return $this->producerTopics[$topic];
}

/**
* @param RdKafkaErrorException $e
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
private function handleTransactionError(RdKafkaErrorException $e): void
{
if (true === $e->isRetriable()) {
throw new KafkaProducerTransactionRetryException(
KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE
);
} elseif (true === $e->transactionRequiresAbort()) {
throw new KafkaProducerTransactionAbortException(
KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE
);
} else {
$this->transactionInitialized = false;
// according to librdkafka documentation, everything that is not retriable, abortable or fatal is fatal
// fatal errors (so stated), need the producer to be destroyed
throw new KafkaProducerTransactionFatalException(
KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE
);
}
}
}
39 changes: 39 additions & 0 deletions src/Producer/KafkaProducerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

namespace Jobcloud\Kafka\Producer;

use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;
use Jobcloud\Kafka\Message\KafkaProducerMessageInterface;
use RdKafka\Metadata\Topic as RdKafkaMetadataTopic;

Expand Down Expand Up @@ -71,4 +74,40 @@ public function flush(int $timeoutMs): int;
* @return RdKafkaMetadataTopic
*/
public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic;

/**
* Start a producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function beginTransaction(int $timeoutMs): void;

/**
* Commit the current producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function commitTransaction(int $timeoutMs): void;

/**
* Abort the current producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function abortTransaction(int $timeoutMs): void;
}
Loading