Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4379,6 +4379,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Scheme = scheme;
}

if (rowset.HaveValue<Schema::ImportItems::Permissions>()) {
Ydb::Scheme::ModifyPermissionsRequest permissions;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(permissions, rowset.GetValue<Schema::ImportItems::Permissions>()));
item.Permissions = permissions;
}

item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ TVector<ISubOperation::TPtr> CreateIndexedTable(TOperationId nextId, const TTxTr
if (tx.HasAlterUserAttributes()) {
scheme.MutableAlterUserAttributes()->CopyFrom(tx.GetAlterUserAttributes());
}
if (tx.HasModifyACL()) {
scheme.MutableModifyACL()->CopyFrom(tx.GetModifyACL());
}

result.push_back(CreateNewTable(NextPartId(nextId, result), scheme, sequences));
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::Scheme>(item.Scheme.SerializeAsString())
);
if (item.Permissions.Defined()) {
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::Permissions>(item.Permissions->SerializeAsString())
);
}
}

void TSchemeShard::PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
Expand Down
14 changes: 10 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ydb/core/base/path.h>
#include <ydb/core/ydb_convert/table_description.h>
#include <ydb/core/ydb_convert/ydb_convert.h>

namespace NKikimr {
namespace NSchemeShard {
Expand All @@ -20,10 +21,6 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;

if (importInfo->UserSID) {
record.SetOwner(*importInfo->UserSID);
}

auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
modifyScheme.SetInternal(true);
Expand Down Expand Up @@ -66,6 +63,15 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
}
}

if (importInfo->UserSID) {
record.SetOwner(*importInfo->UserSID);
}
FillOwner(record, item.Permissions);

if (!FillACL(modifyScheme, item.Permissions, error)) {
return nullptr;
}

return propose;
}

Expand Down
124 changes: 109 additions & 15 deletions ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,29 @@ using namespace Aws::Client;
using namespace Aws::S3;
using namespace Aws;

// Downloads scheme-related objects from S3
class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/scheme.pb";
}

static TString PermissionsKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb";
}

void HeadObject(const TString& key) {
auto request = Model::HeadObjectRequest()
.WithKey(key);

Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request));
}

void Handle(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
void HandleScheme(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;

LOG_D("Handle TEvExternalStorage::TEvHeadObjectResponse"
LOG_D("HandleScheme TEvExternalStorage::TEvHeadObjectResponse"
<< ": self# " << SelfId()
<< ", result# " << result);

Expand All @@ -51,6 +57,25 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
GetObject(SchemeKey, std::make_pair(0, contentLength - 1));
}

void HandlePermissions(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;

LOG_D("HandlePermissions TEvExternalStorage::TEvHeadObjectResponse"
<< ": self# " << SelfId()
<< ", result# " << result);

if (result.GetError().GetErrorType() == S3Errors::RESOURCE_NOT_FOUND
|| result.GetError().GetErrorType() == S3Errors::NO_SUCH_KEY) {
Reply(); // permissions are optional
return;
} else if (!CheckResult(result, "HeadObject")) {
return;
}

const auto contentLength = result.GetResult().GetContentLength();
GetObject(PermissionsKey, std::make_pair(0, contentLength - 1));
}

void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
auto request = Model::GetObjectRequest()
.WithKey(key)
Expand All @@ -59,11 +84,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Send(Client, new TEvExternalStorage::TEvGetObjectRequest(request));
}

void Handle(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
void HandleScheme(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
const auto& msg = *ev->Get();
const auto& result = msg.Result;

LOG_D("Handle TEvExternalStorage::TEvGetObjectResponse"
LOG_D("HandleScheme TEvExternalStorage::TEvGetObjectResponse"
<< ": self# " << SelfId()
<< ", result# " << result);

Expand All @@ -74,14 +99,46 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);

LOG_T("Trying to parse"
LOG_T("Trying to parse scheme"
<< ": self# " << SelfId()
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));

if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) {
return Reply(false, "Cannot parse scheme");
}

if (NeedDownloadPermissions) {
StartDownloadingPermissions();
} else {
Reply();
}
}

void HandlePermissions(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
const auto& msg = *ev->Get();
const auto& result = msg.Result;

LOG_D("HandlePermissions TEvExternalStorage::TEvGetObjectResponse"
<< ": self# " << SelfId()
<< ", result# " << result);

if (!CheckResult(result, "GetObject")) {
return;
}

Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);

LOG_T("Trying to parse permissions"
<< ": self# " << SelfId()
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));

Ydb::Scheme::ModifyPermissionsRequest permissions;
if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &permissions)) {
return Reply(false, "Cannot parse permissions");
}
item.Permissions = std::move(permissions);

Reply();
}

Expand Down Expand Up @@ -123,33 +180,67 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
TActor::PassAway();
}

void Download(const TString& key) {
if (Client) {
Send(Client, new TEvents::TEvPoisonPill());
}
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));

HeadObject(key);
}

void DownloadScheme() {
Download(SchemeKey);
}

void DownloadPermissions() {
Download(PermissionsKey);
}

void ResetRetries() {
Attempt = 0;
}

void StartDownloadingPermissions() {
ResetRetries();
DownloadPermissions();
Become(&TThis::StateDownloadPermissions);
}

public:
explicit TSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx)
: ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(importInfo->Settings))
, ReplyTo(replyTo)
, ImportInfo(importInfo)
, ItemIdx(itemIdx)
, SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx))
, PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx))
, Retries(importInfo->Settings.number_of_retries())
, NeedDownloadPermissions(!importInfo->Settings.no_acl())
{
}

void Bootstrap() {
if (Client) {
Send(Client, new TEvents::TEvPoisonPill());
}
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
DownloadScheme();
Become(&TThis::StateDownloadScheme);
}

STATEFN(StateDownloadScheme) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleScheme);
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleScheme);

HeadObject(SchemeKey);
Become(&TThis::StateWork);
sFunc(TEvents::TEvWakeup, DownloadScheme);
sFunc(TEvents::TEvPoisonPill, PassAway);
}
}

STATEFN(StateWork) {
STATEFN(StateDownloadPermissions) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExternalStorage::TEvHeadObjectResponse, Handle);
hFunc(TEvExternalStorage::TEvGetObjectResponse, Handle);
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandlePermissions);
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandlePermissions);

sFunc(TEvents::TEvWakeup, Bootstrap);
sFunc(TEvents::TEvWakeup, DownloadPermissions);
sFunc(TEvents::TEvPoisonPill, PassAway);
}
}
Expand All @@ -161,13 +252,16 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
const ui32 ItemIdx;

const TString SchemeKey;
const TString PermissionsKey;

const ui32 Retries;
ui32 Attempt = 0;

TDuration Delay = TDuration::Minutes(1);
static constexpr TDuration MaxDelay = TDuration::Minutes(10);

const bool NeedDownloadPermissions = true;

TActorId Client;

}; // TSchemeGetter
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2769,6 +2769,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
TString DstPathName;
TPathId DstPathId;
Ydb::Table::CreateTableRequest Scheme;
TMaybeFail<Ydb::Scheme::ModifyPermissionsRequest> Permissions;

EState State = EState::GetScheme;
ESubState SubState = ESubState::AllocateTxId;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1518,6 +1518,7 @@ struct Schema : NIceDb::Schema {
struct DstPathOwnerId : Column<4, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; };
struct DstPathLocalId : Column<5, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; };
struct Scheme : Column<6, NScheme::NTypeIds::String> {};
struct Permissions : Column<11, NScheme::NTypeIds::String> {};

struct State : Column<7, NScheme::NTypeIds::Byte> {};
struct WaitTxId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TTxId; };
Expand All @@ -1532,6 +1533,7 @@ struct Schema : NIceDb::Schema {
DstPathOwnerId,
DstPathLocalId,
Scheme,
Permissions,
State,
WaitTxId,
NextIndexIdx,
Expand Down
20 changes: 18 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1207,9 +1207,9 @@ TCheckFunc HasOwner(const TString& owner) {
};
}

void CheckEffectiveRight(const NKikimrScheme::TEvDescribeSchemeResult& record, const TString& right, bool mustHave) {
void CheckRight(const NKikimrScheme::TEvDescribeSchemeResult& record, const TString& right, bool mustHave, bool isEffective) {
const auto& self = record.GetPathDescription().GetSelf();
TSecurityObject src(self.GetOwner(), self.GetEffectiveACL(), false);
TSecurityObject src(self.GetOwner(), isEffective ? self.GetEffectiveACL() : self.GetACL(), false);

NACLib::TSecurityObject required;
required.FromString(right);
Expand All @@ -1233,6 +1233,22 @@ void CheckEffectiveRight(const NKikimrScheme::TEvDescribeSchemeResult& record, c
}
}

TCheckFunc HasRight(const TString& right) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
CheckRight(record, right, true, true);
};
}

TCheckFunc HasNotRight(const TString& right) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
CheckRight(record, right, false, true);
};
}

void CheckEffectiveRight(const NKikimrScheme::TEvDescribeSchemeResult& record, const TString& right, bool mustHave) {
CheckRight(record, right, mustHave, true);
}

TCheckFunc HasEffectiveRight(const TString& right) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
CheckEffectiveRight(record, right, true);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ namespace NLs {
TCheckFunc BackupHistoryCount(ui64 count);

TCheckFunc HasOwner(const TString& owner);
TCheckFunc HasRight(const TString& right);
TCheckFunc HasNotRight(const TString& right);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HasNoRight?

TCheckFunc HasEffectiveRight(const TString& right);
TCheckFunc HasNotEffectiveRight(const TString& right);

Expand Down
Loading