4
4
5
5
namespace Jobcloud \Kafka \Producer ;
6
6
7
+ use Jobcloud \Kafka \Exception \KafkaProducerTransactionAbortException ;
8
+ use Jobcloud \Kafka \Exception \KafkaProducerTransactionFatalException ;
9
+ use Jobcloud \Kafka \Exception \KafkaProducerTransactionRetryException ;
7
10
use Jobcloud \Kafka \Message \KafkaProducerMessageInterface ;
8
11
use Jobcloud \Kafka \Message \Encoder \EncoderInterface ;
9
12
use Jobcloud \Kafka \Conf \KafkaConfiguration ;
10
13
use RdKafka \Producer as RdKafkaProducer ;
11
14
use RdKafka \ProducerTopic as RdKafkaProducerTopic ;
12
15
use RdKafka \Metadata \Topic as RdKafkaMetadataTopic ;
13
16
use RdKafka \Exception as RdKafkaException ;
17
+ use RdKafka \KafkaErrorException as RdKafkaErrorException ;
14
18
15
19
final class KafkaProducer implements KafkaProducerInterface
16
20
{
@@ -35,6 +39,11 @@ final class KafkaProducer implements KafkaProducerInterface
35
39
*/
36
40
protected $ encoder ;
37
41
42
+ /**
43
+ * @var bool
44
+ */
45
+ private $ transactionInitialized = false ;
46
+
38
47
/**
39
48
* KafkaProducer constructor.
40
49
* @param RdKafkaProducer $producer
@@ -160,6 +169,68 @@ public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000):
160
169
->current ();
161
170
}
162
171
172
+ /**
173
+ * Start a producer transaction
174
+ *
175
+ * @param int $timeoutMs
176
+ * @return void
177
+ *
178
+ * @throws KafkaProducerTransactionAbortException
179
+ * @throws KafkaProducerTransactionFatalException
180
+ * @throws KafkaProducerTransactionRetryException
181
+ */
182
+ public function beginTransaction (int $ timeoutMs ): void
183
+ {
184
+ try {
185
+ if (false === $ this ->transactionInitialized ) {
186
+ $ this ->producer ->initTransactions ($ timeoutMs );
187
+ $ this ->transactionInitialized = true ;
188
+ }
189
+
190
+ $ this ->producer ->beginTransaction ();
191
+ } catch (RdKafkaErrorException $ e ) {
192
+ $ this ->handleTransactionError ($ e );
193
+ }
194
+ }
195
+
196
+ /**
197
+ * Commit the current producer transaction
198
+ *
199
+ * @param int $timeoutMs
200
+ * @return void
201
+ *
202
+ * @throws KafkaProducerTransactionAbortException
203
+ * @throws KafkaProducerTransactionFatalException
204
+ * @throws KafkaProducerTransactionRetryException
205
+ */
206
+ public function commitTransaction (int $ timeoutMs ): void
207
+ {
208
+ try {
209
+ $ this ->producer ->commitTransaction ($ timeoutMs );
210
+ } catch (RdKafkaErrorException $ e ) {
211
+ $ this ->handleTransactionError ($ e );
212
+ }
213
+ }
214
+
215
+ /**
216
+ * Abort the current producer transaction
217
+ *
218
+ * @param int $timeoutMs
219
+ * @return void
220
+ *
221
+ * @throws KafkaProducerTransactionAbortException
222
+ * @throws KafkaProducerTransactionFatalException
223
+ * @throws KafkaProducerTransactionRetryException
224
+ */
225
+ public function abortTransaction (int $ timeoutMs ): void
226
+ {
227
+ try {
228
+ $ this ->producer ->abortTransaction ($ timeoutMs );
229
+ } catch (RdKafkaErrorException $ e ) {
230
+ $ this ->handleTransactionError ($ e );
231
+ }
232
+ }
233
+
163
234
/**
164
235
* @param string $topic
165
236
* @return RdKafkaProducerTopic
@@ -172,4 +243,31 @@ private function getProducerTopicForTopic(string $topic): RdKafkaProducerTopic
172
243
173
244
return $ this ->producerTopics [$ topic ];
174
245
}
246
+
247
+ /**
248
+ * @param RdKafkaErrorException $e
249
+ *
250
+ * @throws KafkaProducerTransactionAbortException
251
+ * @throws KafkaProducerTransactionFatalException
252
+ * @throws KafkaProducerTransactionRetryException
253
+ */
254
+ private function handleTransactionError (RdKafkaErrorException $ e ): void
255
+ {
256
+ if (true === $ e ->isRetriable ()) {
257
+ throw new KafkaProducerTransactionRetryException (
258
+ KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE
259
+ );
260
+ } elseif (true === $ e ->transactionRequiresAbort ()) {
261
+ throw new KafkaProducerTransactionAbortException (
262
+ KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE
263
+ );
264
+ } else {
265
+ $ this ->transactionInitialized = false ;
266
+ // according to librdkafka documentation, everything that is not retriable, abortable or fatal is fatal
267
+ // fatal errors (so stated), need the producer to be destroyed
268
+ throw new KafkaProducerTransactionFatalException (
269
+ KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE
270
+ );
271
+ }
272
+ }
175
273
}
0 commit comments