12
12
#include < ydb/core/wrappers/s3_storage_config.h>
13
13
#include < ydb/core/wrappers/s3_wrapper.h>
14
14
#include < ydb/core/wrappers/events/common.h>
15
+ #include < ydb/core/ydb_convert/table_description.h>
16
+ #include < ydb/core/ydb_convert/topic_description.h>
15
17
#include < ydb/library/actors/core/actor_bootstrapped.h>
16
18
#include < ydb/library/actors/core/hfunc.h>
17
19
#include < ydb/library/actors/http/http_proxy.h>
@@ -163,6 +165,8 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
163
165
UploadPermissions ();
164
166
} else if (!SchemeUploaded) {
165
167
UploadScheme ();
168
+ } else if (!ChangefeedsUploaded) {
169
+ UploadChangefeeds ();
166
170
} else {
167
171
this ->Become (&TThis::StateUploadData);
168
172
@@ -212,6 +216,60 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
212
216
this ->Become (&TThis::StateUploadPermissions);
213
217
}
214
218
219
+ void PutChangefeedDescription (const Ydb::Table::ChangefeedDescription& changefeed, const TString& changefeedKeyPattern, const ui64 index) {
220
+ google::protobuf::TextFormat::PrintToString (changefeed, &Buffer);
221
+ auto request = Aws::S3::Model::PutObjectRequest ()
222
+ .WithKey (Settings.GetChangefeedKey (changefeedKeyPattern));
223
+ this ->Send (Client, new TEvExternalStorage::TEvPutObjectRequest (request, std::move (Buffer)));
224
+ this ->Become (TThis::StateUploadOneChangefeed (index));
225
+ }
226
+
227
+ void PutTopicDescription (const Ydb::Topic::DescribeTopicResult& topic, const TString& changefeedKeyPattern, const ui64 index) {
228
+ google::protobuf::TextFormat::PrintToString (topic, &Buffer);
229
+ auto request = Aws::S3::Model::PutObjectRequest ()
230
+ .WithKey (Settings.GetTopicKey (changefeedKeyPattern));
231
+ this ->Send (Client, new TEvExternalStorage::TEvPutObjectRequest (request, std::move (Buffer)));
232
+ this ->Become (TThis::StateUploadOneTopic (index));
233
+ }
234
+
235
+ void UploadOneChangefeed (ui64 index) {
236
+ if (index >= ChangefeedsExportDescs.size ()) {
237
+ // Error!
238
+ return ;
239
+ }
240
+ if (index == ChangefeedsExportDescs.size ()) {
241
+ ChangefeedsUploaded = true ;
242
+ this ->Become (&TThis::StateUploadData);
243
+ return ;
244
+ }
245
+ const auto & changefeed = ChangefeedsExportDescs[index].ChangefeedDescription ;
246
+ const auto changefeedKeyPattern = TStringBuilder () << Settings.ObjectKeyPattern << " /" << changefeed.Getname ();
247
+ PutChangefeedDescription (changefeed, changefeedKeyPattern, index);
248
+ }
249
+
250
+ void UploadOneTopic (ui64 index) {
251
+ if (index >= ChangefeedsExportDescs.size ()) {
252
+ // Error!
253
+ return ;
254
+ }
255
+ auto & descs = ChangefeedsExportDescs[index];
256
+ const auto changefeedKeyPattern = TStringBuilder () << Settings.ObjectKeyPattern << " /"
257
+ << descs.ChangefeedDescription .Getname ();
258
+ PutTopicDescription (descs.Topic , changefeedKeyPattern, index);
259
+ }
260
+
261
+ void UploadChangefeeds () {
262
+ Y_ABORT_UNLESS (!ChangefeedsUploaded);
263
+
264
+ for (const auto &[changefeed, topic] : ChangefeedsExportDescs) {
265
+ google::protobuf::TextFormat::PrintToString (changefeed, &Buffer);
266
+ const auto changefeedKeyPattern = TStringBuilder () << Settings.ObjectKeyPattern << " /" << changefeed.Getname ();
267
+ PutChangefeedDescription (changefeed, changefeedKeyPattern);
268
+ PutTopicDescription (topic, changefeedKeyPattern);
269
+ }
270
+ this ->Become (&TThis::StateUploadScheme);
271
+ }
272
+
215
273
void UploadMetadata () {
216
274
Y_ABORT_UNLESS (!MetadataUploaded);
217
275
@@ -258,7 +316,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
258
316
if (Scanner) {
259
317
this ->Send (Scanner, new TEvExportScan::TEvFeed ());
260
318
}
261
- this -> Become (&TThis::StateUploadData );
319
+ UploadOneChangefeed ( 0 );
262
320
};
263
321
264
322
if (EnableChecksums) {
@@ -293,6 +351,44 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
293
351
}
294
352
}
295
353
354
+ auto HandleOneChangefeed (ui64 index) {
355
+ return [index, this ](TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
356
+ const auto & result = ev->Get ()->Result ;
357
+
358
+ EXPORT_LOG_D (" HandleMetadata TEvExternalStorage::TEvPutObjectResponse"
359
+ << " : self# " << this ->SelfId ()
360
+ << " , result# " << result);
361
+
362
+ if (!CheckResult (result, TStringBuf (" PutObject (changefeed)" ))) {
363
+ return ;
364
+ }
365
+
366
+ auto nextStep = [index, this ]() {
367
+ UploadOneTopic (index);
368
+ };
369
+ nextStep ();
370
+ };
371
+ }
372
+
373
+ auto HandleOneTopic (ui64 index) {
374
+ return [index, this ](TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
375
+ const auto & result = ev->Get ()->Result ;
376
+
377
+ EXPORT_LOG_D (" HandleMetadata TEvExternalStorage::TEvPutObjectResponse"
378
+ << " : self# " << this ->SelfId ()
379
+ << " , result# " << result);
380
+
381
+ if (!CheckResult (result, TStringBuf (" PutObject (topic)" ))) {
382
+ return ;
383
+ }
384
+
385
+ auto nextStep = [index, this ]() {
386
+ UploadOneChangefeed (index + 1 );
387
+ };
388
+ nextStep ();
389
+ };
390
+ }
391
+
296
392
void HandleMetadata (TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
297
393
const auto & result = ev->Get ()->Result ;
298
394
@@ -659,6 +755,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
659
755
const TActorId& dataShard, ui64 txId,
660
756
const NKikimrSchemeOp::TBackupTask& task,
661
757
TMaybe<Ydb::Table::CreateTableRequest>&& scheme,
758
+ TVector<TChangefeedExportDescriptions> changefeedsExportDescs,
662
759
TMaybe<Ydb::Scheme::ModifyPermissionsRequest>&& permissions,
663
760
TString&& metadata)
664
761
: ExternalStorageConfig(new TS3ExternalStorageConfig(task.GetS3Settings()))
@@ -670,6 +767,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
670
767
, DataShard(dataShard)
671
768
, TxId(txId)
672
769
, Scheme(std::move(scheme))
770
+ , ChangefeedsExportDescs(std::move(changefeedsExportDescs))
673
771
, Metadata(std::move(metadata))
674
772
, Permissions(std::move(permissions))
675
773
, Retries(task.GetNumberOfRetries())
@@ -728,6 +826,26 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
728
826
}
729
827
}
730
828
829
+ std::function<void (TAutoPtr<::NActors::IEventHandle>&)> StateUploadOneChangefeed(ui64 index) {
830
+ return [index, this ](TAutoPtr<::NActors ::IEventHandle> &ev) {
831
+ switch (ev->GetTypeRewrite ()) {
832
+ hFunc (TEvExternalStorage::TEvPutObjectResponse, HandleOneChangefeed (index));
833
+ default :
834
+ return StateBase (ev);
835
+ }
836
+ };
837
+ }
838
+
839
+ std::function<void (TAutoPtr<::NActors::IEventHandle>&)> StateUploadOneTopic (ui64 index) {
840
+ return [index, this ](TAutoPtr<::NActors ::IEventHandle> &ev) {
841
+ switch (ev->GetTypeRewrite ()) {
842
+ hFunc (TEvExternalStorage::TEvPutObjectResponse, HandleOneTopic (index));
843
+ default :
844
+ return StateBase (ev);
845
+ }
846
+ };
847
+ }
848
+
731
849
STATEFN (StateUploadMetadata) {
732
850
switch (ev->GetTypeRewrite ()) {
733
851
hFunc (TEvExternalStorage::TEvPutObjectResponse, HandleMetadata);
@@ -773,6 +891,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
773
891
const TActorId DataShard;
774
892
const ui64 TxId;
775
893
const TMaybe<Ydb::Table::CreateTableRequest> Scheme;
894
+ TVector<TChangefeedExportDescriptions> ChangefeedsExportDescs;
776
895
const TString Metadata;
777
896
const TMaybe<Ydb::Scheme::ModifyPermissionsRequest> Permissions;
778
897
@@ -784,6 +903,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
784
903
785
904
TActorId Client;
786
905
bool SchemeUploaded;
906
+ bool ChangefeedsUploaded;
787
907
bool MetadataUploaded;
788
908
bool PermissionsUploaded;
789
909
bool MultiPart;
@@ -810,6 +930,17 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
810
930
? GenYdbScheme (Columns, Task.GetTable ())
811
931
: Nothing ();
812
932
933
+ const auto & persQueuesTPathDesc = Task.GetPersQueue ();
934
+
935
+ const auto & cdcStreams = Task.GetTable ().GetTable ().GetCdcStreams ();
936
+ const int changefeedsCount = cdcStreams.size ();
937
+ TVector <TChangefeedExportDescriptions> changefeedsExportDescs (changefeedsCount);
938
+
939
+ for (int i = 0 ; i < changefeedsCount; ++i) {
940
+ FillChangefeedDescription (changefeedsExportDescs[i].ChangefeedDescription , cdcStreams[i]);
941
+ FillTopicDescription (changefeedsExportDescs[i].Topic , persQueuesTPathDesc[i].GetPersQueueGroup ());
942
+ }
943
+
813
944
auto permissions = (Task.GetShardNum () == 0 )
814
945
? GenYdbPermissions (Task.GetTable ())
815
946
: Nothing ();
@@ -825,7 +956,7 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
825
956
metadata.AddFullBackup (backup);
826
957
827
958
return new TS3Uploader (
828
- dataShard, txId, Task, std::move (scheme), std::move (permissions), metadata.Serialize ());
959
+ dataShard, txId, Task, std::move (scheme), std::move (changefeedsExportDescs), std::move ( permissions), metadata.Serialize ());
829
960
}
830
961
831
962
} // NDataShard
0 commit comments