Skip to content

Kafka fixes for 24-4 #14641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions ydb/core/kafka_proxy/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ struct TContext {
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString ClientDC;
bool IsServerless = false;
bool RequireAuthentication = false;

NKikimr::NPQ::TRlContext RlContext;

bool Authenticated() { return AuthenticationStep == SUCCESS; }
// True when this connection is allowed to execute API calls: either the
// proxy does not enforce authentication at all, or the SASL flow has
// reached the SUCCESS step.
bool Authenticated() {
return !RequireAuthentication || AuthenticationStep == SUCCESS;
}
};

template<std::derived_from<TApiMessage> T>
Expand Down Expand Up @@ -81,10 +84,6 @@ class TMessagePtr {
T* Ptr;
};

// Tells whether the given Kafka API call may only be served to an
// authenticated client. The handshake-related APIs are exempt, since they
// are exactly the calls a client must issue before it can authenticate.
inline bool RequireAuthentication(EApiKey apiKey) {
    switch (apiKey) {
        case EApiKey::API_VERSIONS:
        case EApiKey::SASL_HANDSHAKE:
        case EApiKey::SASL_AUTHENTICATE:
            return false;
        default:
            return true;
    }
}

inline EKafkaErrors ConvertErrorCode(Ydb::StatusIds::StatusCode status) {
switch (status) {
case Ydb::StatusIds::SUCCESS:
Expand Down Expand Up @@ -127,6 +126,8 @@ inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code)
switch (code) {
case Ydb::PersQueue::ErrorCode::ErrorCode::OK:
return EKafkaErrors::NONE_ERROR;
case Ydb::PersQueue::ErrorCode::ErrorCode::UNKNOWN_READ_RULE:
return EKafkaErrors::GROUP_ID_NOT_FOUND;
case Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST:
return EKafkaErrors::INVALID_REQUEST;
case Ydb::PersQueue::ErrorCode::ErrorCode::ERROR:
Expand Down Expand Up @@ -156,6 +157,14 @@ inline TString GetTopicNameWithoutDb(const TString& database, TString topic) {
return topic;
}

// Returns the authenticated user's SID for logging, or "anonymous" when the
// proxy does not enforce authentication.
// Takes the context by const reference to avoid a needless shared_ptr
// refcount bump, and guards against a missing token so a call made before
// authentication completed cannot dereference a null UserToken.
inline TString GetUsernameOrAnonymous(const std::shared_ptr<TContext>& context) {
    if (context->RequireAuthentication && context->UserToken) {
        return context->UserToken->GetUserSID();
    }
    return "anonymous";
}

// Returns the serialized ACL token of the authenticated user, or an empty
// string when authentication is not enforced.
// Takes the context by const reference (no refcount churn) and null-checks
// UserToken so a call made before authentication completed cannot crash.
inline TString GetUserSerializedToken(const std::shared_ptr<TContext>& context) {
    if (context->RequireAuthentication && context->UserToken) {
        return context->UserToken->GetSerializedToken();
    }
    return "";
}

NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TApiVersionsRequestData>& message);
NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TInitProducerIdRequestData>& message);
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kafka_proxy/actors/control_plane_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
}

// Returns the caller's serialized ACL token. UserToken may be absent when
// authentication is not enforced; in that case return a shared empty string
// so the by-reference contract stays safe (no dangling temporary).
const TString& GetSerializedToken() const override {
    static const TString emptyString;
    return UserToken == nullptr ? emptyString : UserToken->GetSerializedToken();
}

bool IsClientLost() const override {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
PrepareFetchRequestData(topicIndex, partPQRequests);
auto ruPerRequest = topicIndex == 0 && Context->Config.GetMeteringV2Enabled();
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, 0, ruPerRequest);
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, Context->UserToken, 0, ruPerRequest);
auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
auto actorId = ctx.Register(fetchActor);
PendingResponses++;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ void TKafkaListOffsetsActor::HandleMissingTopicName(const TListOffsetsRequestDat
}

TActorId TKafkaListOffsetsActor::SendOffsetsRequest(const TListOffsetsRequestData::TListOffsetsTopic& topic, const NActors::TActorContext&) {
KAFKA_LOG_D("ListOffsets actor: Get offsets for topic '" << topic.Name << "' for user '" << Context->UserToken->GetUserSID() << "'");
KAFKA_LOG_D("ListOffsets actor: Get offsets for topic '" << topic.Name << "' for user " << GetUsernameOrAnonymous(Context));

TEvKafka::TGetOffsetsRequest offsetsRequest;
offsetsRequest.Topic = NormalizePath(Context->DatabasePath, topic.Name.value());
offsetsRequest.Token = Context->UserToken->GetSerializedToken();
offsetsRequest.Token = GetUserSerializedToken(Context);
offsetsRequest.Database = Context->DatabasePath;

for (const auto& partitionRequest: topic.Partitions) {
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
Response->Topics.resize(Message->Topics.size());
Response->ClusterId = "ydb-cluster";
Response->ControllerId = 1;

Response->ControllerId = Context->Config.HasProxy() ? ProxyNodeId : ctx.SelfID.NodeId();

if (WithProxy) {
AddProxyNodeToBrokers();
Expand Down Expand Up @@ -88,11 +87,11 @@ void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesI
}

TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) {
KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << Context->UserToken->GetUserSID() << "'");
KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user " << GetUsernameOrAnonymous(Context));

TGetPartitionsLocationRequest locationRequest{};
locationRequest.Topic = NormalizePath(Context->DatabasePath, topicRequest.Name.value());
locationRequest.Token = Context->UserToken->GetSerializedToken();
locationRequest.Token = GetUserSerializedToken(Context);
locationRequest.Database = Context->DatabasePath;

PendingResponses++;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()

void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
// If API level <= 7, Groups would be empty. In this case we convert message to level 8 and process it uniformly later
KAFKA_LOG_D("TopicOffsetActor: new request for user '" << Context->UserToken->GetUserSID()<< "'");
KAFKA_LOG_D("TopicOffsetActor: new request for user " << GetUsernameOrAnonymous(Context));
if (Message->Groups.empty()) {
TOffsetFetchRequestData::TOffsetFetchRequestGroup group;
group.GroupId = Message->GroupId.value();
Expand All @@ -239,15 +239,15 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
NKikimr::NGRpcProxy::V1::TLocalRequestBase locationRequest{
NormalizePath(Context->DatabasePath, topicToEntities.first),
Context->DatabasePath,
Context->UserToken->GetSerializedToken(),
GetUserSerializedToken(Context),
};
ctx.Register(new TTopicOffsetActor(
topicToEntities.second.Consumers,
locationRequest,
SelfId(),
topicToEntities.second.Partitions,
topicToEntities.first,
Context->UserToken->GetUserSID()
GetUsernameOrAnonymous(Context)
));
InflyTopics++;
}
Expand Down Expand Up @@ -276,7 +276,7 @@ void TKafkaOffsetFetchActor::Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr&

if (InflyTopics == 0) {
auto response = GetOffsetFetchResponse();
KAFKA_LOG_D("TopicOffsetActor: sending response to user '" << Context->UserToken->GetUserSID()<< "'");
KAFKA_LOG_D("TopicOffsetActor: sending response to user " << GetUsernameOrAnonymous(Context));
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu

topic.MeteringMode = info.PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode();

if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) {
if (!Context->RequireAuthentication || info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) {
topic.Status = OK;
topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL;
topic.PartitionChooser = CreatePartitionChooser(info.PQGroupInfo->Description);
Expand Down
16 changes: 1 addition & 15 deletions ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,6 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes
}
UserToken = ev->Get()->Token;

if (ClientAuthData.UserName.Empty()) {
bool gotPermission = false;
for (auto & sid : UserToken->GetGroupSIDs()) {
if (sid == NKikimr::NGRpcProxy::V1::KafkaPlainAuthSid) {
gotPermission = true;
break;
}
}
if (!gotPermission) {
SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "no permission '" << NKikimr::NGRpcProxy::V1::KafkaPlainAuthPermission << "'", ctx);
return;
}
}

SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx);
}

Expand Down Expand Up @@ -174,7 +160,7 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa
}

void TKafkaSaslAuthActor::SendApiKeyRequest() {
auto entries = NKikimr::NGRpcProxy::V1::GetTicketParserEntries(DatabaseId, FolderId, true);
auto entries = NKikimr::NGRpcProxy::V1::GetTicketParserEntries(DatabaseId, FolderId);

Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({
.Database = DatabasePath,
Expand Down
23 changes: 16 additions & 7 deletions ydb/core/kafka_proxy/kafka_connection.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/raw_socket/sock_config.h>
#include <ydb/core/util/address_classifier.h>

Expand All @@ -7,12 +8,6 @@
#include "kafka_events.h"
#include "kafka_log_impl.h"
#include "kafka_metrics.h"
#include "actors/kafka_read_session_actor.h"


#include <strstream>
#include <sstream>
#include <iosfwd>

namespace NKafka {

Expand Down Expand Up @@ -103,6 +98,11 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet

void Bootstrap() {
Context->ConnectionId = SelfId();
Context->RequireAuthentication = NKikimr::AppData()->EnforceUserTokenRequirement;
// if no authentication required, then we can use local database as our target
if (!Context->RequireAuthentication) {
Context->DatabasePath = NKikimr::AppData()->TenantName;
}

Become(&TKafkaConnection::StateAccepting);
Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
Expand Down Expand Up @@ -339,7 +339,7 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
if (Request->Header.ClientId.has_value() && Request->Header.ClientId != "") {
Context->KafkaClient = Request->Header.ClientId.value();
}

switch (Request->Header.RequestApiKey) {
case PRODUCE:
HandleMessage(&Request->Header, Cast<TProduceRequestData>(Request), ctx);
Expand Down Expand Up @@ -449,6 +449,7 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
return;
}

Context->RequireAuthentication = NKikimr::AppData()->EnforceUserTokenRequirement;
Context->UserToken = event->UserToken;
Context->DatabasePath = event->DatabasePath;
Context->AuthenticationStep = authStep;
Expand Down Expand Up @@ -626,10 +627,12 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
case INFLIGTH_CHECK:
if (!Context->Authenticated() && !PendingRequestsQueue.empty()) {
// Allow only one message to be processed at a time for non-authenticated users
KAFKA_LOG_ERROR("DoRead: failed inflight check: there are " << PendingRequestsQueue.size() << " pending requests and user is not authnicated. Only one paraller request is allowed for a non-authenticated user.");
return true;
}
if (InflightSize + Request->ExpectedSize > Context->Config.GetMaxInflightSize()) {
// We limit the size of processed messages so as not to exceed the size of available memory
KAFKA_LOG_ERROR("DoRead: failed inflight check: InflightSize + Request->ExpectedSize=" << InflightSize + Request->ExpectedSize << " > Context->Config.GetMaxInflightSize=" << Context->Config.GetMaxInflightSize());
return true;
}
InflightSize += Request->ExpectedSize;
Expand Down Expand Up @@ -711,6 +714,12 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
}
}

// An API call needs an authenticated session unless it is one of the
// bootstrap calls a client has to perform before it can authenticate
// (ApiVersions and the SASL handshake pair).
bool RequireAuthentication(EApiKey apiKey) {
    const bool isAuthBootstrapApi =
        apiKey == EApiKey::API_VERSIONS ||
        apiKey == EApiKey::SASL_HANDSHAKE ||
        apiKey == EApiKey::SASL_AUTHENTICATE;
    return !isAuthBootstrapApi;
}

void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) {
if (event->Get()->Read) {
if (!CloseConnection) {
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kafka_proxy/kafka_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#pragma once

#include "kafka_messages_int.h"

namespace NKafka {

enum EListenerType {
Expand Down
Loading
Loading