Skip to content

Export changefeed's configuration to s3 #12882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
02ee951
v1
stanislav-shchetinin Dec 20, 2024
fb6682b
completed
stanislav-shchetinin Dec 23, 2024
6705979
tests
stanislav-shchetinin Dec 23, 2024
cee969c
dirs
stanislav-shchetinin Dec 23, 2024
714178e
forFiltered
stanislav-shchetinin Dec 23, 2024
d469b2b
test
stanislav-shchetinin Dec 24, 2024
ac024b2
fix
stanislav-shchetinin Dec 24, 2024
9e459b7
commit
stanislav-shchetinin Dec 25, 2024
b3f86ca
FillTopicDescription
stanislav-shchetinin Dec 27, 2024
a745df8
removed extra changes
stanislav-shchetinin Dec 27, 2024
26ae427
v1
stanislav-shchetinin Dec 20, 2024
e38ce98
completed
stanislav-shchetinin Dec 23, 2024
35547c6
tests
stanislav-shchetinin Dec 23, 2024
0704d74
dirs
stanislav-shchetinin Dec 23, 2024
7ad37f5
forFiltered
stanislav-shchetinin Dec 23, 2024
9a82d4f
test
stanislav-shchetinin Dec 24, 2024
7c06269
fix
stanislav-shchetinin Dec 24, 2024
268c7be
commit
stanislav-shchetinin Dec 25, 2024
b983c14
fix bug
stanislav-shchetinin Dec 25, 2024
c89629c
fix rebase
stanislav-shchetinin Dec 26, 2024
fe7a81a
prepared upload
stanislav-shchetinin Dec 26, 2024
447f529
upload
stanislav-shchetinin Dec 27, 2024
aa10221
removed extra func
stanislav-shchetinin Dec 27, 2024
f6c7b2e
removed extra func
stanislav-shchetinin Dec 27, 2024
1fb32b5
fix after review
stanislav-shchetinin Dec 27, 2024
e466f85
fix
stanislav-shchetinin Dec 27, 2024
7cfe28e
not fill pq
stanislav-shchetinin Jan 9, 2025
7ee3a25
dbg
stanislav-shchetinin Jan 9, 2025
601a8be
dbg
stanislav-shchetinin Jan 9, 2025
c7669af
worked v1
stanislav-shchetinin Jan 9, 2025
6fd2dfa
removed dbg
stanislav-shchetinin Jan 10, 2025
321c388
get description
stanislav-shchetinin Jan 10, 2025
10cb25c
refactoring
stanislav-shchetinin Jan 10, 2025
5029c66
checksum in test
stanislav-shchetinin Jan 10, 2025
bc768d6
changefeed generator
stanislav-shchetinin Jan 10, 2025
437ebe4
rebase fix
stanislav-shchetinin Jan 10, 2025
0edf2e7
fix rebase
stanislav-shchetinin Jan 10, 2025
78df68b
use FillTopicDescription
stanislav-shchetinin Jan 16, 2025
5445590
removed extra file
stanislav-shchetinin Jan 16, 2025
c8a6d44
fix
stanislav-shchetinin Jan 16, 2025
be60a44
template
stanislav-shchetinin Jan 16, 2025
6c61dd1
ClearChangefeedUnderlyingTopics
stanislav-shchetinin Jan 16, 2025
e62bce1
Apply suggestions from code review
stanislav-shchetinin Jan 16, 2025
03fa0d0
renamed
stanislav-shchetinin Jan 16, 2025
36a7200
fix build
stanislav-shchetinin Jan 16, 2025
96ef021
files equal
stanislav-shchetinin Jan 16, 2025
a98b598
fix review
stanislav-shchetinin Jan 16, 2025
3b5807c
assert format
stanislav-shchetinin Jan 16, 2025
859e2eb
fix
stanislav-shchetinin Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,7 @@ message TBackupTask {
}

optional TPathDescription Table = 10; // for further restore
repeated TPathDescription ChangefeedUnderlyingTopics = 17; // for further restore

message TScanSettings {
optional uint64 RowsBatchSize = 1 [default = 0]; // no limit
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/backup_restore_traits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ TString PermissionsKeySuffix() {
return "permissions.pb";
}

TString TopicKeySuffix() {
return "topic_description.pb";
}

TString ChangefeedKeySuffix() {
return "changefeed_description.pb";
}

TString SchemeKeySuffix() {
return "scheme.pb";
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/backup_restore_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ ECompressionCodec NextCompressionCodec(ECompressionCodec cur);
TString DataFileExtension(EDataFormat format, ECompressionCodec codec);

TString PermissionsKeySuffix();
TString TopicKeySuffix();
TString ChangefeedKeySuffix();
TString SchemeKeySuffix();
TString MetadataKeySuffix();
TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec);
Expand Down
150 changes: 143 additions & 7 deletions ydb/core/tx/datashard/export_s3_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <ydb/core/wrappers/s3_storage_config.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/core/wrappers/events/common.h>
#include <ydb/core/ydb_convert/table_description.h>
#include <ydb/core/ydb_convert/topic_description.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/http/http_proxy.h>
Expand All @@ -35,6 +37,11 @@ namespace NDataShard {
using namespace NBackup;
using namespace NBackupRestoreTraits;

struct TChangefeedExportDescriptions {
const Ydb::Table::ChangefeedDescription ChangefeedDescription;
const Ydb::Topic::DescribeTopicResult Topic;
};

class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig;
using THttpResolverConfig = NKikimrConfig::TS3ProxyResolverConfig::THttpResolverConfig;
Expand Down Expand Up @@ -165,6 +172,8 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
UploadPermissions();
} else if (!SchemeUploaded) {
UploadScheme();
} else if (!ChangefeedsUploaded) {
UploadChangefeed();
} else {
this->Become(&TThis::StateUploadData);

Expand Down Expand Up @@ -214,6 +223,46 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
this->Become(&TThis::StateUploadPermissions);
}

template <typename T>
void PutDescription(const google::protobuf::Message& desc, const TString& key, TString& checksum, T stateFunc) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А для загрузки схемы таблицы можно эту же функцию переиспользовать? (можно отдельным pr)

google::protobuf::TextFormat::PrintToString(desc, &Buffer);
if (EnableChecksums) {
checksum = ComputeChecksum(Buffer);
}
auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(key);
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
this->Become(stateFunc);
}

void PutChangefeedDescription(const Ydb::Table::ChangefeedDescription& changefeed, const TString& changefeedName) {
PutDescription(changefeed, Settings.GetChangefeedKey(changefeedName), ChangefeedChecksum, &TThis::StateUploadChangefeed);
}

void PutTopicDescription(const Ydb::Topic::DescribeTopicResult& topic, const TString& changefeedName) {
PutDescription(topic, Settings.GetTopicKey(changefeedName), TopicChecksum, &TThis::StateUploadTopic);
}

const TString& GetCurrentChangefeedName() const {
return Changefeeds.at(IndexExportedChangefeed).ChangefeedDescription.Getname();
}

void UploadChangefeed() {
if (IndexExportedChangefeed == Changefeeds.size()) {
ChangefeedsUploaded = true;
if (Scanner) {
this->Send(Scanner, new TEvExportScan::TEvFeed());
}
this->Become(&TThis::StateUploadData);
return;
}
PutChangefeedDescription(Changefeeds[IndexExportedChangefeed].ChangefeedDescription, GetCurrentChangefeedName());
}

void UploadTopic() {
PutTopicDescription(Changefeeds[IndexExportedChangefeed].Topic, GetCurrentChangefeedName());
}

void UploadMetadata() {
Y_ABORT_UNLESS(!MetadataUploaded);

Expand Down Expand Up @@ -256,11 +305,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {

auto nextStep = [this]() {
SchemeUploaded = true;

if (Scanner) {
this->Send(Scanner, new TEvExportScan::TEvFeed());
}
this->Become(&TThis::StateUploadData);
UploadChangefeed();
};

if (EnableChecksums) {
Expand Down Expand Up @@ -295,6 +340,51 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
}
}

void HandleChangefeed(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;

EXPORT_LOG_D("HandleChangefeed TEvExternalStorage::TEvPutObjectResponse"
<< ": self# " << this->SelfId()
<< ", result# " << result);

if (!CheckResult(result, TStringBuf("PutObject (changefeed)"))) {
return;
}

auto nextStep = [this]() {
UploadTopic();
};
if (EnableChecksums) {
TString checksumKey = ChecksumKey(Settings.GetChangefeedKey(GetCurrentChangefeedName()));
UploadChecksum(std::move(ChangefeedChecksum), checksumKey, ChangefeedKeySuffix(), nextStep);
} else {
nextStep();
}
}

void HandleTopic(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;

EXPORT_LOG_D("HandleTopic TEvExternalStorage::TEvPutObjectResponse"
<< ": self# " << this->SelfId()
<< ", result# " << result);

if (!CheckResult(result, TStringBuf("PutObject (topic)"))) {
return;
}

auto nextStep = [this]() {
++IndexExportedChangefeed;
UploadChangefeed();
};
if (EnableChecksums) {
TString checksumKey = ChecksumKey(Settings.GetTopicKey(GetCurrentChangefeedName()));
UploadChecksum(std::move(TopicChecksum), checksumKey, TopicKeySuffix(), nextStep);
} else {
nextStep();
}
}

void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;

Expand Down Expand Up @@ -344,7 +434,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
return PassAway();
}

if (ProxyResolved && SchemeUploaded && MetadataUploaded && PermissionsUploaded) {
if (ProxyResolved && SchemeUploaded && MetadataUploaded && PermissionsUploaded && ChangefeedsUploaded) {
this->Send(Scanner, new TEvExportScan::TEvFeed());
}
}
Expand Down Expand Up @@ -661,6 +751,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
const TActorId& dataShard, ui64 txId,
const NKikimrSchemeOp::TBackupTask& task,
TMaybe<Ydb::Table::CreateTableRequest>&& scheme,
TVector<TChangefeedExportDescriptions> changefeeds,
TMaybe<Ydb::Scheme::ModifyPermissionsRequest>&& permissions,
TString&& metadata)
: ExternalStorageConfig(new TS3ExternalStorageConfig(task.GetS3Settings()))
Expand All @@ -672,12 +763,14 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
, DataShard(dataShard)
, TxId(txId)
, Scheme(std::move(scheme))
, Changefeeds(std::move(changefeeds))
, Metadata(std::move(metadata))
, Permissions(std::move(permissions))
, Retries(task.GetNumberOfRetries())
, Attempt(0)
, Delay(TDuration::Minutes(1))
, SchemeUploaded(ShardNum == 0 ? false : true)
, ChangefeedsUploaded(ShardNum == 0 ? false : true)
, MetadataUploaded(ShardNum == 0 ? false : true)
, PermissionsUploaded(ShardNum == 0 ? false : true)
, EnableChecksums(task.GetEnableChecksums())
Expand Down Expand Up @@ -730,6 +823,22 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
}
}

STATEFN(StateUploadChangefeed) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleChangefeed);
default:
return StateBase(ev);
}
}

STATEFN(StateUploadTopic) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleTopic);
default:
return StateBase(ev);
}
}

STATEFN(StateUploadMetadata) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadata);
Expand Down Expand Up @@ -775,17 +884,20 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
const TActorId DataShard;
const ui64 TxId;
const TMaybe<Ydb::Table::CreateTableRequest> Scheme;
const TVector<TChangefeedExportDescriptions> Changefeeds;
const TString Metadata;
const TMaybe<Ydb::Scheme::ModifyPermissionsRequest> Permissions;

const ui32 Retries;
ui32 Attempt;
ui64 IndexExportedChangefeed = 0;

TDuration Delay;
static constexpr TDuration MaxDelay = TDuration::Minutes(10);

TActorId Client;
bool SchemeUploaded;
bool ChangefeedsUploaded;
bool MetadataUploaded;
bool PermissionsUploaded;
bool MultiPart;
Expand All @@ -801,6 +913,8 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
bool EnableChecksums;
TString DataChecksum;
TString MetadataChecksum;
TString ChangefeedChecksum;
TString TopicChecksum;
TString SchemeChecksum;
TString PermissionsChecksum;
std::function<void()> ChecksumUploadedCallback;
Expand All @@ -812,6 +926,28 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
? GenYdbScheme(Columns, Task.GetTable())
: Nothing();

const auto& persQueues = Task.GetChangefeedUnderlyingTopics();
const auto& cdcStreams = Task.GetTable().GetTable().GetCdcStreams();
Y_ASSERT(persQueues.size() == cdcStreams.size());

const int changefeedsCount = cdcStreams.size();
TVector <TChangefeedExportDescriptions> changefeeds;
changefeeds.reserve(changefeedsCount);

for (int i = 0; i < changefeedsCount; ++i) {
Ydb::Table::ChangefeedDescription changefeed;
const auto& cdcStream = cdcStreams.at(i);
FillChangefeedDescription(changefeed, cdcStream);

Ydb::Topic::DescribeTopicResult topic;
const auto& pq = persQueues.at(i);
Ydb::StatusIds::StatusCode status;
TString error;
FillTopicDescription(topic, pq.GetPersQueueGroup(), pq.GetSelf(), cdcStream.GetName(), status, error);

changefeeds.emplace_back(changefeed, topic);
}

auto permissions = (Task.GetShardNum() == 0)
? GenYdbPermissions(Task.GetTable())
: Nothing();
Expand All @@ -827,7 +963,7 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
metadata.AddFullBackup(backup);

return new TS3Uploader(
dataShard, txId, Task, std::move(scheme), std::move(permissions), metadata.Serialize());
dataShard, txId, Task, std::move(scheme), std::move(changefeeds), std::move(permissions), metadata.Serialize());
}

} // NDataShard
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/extstorage_usage_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ class TS3Settings {
return ObjectKeyPattern + '/' + NBackupRestoreTraits::PermissionsKeySuffix();
}

inline TString GetTopicKey(const TString& changefeedName) const {
return TStringBuilder() << ObjectKeyPattern << '/'<< changefeedName << '/' << NBackupRestoreTraits::TopicKeySuffix();
}

inline TString GetChangefeedKey(const TString& changefeedName) const {
return TStringBuilder() << ObjectKeyPattern << '/' << changefeedName << '/' << NBackupRestoreTraits::ChangefeedKeySuffix();
}

inline TString GetMetadataKey() const {
return ObjectKeyPattern + '/' + NBackupRestoreTraits::MetadataKeySuffix();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct TBackup {
context.OnComplete.BindMsgToPipe(opId, datashardId, idx, event.Release());

backup.ClearTable();
backup.ClearChangefeedUnderlyingTopics();
}
}

Expand Down
41 changes: 39 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,48 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose(
return propose;
}

static NKikimrSchemeOp::TPathDescription GetTableDescription(TSchemeShard* ss, const TPathId& pathId) {
NKikimrSchemeOp::TDescribeOptions opts;
static void SetTableDescriptionOptions(NKikimrSchemeOp::TDescribeOptions& opts) {
opts.SetReturnPartitioningInfo(false);
opts.SetReturnPartitionConfig(true);
opts.SetReturnBoundaries(true);
opts.SetReturnIndexTableBoundaries(true);
}

static void SetChangefeedDescriptionOptions(NKikimrSchemeOp::TDescribeOptions& opts) {
SetTableDescriptionOptions(opts);
opts.SetShowPrivateTable(true);
}

static void SetTopicDescriptionOptions(NKikimrSchemeOp::TDescribeOptions& opts) {
SetTableDescriptionOptions(opts);
opts.SetShowPrivateTable(true);
}

static NKikimrSchemeOp::TPathDescription GetDescription(TSchemeShard* ss, const TPathId& pathId, NKikimrSchemeOp::TDescribeOptions& opts) {
auto desc = DescribePath(ss, TlsActivationContext->AsActorContext(), pathId, opts);
auto record = desc->GetRecord();

return record.GetPathDescription();
}

static NKikimrSchemeOp::TPathDescription GetTableDescription(TSchemeShard* ss, const TPathId& pathId) {
NKikimrSchemeOp::TDescribeOptions opts;
SetTableDescriptionOptions(opts);
return GetDescription(ss, pathId, opts);
}

static NKikimrSchemeOp::TPathDescription GetChangefeedDescription(TSchemeShard* ss, const TPathId& pathId) {
NKikimrSchemeOp::TDescribeOptions opts;
SetChangefeedDescriptionOptions(opts);
return GetDescription(ss, pathId, opts);
}

static NKikimrSchemeOp::TPathDescription GetTopicDescription(TSchemeShard* ss, const TPathId& pathId) {
NKikimrSchemeOp::TDescribeOptions opts;
SetTopicDescriptionOptions(opts);
return GetDescription(ss, pathId, opts);
}

void FillSetValForSequences(TSchemeShard* ss, NKikimrSchemeOp::TTableDescription& description,
const TPathId& exportItemPathId) {
NKikimrSchemeOp::TDescribeOptions opts;
Expand Down Expand Up @@ -150,6 +179,14 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
FillSetValForSequences(
ss, *sourceDescription.MutableTable(), exportItemPath.Base()->PathId);
FillPartitioning(ss, *sourceDescription.MutableTable(), exportItemPath.Base()->PathId);
for (const auto& cdcStream : sourceDescription.GetTable().GetCdcStreams()) {
auto cdcPathDesc = GetChangefeedDescription(ss, TPathId::FromProto(cdcStream.GetPathId()));
for (const auto& child : cdcPathDesc.GetChildren()) {
if (child.GetPathType() == NKikimrSchemeOp::EPathTypePersQueueGroup) {
*task.AddChangefeedUnderlyingTopics() = GetTopicDescription(ss, TPathId(child.GetSchemeshardId(), child.GetPathId()));
}
}
}
}
task.MutableTable()->CopyFrom(sourceDescription);
}
Expand Down
Loading
Loading