Skip to content

refactor CreatePartitions endpoint of Kafka API to use TUpdateSchemeActor #1005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 47 additions & 100 deletions ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, TString&)> sendResultCallback)
const std::function<void(EKafkaErrors, const TString&)> sendResultCallback)
: UserToken(userToken)
, TopicPath(topicPath)
, DatabaseName(databaseName)
Expand Down Expand Up @@ -71,7 +71,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
};

void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override {
Expand Down Expand Up @@ -101,8 +101,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
}

void RaiseIssue(const NYql::TIssue& issue) override{
ReplyMessage = issue.GetMessage();
Y_UNUSED(issue);
Issue = issue;
}

void RaiseIssues(const NYql::TIssues& issues) override {
Expand Down Expand Up @@ -174,7 +173,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt

void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override {
Y_UNUSED(result);
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void SendResult(
Expand All @@ -184,15 +183,15 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt

Y_UNUSED(result);
Y_UNUSED(message);
processYdbStatusCode(status, std::optional(std::ref(message)));
ProcessYdbStatusCode(status);
};

void SendResult(
Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {

Y_UNUSED(message);
processYdbStatusCode(status, std::optional(std::ref(message)));
ProcessYdbStatusCode(status);
};

const Ydb::Operations::OperationParams& operation_params() const {
Expand All @@ -214,47 +213,16 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
const TString TopicPath;
const TString DatabaseName;
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message)> SendResultCallback;
TString ReplyMessage;

void processYdbStatusCode(
Ydb::StatusIds::StatusCode& status,
std::optional<std::reference_wrapper<const google::protobuf::RepeatedPtrField<
NKikimr::NGRpcService::TYdbIssueMessageType>>> issueMessagesOpt = std::nullopt) {

switch (status) {
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage);
break;
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage);
break;
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SCHEME_ERROR:
if (issueMessagesOpt.has_value()) {
auto& issueMessages = issueMessagesOpt.value().get();
bool hasPathNotExists = std::find_if(
issueMessages.begin(),
issueMessages.end(),
[](const auto& msg){ return msg.issue_code() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST; }
) != issueMessages.end();

if (hasPathNotExists) {
SendResultCallback(TEvKafka::TEvTopicModificationResponse:: EStatus::TOPIC_DOES_NOT_EXIST, ReplyMessage);
} else {
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
}
} else {
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
}
break;
default:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
}
const std::function<void(const EKafkaErrors status, const TString& message)> SendResultCallback;
NYql::TIssue Issue;

void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status) {
SendResultCallback(ConvertErrorCode(status), Issue.GetMessage());
}
};

class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest> {
using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>;
class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>{
using TBase = NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>;
public:

TCreatePartitionsActor(
Expand All @@ -267,7 +235,7 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase
userToken,
topicPath,
databaseName,
[this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
[this](const EKafkaErrors status, const TString& message) {
this->SendResult(status, message);
})
)
Expand All @@ -283,7 +251,7 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase

~TCreatePartitionsActor() = default;

void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
void SendResult(const EKafkaErrors status, const TString& message) {
THolder<TEvKafka::TEvTopicModificationResponse> response(new TEvKafka::TEvTopicModificationResponse());
response->Status = status;
response->TopicPath = TopicPath;
Expand All @@ -292,39 +260,25 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase
Send(SelfId(), new TEvents::TEvPoison());
}

void FillProposeRequest(
NKikimr::TEvTxUserProxy::TEvProposeTransaction &proposal,
const TActorContext &ctx,
const TString &workingDir,
const TString &name) {

void ModifyPersqueueConfig(
const TActorContext& ctx,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(ctx);
Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);

NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());
modifyScheme.SetWorkingDir(workingDir);
modifyScheme.SetOperationType(::NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);

auto pqDescr = modifyScheme.MutableAlterPersQueueGroup();
(*pqDescr).SetTotalGroupCount(PartionsNumber);
(*pqDescr).SetName(name);
};
groupConfig.SetTotalGroupCount(PartionsNumber);
}

void Bootstrap(const NActors::TActorContext& ctx) {
TBase::Bootstrap(ctx);
SendProposeRequest(ctx);
Become(&TCreatePartitionsActor::StateWork);
SendDescribeProposeRequest(ctx);
Become(&TBase::StateWork);
};

void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
hFunc(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle);
default:
TBase::StateWork(ev);
}
}

void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }

private:
const TActorId Requester;
const TString TopicPath;
Expand Down Expand Up @@ -367,7 +321,7 @@ void TKafkaCreatePartitionsActor::Bootstrap(const NActors::TActorContext& ctx) {

if (topicName == "") {
auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST;
result->Status = INVALID_REQUEST;
result->Message = "Empty topic name";
this->TopicNamesToResponses[topicName] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release());
continue;
Expand Down Expand Up @@ -407,40 +361,33 @@ void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) {
for (auto& requestTopic : Message->Topics) {
auto topicName = requestTopic.Name.value();

if (!TopicNamesToResponses.contains(topicName)) {
continue;
}

TCreatePartitionsResponseData::TCreatePartitionsTopicResult responseTopic;
responseTopic.Name = topicName;
responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message;

if (TopicNamesToResponses.contains(topicName)) {
responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message;
}

auto setError= [&responseTopic, &responseStatus](EKafkaErrors status) {
responseTopic.ErrorCode = status;
EKafkaErrors status = TopicNamesToResponses[topicName]->Status;
responseTopic.ErrorCode = status;
if(status != NONE_ERROR) {
responseStatus = status;
};
}

if (DuplicateTopicNames.contains(topicName)) {
setError(DUPLICATE_RESOURCE);
} else {
switch (TopicNamesToResponses[topicName]->Status) {
case TEvKafka::TEvTopicModificationResponse::OK:
responseTopic.ErrorCode = NONE_ERROR;
break;
case TEvKafka::TEvTopicModificationResponse::BAD_REQUEST:
case TEvKafka::TEvTopicModificationResponse::TOPIC_DOES_NOT_EXIST:
setError(INVALID_REQUEST);
break;
case TEvKafka::TEvTopicModificationResponse::ERROR:
setError(UNKNOWN_SERVER_ERROR);
break;
case TEvKafka::TEvTopicModificationResponse::INVALID_CONFIG:
setError(INVALID_CONFIG);
break;
}
}
response->Results.push_back(responseTopic);
}

for (auto& topicName : DuplicateTopicNames) {
TCreatePartitionsResponseData::TCreatePartitionsTopicResult responseTopic;
responseTopic.Name = topicName;
responseTopic.ErrorMessage = "Duplicate topic in request.";
responseTopic.ErrorCode = INVALID_REQUEST;

response->Results.push_back(responseTopic);

responseStatus = INVALID_REQUEST;
}
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));

Die(ctx);
Expand Down
59 changes: 18 additions & 41 deletions ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx {
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, TString&)> sendResultCallback)
const std::function<void(EKafkaErrors, TString&)> sendResultCallback)
: UserToken(userToken)
, TopicPath(topicPath)
, DatabaseName(databaseName)
Expand Down Expand Up @@ -71,7 +71,7 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx {
};

void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override {
Expand Down Expand Up @@ -171,7 +171,7 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx {

void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override {
Y_UNUSED(result);
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void SendResult(
Expand All @@ -181,15 +181,15 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx {

Y_UNUSED(result);
Y_UNUSED(message);
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void SendResult(
Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {

Y_UNUSED(message);
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

const Ydb::Operations::OperationParams& operation_params() const {
Expand All @@ -211,20 +211,11 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx {
const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
const TString TopicPath;
const TString DatabaseName;
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message)> SendResultCallback;
const std::function<void(EKafkaErrors status, TString& message)> SendResultCallback;
TString ReplyMessage;

void processYdbStatusCode(Ydb::StatusIds::StatusCode& status) {
switch (status) {
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage);
break;
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage);
break;
default:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
}
void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status) {
SendResultCallback(ConvertErrorCode(status), ReplyMessage);
}
};

Expand All @@ -244,7 +235,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
userToken,
topicPath,
databaseName,
[this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
[this](EKafkaErrors status, TString& message) {
this->SendResult(status, message);
})
)
Expand All @@ -259,7 +250,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre

~TCreateTopicActor() = default;

void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
void SendResult(EKafkaErrors status, TString& message) {
THolder<TEvKafka::TEvTopicModificationResponse> response(new TEvKafka::TEvTopicModificationResponse());
response->Status = status;
response->TopicPath = TopicPath;
Expand Down Expand Up @@ -378,7 +369,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {

if (topicName == "") {
auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST;
result->Status = EKafkaErrors::INVALID_REQUEST;
result->Message = "Empty topic name";
this->TopicNamesToResponses[topicName] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release());
continue;
Expand All @@ -395,7 +386,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
return true;
} catch(std::invalid_argument) {
auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::INVALID_CONFIG;
result->Status = EKafkaErrors::INVALID_CONFIG;
result->Message = "Provided retention value is not a number";
this->TopicNamesToResponses[topic.Name.value()] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release());
return false;
Expand Down Expand Up @@ -489,26 +480,12 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
if (DuplicateTopicNames.contains(topicName)) {
setError(DUPLICATE_RESOURCE);
} else {
switch (TopicNamesToResponses[topicName]->Status) {
case TEvKafka::TEvTopicModificationResponse::OK:
responseTopic.ErrorCode = NONE_ERROR;
addConfigIfRequired(TopicNamesToRetentions[topicName].first, RETENTION_MS_CONFIG_NAME);
addConfigIfRequired(TopicNamesToRetentions[topicName].second, RETENTION_BYTES_CONFIG_NAME);
break;
case TEvKafka::TEvTopicModificationResponse::BAD_REQUEST:
setError(INVALID_REQUEST);
break;
case TEvKafka::TEvTopicModificationResponse::TOPIC_DOES_NOT_EXIST:
KAFKA_LOG_ERROR("Create topics actor: Topic: [" << topicName << "]. Unexpected TOPIC_DOES_NOT_EXIST status received.");
setError(UNKNOWN_SERVER_ERROR);
break;
case TEvKafka::TEvTopicModificationResponse::ERROR:
setError(UNKNOWN_SERVER_ERROR);
break;
case TEvKafka::TEvTopicModificationResponse::INVALID_CONFIG:
setError(INVALID_CONFIG);
break;
}
auto status = TopicNamesToResponses[topicName]->Status;
if (status == EKafkaErrors::NONE_ERROR) {
addConfigIfRequired(TopicNamesToRetentions[topicName].first, RETENTION_MS_CONFIG_NAME);
addConfigIfRequired(TopicNamesToRetentions[topicName].second, RETENTION_BYTES_CONFIG_NAME);
}
setError(TopicNamesToResponses[topicName]->Status);
}
response->Topics.push_back(responseTopic);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/kafka_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModifi
{}

TString TopicPath;
EStatus Status;
EKafkaErrors Status;
TString Message;
};
};
Expand Down
Loading