Skip to content

Commit c5fc3a3

Browse files
[Kafka API] Add forwarding of txn requests to coordinator (#17765)
1 parent d496fe5 commit c5fc3a3

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed

ydb/core/kafka_proxy/kafka_connection.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,42 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
340340
Register(CreateKafkaAlterConfigsActor(Context, header->CorrelationId, message));
341341
}
342342

343+
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TAddPartitionsToTxnRequestData>& message) {
344+
Send(MakeTransactionsServiceID(), new TEvKafka::TEvAddPartitionsToTxnRequest(
345+
header->CorrelationId,
346+
message,
347+
Context->ConnectionId,
348+
Context->DatabasePath
349+
));
350+
}
351+
352+
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TAddOffsetsToTxnRequestData>& message) {
353+
Send(MakeTransactionsServiceID(), new TEvKafka::TEvAddOffsetsToTxnRequest(
354+
header->CorrelationId,
355+
message,
356+
Context->ConnectionId,
357+
Context->DatabasePath
358+
));
359+
}
360+
361+
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TTxnOffsetCommitRequestData>& message) {
362+
Send(MakeTransactionsServiceID(), new TEvKafka::TEvTxnOffsetCommitRequest(
363+
header->CorrelationId,
364+
message,
365+
Context->ConnectionId,
366+
Context->DatabasePath
367+
));
368+
}
369+
370+
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TEndTxnRequestData>& message) {
371+
Send(MakeTransactionsServiceID(), new TEvKafka::TEvEndTxnRequest(
372+
header->CorrelationId,
373+
message,
374+
Context->ConnectionId,
375+
Context->DatabasePath
376+
));
377+
}
378+
343379
template<class T>
344380
TMessagePtr<T> Cast(std::shared_ptr<Msg>& request) {
345381
return TMessagePtr<T>(request->Buffer, request->Message);
@@ -443,6 +479,22 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
443479
HandleMessage(&Request->Header, Cast<TAlterConfigsRequestData>(Request));
444480
break;
445481

482+
case ADD_PARTITIONS_TO_TXN:
483+
HandleMessage(&Request->Header, Cast<TAddPartitionsToTxnRequestData>(Request));
484+
break;
485+
486+
case ADD_OFFSETS_TO_TXN:
487+
HandleMessage(&Request->Header, Cast<TAddOffsetsToTxnRequestData>(Request));
488+
break;
489+
490+
case TXN_OFFSET_COMMIT:
491+
HandleMessage(&Request->Header, Cast<TTxnOffsetCommitRequestData>(Request));
492+
break;
493+
494+
case END_TXN:
495+
HandleMessage(&Request->Header, Cast<TEndTxnRequestData>(Request));
496+
break;
497+
446498
default:
447499
KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
448500
PassAway();

0 commit comments

Comments
 (0)