Skip to content

Commit c53e299

Browse files
committed
Minor changes
1 parent 77b83fb commit c53e299

File tree

3 files changed

+44
-53
lines changed

3 files changed

+44
-53
lines changed

ydb/services/lib/actors/pq_schema_actor.h

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -128,52 +128,44 @@ namespace NKikimr::NGRpcProxy::V1 {
128128
return false;
129129
};
130130

131-
132131
void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate) {
133132
auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
134-
navigateRequest->DatabaseName = CanonizePath(Database);
135-
136-
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
137-
if (CheckAccessWithUpdateRowPermission) {
138-
entry.Access = NACLib::EAccessRights::UpdateRow;
139-
}
140-
entry.Path = NKikimr::SplitPath(GetTopicPath());
141-
entry.SyncVersion = true;
142-
entry.ShowPrivatePath = showPrivate;
143-
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
144-
navigateRequest->ResultSet.emplace_back(entry);
145-
146133
if (!SetRequestToken(navigateRequest.get())) {
147134
AddIssue(FillIssue("Unauthenticated access is forbidden, please provide credentials",
148135
Ydb::PersQueue::ErrorCode::ACCESS_DENIED));
149136
return RespondWithCode(Ydb::StatusIds::UNAUTHORIZED);
150137
}
138+
139+
navigateRequest->DatabaseName = CanonizePath(Database);
140+
navigateRequest->ResultSet.emplace_back(NSchemeCache::TSchemeCacheNavigate::TEntry{
141+
.Path = NKikimr::SplitPath(GetTopicPath()),
142+
.Access = CheckAccessWithUpdateRowPermission ? NACLib::UpdateRow : NACLib::DescribeSchema,
143+
.Operation = NSchemeCache::TSchemeCacheNavigate::OpList,
144+
.ShowPrivatePath = showPrivate,
145+
.SyncVersion = true,
146+
});
151147
if (!IsDead) {
152148
ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.release()));
153149
}
154150
}
155151

156152
bool ReplyIfNotTopic(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
157-
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
158-
Y_ABORT_UNLESS(result->ResultSet.size() == 1);
159-
const auto& response = result->ResultSet.front();
160-
const TString path = JoinPath(response.Path);
161-
162-
if (ev->Get()->Request.Get()->ResultSet.size() != 1 ||
163-
ev->Get()->Request.Get()->ResultSet.begin()->Kind !=
164-
NSchemeCache::TSchemeCacheNavigate::KindTopic) {
165-
AddIssue(FillIssue(TStringBuilder() << "path '" << path << "' is not a topic",
166-
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
167-
RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
168-
return true;
153+
auto const& entries = ev->Get()->Request.Get()->ResultSet;
154+
Y_ABORT_UNLESS(entries.size() == 1);
155+
auto const& entry = entries.front();
156+
if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindTopic) {
157+
return false;
169158
}
170-
return false;
159+
AddIssue(FillIssue(TStringBuilder() << "path '" << JoinPath(entry.Path) << "' is not a topic",
160+
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
161+
RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
162+
return true;
171163
}
172164

173165
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
174-
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
175-
Y_ABORT_UNLESS(result->ResultSet.size() == 1);
176-
const auto& response = result->ResultSet.front();
166+
auto const& entries = ev->Get()->Request.Get()->ResultSet;
167+
Y_ABORT_UNLESS(entries.size() == 1);
168+
const auto& response = entries.front();
177169
const TString path = JoinPath(response.Path);
178170

179171
switch (response.Status) {
@@ -248,7 +240,6 @@ namespace NKikimr::NGRpcProxy::V1 {
248240
}
249241
}
250242

251-
252243
void StateWork(TAutoPtr<IEventHandle>& ev) {
253244
switch (ev->GetTypeRewrite()) {
254245
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
@@ -343,13 +334,13 @@ namespace NKikimr::NGRpcProxy::V1 {
343334
}
344335

345336
bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const override {
346-
if (this->Request_->GetSerializedToken().empty()) {
347-
return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol());
348-
} else {
349-
request->UserToken = new NACLib::TUserToken(this->Request_->GetSerializedToken());
337+
if (auto const& token = this->Request_->GetSerializedToken()) {
338+
request->UserToken = new NACLib::TUserToken(token);
350339
return true;
351340
}
341+
return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol());
352342
}
343+
353344
bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& response) override {
354345
if constexpr (THasCdcStreamCompatibility<TDerived>::Value) {
355346
if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {

ydb/services/persqueue_v1/actors/schema_actors.cpp

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,8 +1309,7 @@ TDescribePartitionActor::TDescribePartitionActor(NKikimr::NGRpcService::IRequest
13091309
{
13101310
}
13111311

1312-
void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx)
1313-
{
1312+
void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx) {
13141313
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "TDescribePartitionActor" << ctx.SelfID.ToString() << ": Bootstrap");
13151314
CheckAccessWithUpdateRowPermission = true;
13161315
TBase::Bootstrap(ctx);
@@ -1321,7 +1320,10 @@ void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx)
13211320
void TDescribePartitionActor::StateWork(TAutoPtr<IEventHandle>& ev) {
13221321
switch (ev->GetTypeRewrite()) {
13231322
case TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType:
1324-
if (MaybeRequestWithDescribeSchema(ev)) {
1323+
if (NeedToRequestWithDescribeSchema(ev)) {
1324+
// We do not have the UpdateRow permission. Check if we're allowed to DescribeSchema.
1325+
CheckAccessWithUpdateRowPermission = false;
1326+
SendDescribeProposeRequest(ActorContext());
13251327
break;
13261328
}
13271329
[[fallthrough]];
@@ -1332,39 +1334,35 @@ void TDescribePartitionActor::StateWork(TAutoPtr<IEventHandle>& ev) {
13321334
}
13331335
}
13341336

1335-
// Return true if the second request to SchemeCache has been sent,
1336-
// i.e. we had already sent the first request checking the UpdateRow permission,
1337-
// but the result was negative, so we try again, this time using the DescribeSchema permission.
1338-
bool TDescribePartitionActor::MaybeRequestWithDescribeSchema(TAutoPtr<IEventHandle>& ev) {
1339-
if (!std::exchange(CheckAccessWithUpdateRowPermission, false)) {
1340-
// We've got a response to our second request.
1337+
// Return true if we need to send a second request to SchemeCache with DescribeSchema permission,
1338+
// because the first request checking the UpdateRow permission resulted in an AccessDenied error.
1339+
bool TDescribePartitionActor::NeedToRequestWithDescribeSchema(TAutoPtr<IEventHandle>& ev) {
1340+
if (!CheckAccessWithUpdateRowPermission) {
1341+
// We've already sent a request with DescribeSchema, ev is a response to it.
13411342
return false;
13421343
}
13431344

13441345
auto evNav = *reinterpret_cast<typename TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
13451346
auto const& entries = evNav->Get()->Request.Get()->ResultSet;
13461347
Y_ABORT_UNLESS(entries.size() == 1);
13471348

1348-
if (entries.front().Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
1349-
// We do have access to the requested entity.
1349+
if (entries.front().Status != NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied) {
1350+
// We do have access to the requested entity or there was an error.
13501351
// Transfer ownership to the ev pointer, and let the base classes' StateWork methods handle the response.
13511352
ev = *reinterpret_cast<TAutoPtr<IEventHandle>*>(&evNav);
13521353
return false;
13531354
}
13541355

1355-
// We do not have the UpdateRow permission. Check if we're allowed to DescribeSchema.
1356-
SendDescribeProposeRequest(ActorContext());
13571356
return true;
13581357
}
13591358

1360-
void TDescribePartitionActor::HandleCacheNavigateResponse(
1361-
TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev
1362-
) {
1363-
Y_ABORT_UNLESS(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
1359+
void TDescribePartitionActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
1360+
auto const& entries = ev->Get()->Request.Get()->ResultSet;
1361+
Y_ABORT_UNLESS(entries.size() == 1); // describe for only one topic
13641362
if (ReplyIfNotTopic(ev)) {
13651363
return;
13661364
}
1367-
PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo;
1365+
PQGroupInfo = entries[0].PQGroupInfo;
13681366
auto* partRes = Result.mutable_partition();
13691367
partRes->set_partition_id(Settings.Partitions[0]);
13701368
partRes->set_active(true);

ydb/services/persqueue_v1/actors/schema_actors.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,9 @@ using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo;
264264

265265
virtual void Reply(const TActorContext& ctx) override;
266266

267-
bool MaybeRequestWithDescribeSchema(TAutoPtr<IEventHandle>& ev);
267+
private:
268+
269+
bool NeedToRequestWithDescribeSchema(TAutoPtr<IEventHandle>& ev);
268270

269271
private:
270272
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;

0 commit comments

Comments
 (0)