@@ -340,6 +340,42 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
340340 Register (CreateKafkaAlterConfigsActor (Context, header->CorrelationId , message));
341341 }
342342
343+ void HandleMessage (const TRequestHeaderData* header, const TMessagePtr<TAddPartitionsToTxnRequestData>& message) {
344+ Send (MakeTransactionsServiceID (), new TEvKafka::TEvAddPartitionsToTxnRequest (
345+ header->CorrelationId ,
346+ message,
347+ Context->ConnectionId ,
348+ Context->DatabasePath
349+ ));
350+ }
351+
352+ void HandleMessage (const TRequestHeaderData* header, const TMessagePtr<TAddOffsetsToTxnRequestData>& message) {
353+ Send (MakeTransactionsServiceID (), new TEvKafka::TEvAddOffsetsToTxnRequest (
354+ header->CorrelationId ,
355+ message,
356+ Context->ConnectionId ,
357+ Context->DatabasePath
358+ ));
359+ }
360+
361+ void HandleMessage (const TRequestHeaderData* header, const TMessagePtr<TTxnOffsetCommitRequestData>& message) {
362+ Send (MakeTransactionsServiceID (), new TEvKafka::TEvTxnOffsetCommitRequest (
363+ header->CorrelationId ,
364+ message,
365+ Context->ConnectionId ,
366+ Context->DatabasePath
367+ ));
368+ }
369+
370+ void HandleMessage (const TRequestHeaderData* header, const TMessagePtr<TEndTxnRequestData>& message) {
371+ Send (MakeTransactionsServiceID (), new TEvKafka::TEvEndTxnRequest (
372+ header->CorrelationId ,
373+ message,
374+ Context->ConnectionId ,
375+ Context->DatabasePath
376+ ));
377+ }
378+
343379 template <class T >
344380 TMessagePtr<T> Cast (std::shared_ptr<Msg>& request) {
345381 return TMessagePtr<T>(request->Buffer , request->Message );
@@ -443,6 +479,22 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
443479 HandleMessage (&Request->Header , Cast<TAlterConfigsRequestData>(Request));
444480 break ;
445481
482+ case ADD_PARTITIONS_TO_TXN:
483+ HandleMessage (&Request->Header , Cast<TAddPartitionsToTxnRequestData>(Request));
484+ break ;
485+
486+ case ADD_OFFSETS_TO_TXN:
487+ HandleMessage (&Request->Header , Cast<TAddOffsetsToTxnRequestData>(Request));
488+ break ;
489+
490+ case TXN_OFFSET_COMMIT:
491+ HandleMessage (&Request->Header , Cast<TTxnOffsetCommitRequestData>(Request));
492+ break ;
493+
494+ case END_TXN:
495+ HandleMessage (&Request->Header , Cast<TEndTxnRequestData>(Request));
496+ break ;
497+
446498 default :
447499 KAFKA_LOG_ERROR (" Unsupported message: ApiKey=" << Request->Header .RequestApiKey );
448500 PassAway ();
0 commit comments