@@ -23,23 +23,29 @@ using namespace Aws::Client;
2323using namespace Aws ::S3;
2424using namespace Aws ;
2525
26+ // Downloads scheme-related objects from S3
2627class TSchemeGetter : public TActorBootstrapped <TSchemeGetter> {
2728 static TString SchemeKeyFromSettings (const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
2829 Y_ABORT_UNLESS (itemIdx < (ui32)settings.items_size ());
2930 return TStringBuilder () << settings.items (itemIdx).source_prefix () << " /scheme.pb" ;
3031 }
3132
33+ static TString PermissionsKeyFromSettings (const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
34+ Y_ABORT_UNLESS (itemIdx < (ui32)settings.items_size ());
35+ return TStringBuilder () << settings.items (itemIdx).source_prefix () << " /permissions.pb" ;
36+ }
37+
3238 void HeadObject (const TString& key) {
3339 auto request = Model::HeadObjectRequest ()
3440 .WithKey (key);
3541
3642 Send (Client, new TEvExternalStorage::TEvHeadObjectRequest (request));
3743 }
3844
39- void Handle (TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
45+ void HandleScheme (TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
4046 const auto & result = ev->Get ()->Result ;
4147
42- LOG_D (" Handle TEvExternalStorage::TEvHeadObjectResponse"
48+ LOG_D (" HandleScheme TEvExternalStorage::TEvHeadObjectResponse"
4349 << " : self# " << SelfId ()
4450 << " , result# " << result);
4551
@@ -51,6 +57,25 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
5157 GetObject (SchemeKey, std::make_pair (0 , contentLength - 1 ));
5258 }
5359
60+ void HandlePermissions (TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
61+ const auto & result = ev->Get ()->Result ;
62+
63+ LOG_D (" HandlePermissions TEvExternalStorage::TEvHeadObjectResponse"
64+ << " : self# " << SelfId ()
65+ << " , result# " << result);
66+
67+ if (result.GetError ().GetErrorType () == S3Errors::RESOURCE_NOT_FOUND
68+ || result.GetError ().GetErrorType () == S3Errors::NO_SUCH_KEY) {
69+ Reply (); // permissions are optional
70+ return ;
71+ } else if (!CheckResult (result, " HeadObject" )) {
72+ return ;
73+ }
74+
75+ const auto contentLength = result.GetResult ().GetContentLength ();
76+ GetObject (PermissionsKey, std::make_pair (0 , contentLength - 1 ));
77+ }
78+
5479 void GetObject (const TString& key, const std::pair<ui64, ui64>& range) {
5580 auto request = Model::GetObjectRequest ()
5681 .WithKey (key)
@@ -59,11 +84,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
5984 Send (Client, new TEvExternalStorage::TEvGetObjectRequest (request));
6085 }
6186
62- void Handle (TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
87+ void HandleScheme (TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
6388 const auto & msg = *ev->Get ();
6489 const auto & result = msg.Result ;
6590
66- LOG_D (" Handle TEvExternalStorage::TEvGetObjectResponse"
91+ LOG_D (" HandleScheme TEvExternalStorage::TEvGetObjectResponse"
6792 << " : self# " << SelfId ()
6893 << " , result# " << result);
6994
@@ -74,14 +99,46 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
7499 Y_ABORT_UNLESS (ItemIdx < ImportInfo->Items .size ());
75100 auto & item = ImportInfo->Items .at (ItemIdx);
76101
77- LOG_T (" Trying to parse"
102+ LOG_T (" Trying to parse scheme "
78103 << " : self# " << SelfId ()
79104 << " , body# " << SubstGlobalCopy (msg.Body , " \n " , " \\ n" ));
80105
81106 if (!google::protobuf::TextFormat::ParseFromString (msg.Body , &item.Scheme )) {
82107 return Reply (false , " Cannot parse scheme" );
83108 }
84109
110+ if (NeedDownloadPermissions) {
111+ StartDownloadingPermissions ();
112+ } else {
113+ Reply ();
114+ }
115+ }
116+
117+ void HandlePermissions (TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
118+ const auto & msg = *ev->Get ();
119+ const auto & result = msg.Result ;
120+
121+ LOG_D (" HandlePermissions TEvExternalStorage::TEvGetObjectResponse"
122+ << " : self# " << SelfId ()
123+ << " , result# " << result);
124+
125+ if (!CheckResult (result, " GetObject" )) {
126+ return ;
127+ }
128+
129+ Y_ABORT_UNLESS (ItemIdx < ImportInfo->Items .size ());
130+ auto & item = ImportInfo->Items .at (ItemIdx);
131+
132+ LOG_T (" Trying to parse permissions"
133+ << " : self# " << SelfId ()
134+ << " , body# " << SubstGlobalCopy (msg.Body , " \n " , " \\ n" ));
135+
136+ Ydb::Scheme::ModifyPermissionsRequest permissions;
137+ if (!google::protobuf::TextFormat::ParseFromString (msg.Body , &permissions)) {
138+ return Reply (false , " Cannot parse permissions" );
139+ }
140+ item.Permissions = std::move (permissions);
141+
85142 Reply ();
86143 }
87144
@@ -123,33 +180,67 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
123180 TActor::PassAway ();
124181 }
125182
183+ void Download (const TString& key) {
184+ if (Client) {
185+ Send (Client, new TEvents::TEvPoisonPill ());
186+ }
187+ Client = RegisterWithSameMailbox (CreateS3Wrapper (ExternalStorageConfig->ConstructStorageOperator ()));
188+
189+ HeadObject (key);
190+ }
191+
192+ void DownloadScheme () {
193+ Download (SchemeKey);
194+ }
195+
196+ void DownloadPermissions () {
197+ Download (PermissionsKey);
198+ }
199+
200+ void ResetRetries () {
201+ Attempt = 0 ;
202+ }
203+
204+ void StartDownloadingPermissions () {
205+ ResetRetries ();
206+ DownloadPermissions ();
207+ Become (&TThis::StateDownloadPermissions);
208+ }
209+
126210public:
127211 explicit TSchemeGetter (const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx)
128212 : ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(importInfo->Settings))
129213 , ReplyTo(replyTo)
130214 , ImportInfo(importInfo)
131215 , ItemIdx(itemIdx)
132216 , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx))
217+ , PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx))
133218 , Retries(importInfo->Settings.number_of_retries())
219+ , NeedDownloadPermissions(!importInfo->Settings.no_acl())
134220 {
135221 }
136222
137223 void Bootstrap () {
138- if (Client) {
139- Send (Client, new TEvents::TEvPoisonPill ());
140- }
141- Client = RegisterWithSameMailbox (CreateS3Wrapper (ExternalStorageConfig->ConstructStorageOperator ()));
224+ DownloadScheme ();
225+ Become (&TThis::StateDownloadScheme);
226+ }
227+
228+ STATEFN (StateDownloadScheme) {
229+ switch (ev->GetTypeRewrite ()) {
230+ hFunc (TEvExternalStorage::TEvHeadObjectResponse, HandleScheme);
231+ hFunc (TEvExternalStorage::TEvGetObjectResponse, HandleScheme);
142232
143- HeadObject (SchemeKey);
144- Become (&TThis::StateWork);
233+ sFunc (TEvents::TEvWakeup, DownloadScheme);
234+ sFunc (TEvents::TEvPoisonPill, PassAway);
235+ }
145236 }
146237
147- STATEFN (StateWork ) {
238+ STATEFN (StateDownloadPermissions ) {
148239 switch (ev->GetTypeRewrite ()) {
149- hFunc (TEvExternalStorage::TEvHeadObjectResponse, Handle );
150- hFunc (TEvExternalStorage::TEvGetObjectResponse, Handle );
240+ hFunc (TEvExternalStorage::TEvHeadObjectResponse, HandlePermissions );
241+ hFunc (TEvExternalStorage::TEvGetObjectResponse, HandlePermissions );
151242
152- sFunc (TEvents::TEvWakeup, Bootstrap );
243+ sFunc (TEvents::TEvWakeup, DownloadPermissions );
153244 sFunc (TEvents::TEvPoisonPill, PassAway);
154245 }
155246 }
@@ -161,13 +252,16 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
161252 const ui32 ItemIdx;
162253
163254 const TString SchemeKey;
255+ const TString PermissionsKey;
164256
165257 const ui32 Retries;
166258 ui32 Attempt = 0 ;
167259
168260 TDuration Delay = TDuration::Minutes(1 );
169261 static constexpr TDuration MaxDelay = TDuration::Minutes(10 );
170262
263+ const bool NeedDownloadPermissions = true ;
264+
171265 TActorId Client;
172266
173267}; // TSchemeGetter
0 commit comments