13
13
#include < ydb/core/wrappers/s3_storage_config.h>
14
14
#include < ydb/core/wrappers/s3_wrapper.h>
15
15
#include < ydb/core/wrappers/events/common.h>
16
+ #include < ydb/core/ydb_convert/table_description.h>
17
+ #include < ydb/core/ydb_convert/topic_description.h>
16
18
#include < ydb/library/actors/core/actor_bootstrapped.h>
17
19
#include < ydb/library/actors/core/hfunc.h>
18
20
#include < ydb/library/actors/http/http_proxy.h>
@@ -35,6 +37,11 @@ namespace NDataShard {
35
37
using namespace NBackup ;
36
38
using namespace NBackupRestoreTraits ;
37
39
40
+ struct TChangefeedExportDescriptions {
41
+ const Ydb::Table::ChangefeedDescription ChangefeedDescription;
42
+ const Ydb::Topic::DescribeTopicResult Topic;
43
+ };
44
+
38
45
class TS3Uploader : public TActorBootstrapped <TS3Uploader> {
39
46
using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig;
40
47
using THttpResolverConfig = NKikimrConfig::TS3ProxyResolverConfig::THttpResolverConfig;
@@ -165,6 +172,8 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
165
172
UploadPermissions ();
166
173
} else if (!SchemeUploaded) {
167
174
UploadScheme ();
175
+ } else if (!ChangefeedsUploaded) {
176
+ UploadChangefeed ();
168
177
} else {
169
178
this ->Become (&TThis::StateUploadData);
170
179
@@ -214,6 +223,46 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
214
223
this ->Become (&TThis::StateUploadPermissions);
215
224
}
216
225
226
+ template <typename T>
227
+ void PutDescription (const google::protobuf::Message& desc, const TString& key, TString& checksum, T stateFunc) {
228
+ google::protobuf::TextFormat::PrintToString (desc, &Buffer);
229
+ if (EnableChecksums) {
230
+ checksum = ComputeChecksum (Buffer);
231
+ }
232
+ auto request = Aws::S3::Model::PutObjectRequest ()
233
+ .WithKey (key);
234
+ this ->Send (Client, new TEvExternalStorage::TEvPutObjectRequest (request, std::move (Buffer)));
235
+ this ->Become (stateFunc);
236
+ }
237
+
238
+ void PutChangefeedDescription (const Ydb::Table::ChangefeedDescription& changefeed, const TString& changefeedName) {
239
+ PutDescription (changefeed, Settings.GetChangefeedKey (changefeedName), ChangefeedChecksum, &TThis::StateUploadChangefeed);
240
+ }
241
+
242
+ void PutTopicDescription (const Ydb::Topic::DescribeTopicResult& topic, const TString& changefeedName) {
243
+ PutDescription (topic, Settings.GetTopicKey (changefeedName), TopicChecksum, &TThis::StateUploadTopic);
244
+ }
245
+
246
+ const TString& GetCurrentChangefeedName () const {
247
+ return Changefeeds.at (IndexExportedChangefeed).ChangefeedDescription .Getname ();
248
+ }
249
+
250
+ void UploadChangefeed () {
251
+ if (IndexExportedChangefeed == Changefeeds.size ()) {
252
+ ChangefeedsUploaded = true ;
253
+ if (Scanner) {
254
+ this ->Send (Scanner, new TEvExportScan::TEvFeed ());
255
+ }
256
+ this ->Become (&TThis::StateUploadData);
257
+ return ;
258
+ }
259
+ PutChangefeedDescription (Changefeeds[IndexExportedChangefeed].ChangefeedDescription , GetCurrentChangefeedName ());
260
+ }
261
+
262
+ void UploadTopic () {
263
+ PutTopicDescription (Changefeeds[IndexExportedChangefeed].Topic , GetCurrentChangefeedName ());
264
+ }
265
+
217
266
void UploadMetadata () {
218
267
Y_ABORT_UNLESS (!MetadataUploaded);
219
268
@@ -256,11 +305,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
256
305
257
306
auto nextStep = [this ]() {
258
307
SchemeUploaded = true ;
259
-
260
- if (Scanner) {
261
- this ->Send (Scanner, new TEvExportScan::TEvFeed ());
262
- }
263
- this ->Become (&TThis::StateUploadData);
308
+ UploadChangefeed ();
264
309
};
265
310
266
311
if (EnableChecksums) {
@@ -295,6 +340,51 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
295
340
}
296
341
}
297
342
343
+ void HandleChangefeed (TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
344
+ const auto & result = ev->Get ()->Result ;
345
+
346
+ EXPORT_LOG_D (" HandleChangefeed TEvExternalStorage::TEvPutObjectResponse"
347
+ << " : self# " << this ->SelfId ()
348
+ << " , result# " << result);
349
+
350
+ if (!CheckResult (result, TStringBuf (" PutObject (changefeed)" ))) {
351
+ return ;
352
+ }
353
+
354
+ auto nextStep = [this ]() {
355
+ UploadTopic ();
356
+ };
357
+ if (EnableChecksums) {
358
+ TString checksumKey = ChecksumKey (Settings.GetChangefeedKey (GetCurrentChangefeedName ()));
359
+ UploadChecksum (std::move (ChangefeedChecksum), checksumKey, ChangefeedKeySuffix (), nextStep);
360
+ } else {
361
+ nextStep ();
362
+ }
363
+ }
364
+
365
+ void HandleTopic (TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
366
+ const auto & result = ev->Get ()->Result ;
367
+
368
+ EXPORT_LOG_D (" HandleTopic TEvExternalStorage::TEvPutObjectResponse"
369
+ << " : self# " << this ->SelfId ()
370
+ << " , result# " << result);
371
+
372
+ if (!CheckResult (result, TStringBuf (" PutObject (topic)" ))) {
373
+ return ;
374
+ }
375
+
376
+ auto nextStep = [this ]() {
377
+ ++IndexExportedChangefeed;
378
+ UploadChangefeed ();
379
+ };
380
+ if (EnableChecksums) {
381
+ TString checksumKey = ChecksumKey (Settings.GetTopicKey (GetCurrentChangefeedName ()));
382
+ UploadChecksum (std::move (TopicChecksum), checksumKey, TopicKeySuffix (), nextStep);
383
+ } else {
384
+ nextStep ();
385
+ }
386
+ }
387
+
298
388
void HandleMetadata (TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
299
389
const auto & result = ev->Get ()->Result ;
300
390
@@ -344,7 +434,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
344
434
return PassAway ();
345
435
}
346
436
347
- if (ProxyResolved && SchemeUploaded && MetadataUploaded && PermissionsUploaded) {
437
+ if (ProxyResolved && SchemeUploaded && MetadataUploaded && PermissionsUploaded && ChangefeedsUploaded ) {
348
438
this ->Send (Scanner, new TEvExportScan::TEvFeed ());
349
439
}
350
440
}
@@ -661,6 +751,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
661
751
const TActorId& dataShard, ui64 txId,
662
752
const NKikimrSchemeOp::TBackupTask& task,
663
753
TMaybe<Ydb::Table::CreateTableRequest>&& scheme,
754
+ TVector<TChangefeedExportDescriptions> changefeeds,
664
755
TMaybe<Ydb::Scheme::ModifyPermissionsRequest>&& permissions,
665
756
TString&& metadata)
666
757
: ExternalStorageConfig(new TS3ExternalStorageConfig(task.GetS3Settings()))
@@ -672,12 +763,14 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
672
763
, DataShard(dataShard)
673
764
, TxId(txId)
674
765
, Scheme(std::move(scheme))
766
+ , Changefeeds(std::move(changefeeds))
675
767
, Metadata(std::move(metadata))
676
768
, Permissions(std::move(permissions))
677
769
, Retries(task.GetNumberOfRetries())
678
770
, Attempt(0 )
679
771
, Delay(TDuration::Minutes(1 ))
680
772
, SchemeUploaded(ShardNum == 0 ? false : true )
773
+ , ChangefeedsUploaded(ShardNum == 0 ? false : true )
681
774
, MetadataUploaded(ShardNum == 0 ? false : true )
682
775
, PermissionsUploaded(ShardNum == 0 ? false : true )
683
776
, EnableChecksums(task.GetEnableChecksums())
@@ -730,6 +823,22 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
730
823
}
731
824
}
732
825
826
+ STATEFN (StateUploadChangefeed) {
827
+ switch (ev->GetTypeRewrite ()) {
828
+ hFunc (TEvExternalStorage::TEvPutObjectResponse, HandleChangefeed);
829
+ default :
830
+ return StateBase (ev);
831
+ }
832
+ }
833
+
834
+ STATEFN (StateUploadTopic) {
835
+ switch (ev->GetTypeRewrite ()) {
836
+ hFunc (TEvExternalStorage::TEvPutObjectResponse, HandleTopic);
837
+ default :
838
+ return StateBase (ev);
839
+ }
840
+ }
841
+
733
842
STATEFN (StateUploadMetadata) {
734
843
switch (ev->GetTypeRewrite ()) {
735
844
hFunc (TEvExternalStorage::TEvPutObjectResponse, HandleMetadata);
@@ -775,17 +884,20 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
775
884
const TActorId DataShard;
776
885
const ui64 TxId;
777
886
const TMaybe<Ydb::Table::CreateTableRequest> Scheme;
887
+ const TVector<TChangefeedExportDescriptions> Changefeeds;
778
888
const TString Metadata;
779
889
const TMaybe<Ydb::Scheme::ModifyPermissionsRequest> Permissions;
780
890
781
891
const ui32 Retries;
782
892
ui32 Attempt;
893
+ ui64 IndexExportedChangefeed = 0 ;
783
894
784
895
TDuration Delay;
785
896
static constexpr TDuration MaxDelay = TDuration::Minutes(10 );
786
897
787
898
TActorId Client;
788
899
bool SchemeUploaded;
900
+ bool ChangefeedsUploaded;
789
901
bool MetadataUploaded;
790
902
bool PermissionsUploaded;
791
903
bool MultiPart;
@@ -801,6 +913,8 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
801
913
bool EnableChecksums;
802
914
TString DataChecksum;
803
915
TString MetadataChecksum;
916
+ TString ChangefeedChecksum;
917
+ TString TopicChecksum;
804
918
TString SchemeChecksum;
805
919
TString PermissionsChecksum;
806
920
std::function<void ()> ChecksumUploadedCallback;
@@ -812,6 +926,28 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
812
926
? GenYdbScheme (Columns, Task.GetTable ())
813
927
: Nothing ();
814
928
929
+ const auto & persQueues = Task.GetChangefeedUnderlyingTopics ();
930
+ const auto & cdcStreams = Task.GetTable ().GetTable ().GetCdcStreams ();
931
+ Y_ASSERT (persQueues.size () == cdcStreams.size ());
932
+
933
+ const int changefeedsCount = cdcStreams.size ();
934
+ TVector <TChangefeedExportDescriptions> changefeeds;
935
+ changefeeds.reserve (changefeedsCount);
936
+
937
+ for (int i = 0 ; i < changefeedsCount; ++i) {
938
+ Ydb::Table::ChangefeedDescription changefeed;
939
+ const auto & cdcStream = cdcStreams.at (i);
940
+ FillChangefeedDescription (changefeed, cdcStream);
941
+
942
+ Ydb::Topic::DescribeTopicResult topic;
943
+ const auto & pq = persQueues.at (i);
944
+ Ydb::StatusIds::StatusCode status;
945
+ TString error;
946
+ FillTopicDescription (topic, pq.GetPersQueueGroup (), pq.GetSelf (), cdcStream.GetName (), status, error);
947
+
948
+ changefeeds.emplace_back (changefeed, topic);
949
+ }
950
+
815
951
auto permissions = (Task.GetShardNum () == 0 )
816
952
? GenYdbPermissions (Task.GetTable ())
817
953
: Nothing ();
@@ -827,7 +963,7 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
827
963
metadata.AddFullBackup (backup);
828
964
829
965
return new TS3Uploader (
830
- dataShard, txId, Task, std::move (scheme), std::move (permissions), metadata.Serialize ());
966
+ dataShard, txId, Task, std::move (scheme), std::move (changefeeds), std::move ( permissions), metadata.Serialize ());
831
967
}
832
968
833
969
} // NDataShard
0 commit comments