Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
response->ErrorCode = EKafkaErrors::NONE_ERROR;

AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3});
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS);
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA);
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID);
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE);
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE);
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS);
AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3, .MaxVersion=9});
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS, {.MaxVersion=2});
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA, {.MaxVersion=9});
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID, {.MaxVersion=4});
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE, {.MaxVersion=1});
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE, {.MaxVersion=2});
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS, {.MinVersion=1, .MaxVersion=1});
AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3});
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP);
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP);
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1});
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP, {.MaxVersion=9});
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP, {.MaxVersion=3});
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP, {.MaxVersion=5});
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT, {.MaxVersion=4});
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR, {.MaxVersion=0});
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=0});
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH, {.MaxVersion=8});

return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
response->Coordinators.push_back(coordinator);
}

response->ErrorCode = NONE_ERROR;
response->Host = host;
response->Port = port;
response->NodeId = nodeId;

Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
partition.CommittedOffset = sourcePartition.CommittedOffset;
partition.PartitionIndex = sourcePartition.PartitionIndex;
partition.ErrorCode = sourcePartition.ErrorCode;
topic.Partitions.push_back(partition);
}
response->Topics.push_back(topic);
}
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr

switch (ReadStep) {
case WAIT_JOIN_GROUP: { // join first time
if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType);
if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
Expand Down Expand Up @@ -156,8 +156,8 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
return;
}

if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType);
if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
Expand Down Expand Up @@ -361,9 +361,9 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
for (auto part: finalPartitionsToRead) {
KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part);
topicPartition.Partitions.push_back(part);
assignment.AssignedPartitions.push_back(topicPartition);
partitions.ReadingNow.emplace(part);
}
assignment.AssignedPartitions.push_back(topicPartition);
}

return assignment;
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,20 @@ void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion
}

void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
auto useVarintSize = _version > 3;
_version = ASSIGNMENT_VERSION;

if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment";
}

_writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1);
if (useVarintSize) {
_writable.writeUnsignedVarint(Size(_version) + 1);
} else {
TKafkaInt32 size = Size(_version);
_writable << size;
}

_writable << _version;
NPrivate::TWriteCollector _collector;
NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions);
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kafka_proxy/kafka_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4517,7 +4517,12 @@ i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const {
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
_collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
}
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
auto useVarintSize = _version > 3;
if (useVarintSize) {
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
} else {
return _collector.Size + sizeof(TKafkaInt32);
}
}


Expand Down