Skip to content

LOGBROKER-8859 Improvements in OffsetFetch endpoint in Kafka API #803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 56 additions & 50 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,32 @@ struct PartitionOffsets {
};

class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetActor,
// TODO: replace with dedicated Request and Response types
NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest,
NKikimr::NGRpcProxy::V1::TLocalRequestBase,
NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse>,
public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl {
using TBase = NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetActor,
NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest,
NKikimr::NGRpcProxy::V1::TLocalRequestBase,
NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse>;

public:
TTopicOffsetActor(std::shared_ptr<TSet<TString>> consumers,
const NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest& request,
const NKikimr::NGRpcProxy::V1::TLocalRequestBase& request,
const TActorId& requester,
std::shared_ptr<TSet<ui32>> partitions)
std::shared_ptr<TSet<ui32>> partitions,
const TString& originalTopicName,
const TString& userSID)
: TBase(request, requester)
, TDescribeTopicActorImpl(ConsumerOffsetSettings(consumers, partitions))
, Requester(requester)
, TopicName(request.Topic)
, OriginalTopicName(originalTopicName)
, UserSID(userSID)
{
Y_UNUSED(requester);
};

void Bootstrap(const NActors::TActorContext& ctx) override {
Y_UNUSED(ctx);
KAFKA_LOG_D("TopicOffsetActor: Get commited offsets for topic '" << OriginalTopicName
<< "' for user '" << UserSID << "'");
SendDescribeProposeRequest();
Become(&TTopicOffsetActor::StateWork);
};
Expand All @@ -67,14 +70,18 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
const Ydb::PersQueue::ErrorCode::ErrorCode errorCode,
const Ydb::StatusIds::StatusCode status,
const TActorContext& ctx) override {
Y_UNUSED(error);
Y_UNUSED(errorCode);
Y_UNUSED(status);
Y_UNUSED(ctx);

KAFKA_LOG_D("TopicOffsetActor: error raised for '" << OriginalTopicName << "'"
<< " for user '" << UserSID << "'."
<< " Error: '" << error << "',"
<< " ErrorCode: '" << static_cast<int>(errorCode) << "',"
<< " StatusCode: '" << status<< "'.");

THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
response->TopicName = TopicName;
response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::ERROR;
response->TopicName = OriginalTopicName;
response->Status = ConvertErrorCode(status);

Send(Requester, response.Release());
Die(ctx);
}
Expand Down Expand Up @@ -114,10 +121,12 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<

void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override {
const auto& response = ev->Get()->Request.Get()->ResultSet.front();
KAFKA_LOG_D("TopicOffsetActor: TEvNavigateKeySetResult received for topic '" << OriginalTopicName
<< "' for user '" << UserSID << "'. PQGroupInfo is present: " << (response.PQGroupInfo.Get() != nullptr));
if (!response.PQGroupInfo) {
THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
response->TopicName = TopicName;
response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::UNKNOWN_TOPIC;
response->TopicName = OriginalTopicName;
response->Status = UNKNOWN_TOPIC_OR_PARTITION;
Send(Requester, response.Release());
TActorBootstrapped::PassAway();
return;
Expand All @@ -128,17 +137,20 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
}

void Reply(const TActorContext& ctx) override {
KAFKA_LOG_D("TopicOffsetActor: replying for topic '" << OriginalTopicName
<< "' for user '" << UserSID << "'" << " with status NONE_ERROR");
THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
response->TopicName = TopicName;
response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::OK;
response->TopicName = OriginalTopicName;
response->Status = NONE_ERROR;
response->PartitionIdToOffsets = PartitionIdToOffsets;
Send(Requester, response.Release());
Die(ctx);
};

private:
TActorId Requester;
TString TopicName;
const TActorId Requester;
const TString OriginalTopicName;
const TString UserSID;
std::unordered_map<ui32, ui32> PartitionIdToOffset {};
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets = std::make_shared<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>();
};
Expand All @@ -156,37 +168,28 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics topic;
TString topicName = requestTopic.Name.value();
topic.Name = topicName;
if (UnknownTopics.contains(topicName)) {
if (TopicsToResponses[topicName]->Status == NONE_ERROR) {
auto partitionsToOffsets = TopicsToResponses[topicName]->PartitionIdToOffsets;
for (auto requestPartition: requestTopic.PartitionIndexes) {
TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
partition.PartitionIndex = requestPartition;
partition.ErrorCode = UNKNOWN_TOPIC_OR_PARTITION;
if (partitionsToOffsets.get() != nullptr
&& partitionsToOffsets->contains(requestPartition)
&& (*partitionsToOffsets)[requestPartition].contains(requestGroup.GroupId.value())) {
partition.CommittedOffset = (*partitionsToOffsets)[requestPartition][requestGroup.GroupId.value()];
partition.ErrorCode = NONE_ERROR;
} else {
partition.ErrorCode = RESOURCE_NOT_FOUND;
}
topic.Partitions.push_back(partition);
}
group.Topics.push_back(topic);
continue;
}
if (ErroredTopics.contains(topicName)) {
} else {
for (auto requestPartition: requestTopic.PartitionIndexes) {
TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
partition.PartitionIndex = requestPartition;
partition.ErrorCode = UNKNOWN_SERVER_ERROR;
partition.ErrorCode = TopicsToResponses[topicName]->Status;
topic.Partitions.push_back(partition);
}
group.Topics.push_back(topic);
continue;
}
auto partitionsToOffsets = TopicToOffsets[topicName];
for (auto requestPartition: requestTopic.PartitionIndexes) {
TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
partition.PartitionIndex = requestPartition;
if (partitionsToOffsets->contains(requestPartition) && (*partitionsToOffsets)[requestPartition].contains(requestGroup.GroupId.value())) {
partition.CommittedOffset = (*partitionsToOffsets)[requestPartition][requestGroup.GroupId.value()];
partition.ErrorCode = NONE_ERROR;
} else {
partition.ErrorCode = RESOURCE_NOT_FOUND;
}
topic.Partitions.push_back(partition);
}
group.Topics.push_back(topic);
}
Expand All @@ -210,6 +213,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()

void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
// If API level <= 7, Groups would be empty. In this case we convert message to level 8 and process it uniformely later
KAFKA_LOG_D("TopicOffsetActor: new request for user '" << Context->UserToken->GetUserSID()<< "'");
if (Message->Groups.empty()) {
TOffsetFetchRequestData::TOffsetFetchRequestGroup group;
group.GroupId = Message->GroupId.value();
Expand All @@ -230,15 +234,18 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
}

for (const auto& topicToEntities : TopicToEntities) {
NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest locationRequest{};
locationRequest.Topic = topicToEntities.first;
locationRequest.Token = Context->UserToken->GetSerializedToken();
locationRequest.Database = Context->DatabasePath;
NKikimr::NGRpcProxy::V1::TLocalRequestBase locationRequest{
NormalizePath(Context->DatabasePath, topicToEntities.first),
Context->DatabasePath,
Context->UserToken->GetSerializedToken(),
};
ctx.Register(new TTopicOffsetActor(
topicToEntities.second.Consumers,
locationRequest,
SelfId(),
topicToEntities.second.Partitions
topicToEntities.second.Partitions,
topicToEntities.first,
Context->UserToken->GetUserSID()
));
InflyTopics++;
}
Expand All @@ -261,14 +268,13 @@ void TKafkaOffsetFetchActor::ExtractPartitions(const TString& group, const NKafk

void TKafkaOffsetFetchActor::Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx) {
InflyTopics--;
TopicToOffsets[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets;
if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::ERROR) {
ErroredTopics.insert(ev->Get()->TopicName);
} else if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::UNKNOWN_TOPIC) {
UnknownTopics.insert(ev->Get()->TopicName);
}

auto eventPtr = ev->Release();
TopicsToResponses[eventPtr->TopicName] = eventPtr;

if (InflyTopics == 0) {
auto response = GetOffsetFetchResponse();
KAFKA_LOG_D("TopicOffsetActor: sending response to user '" << Context->UserToken->GetUserSID()<< "'");
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ class TKafkaOffsetFetchActor: public NActors::TActorBootstrapped<TKafkaOffsetFet
const ui64 CorrelationId;
const TMessagePtr<TOffsetFetchRequestData> Message;
std::unordered_map<TString, TopicEntities> TopicToEntities;
std::unordered_map<TString, std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>> TopicToOffsets;
std::set<TString> UnknownTopics;
std::set<TString> ErroredTopics;
std::unordered_map<TString, TAutoPtr<TEvKafka::TEvCommitedOffsetsResponse>> TopicsToResponses;
ui32 InflyTopics = 0;

};
Expand Down
8 changes: 1 addition & 7 deletions ydb/core/kafka_proxy/kafka_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,11 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResp
struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
{
enum EStatus {
OK,
ERROR,
UNKNOWN_TOPIC,
};

TEvCommitedOffsetsResponse()
{}

TString TopicName;
EStatus Status;
EKafkaErrors Status;
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets;
};

Expand Down
14 changes: 14 additions & 0 deletions ydb/core/kafka_proxy/ut/ut_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,20 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1);
}

{
// Check with short topic name
std::map<TString, std::vector<i32>> topicsToPartions;
topicsToPartions[shortTopicName] = std::vector<i32>{0, 1, 2, 3};
auto msg = client.OffsetFetch(consumerName, topicsToPartions);
UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1);
const auto& partitions = msg->Groups[0].Topics[0].Partitions;
UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4);
auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; });
UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end());
UNIT_ASSERT_VALUES_EQUAL(partition0->ErrorCode, NONE_ERROR);
UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1);
}
{
// Check with nonexistent topic
std::map<TString, std::vector<i32>> topicsToPartions;
Expand Down