Skip to content

Backup changefeed configurations (tools dump) #12283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion ydb/library/backup/backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
#include <ydb/public/lib/ydb_cli/common/retry_func.h>
#include <ydb/public/lib/ydb_cli/dump/files/files.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>

#include <library/cpp/containers/stack_vector/stack_vec.h>
Expand Down Expand Up @@ -475,6 +478,36 @@ void BackupPermissions(TDriver driver, const TString& dbPrefix, const TString& p
WriteProtoToFile(proto, folderPath, NDump::NFiles::Permissions());
}

Ydb::Table::ChangefeedDescription ProtoFromChangefeedDesc(const NTable::TChangefeedDescription& changefeedDesc) {
Ydb::Table::ChangefeedDescription protoChangeFeedDesc;
changefeedDesc.SerializeTo(protoChangeFeedDesc);
return protoChangeFeedDesc;
}

NTopic::TDescribeTopicResult DescribeTopic(TDriver driver, const TString& path) {
NYdb::NTopic::TTopicClient client(driver);
return NConsoleClient::RetryFunction([&]() {
return client.DescribeTopic(path).GetValueSync();
});
}

void BackupChangefeeds(TDriver driver, const TString& tablePath, const TFsPath& folderPath) {
auto desc = DescribeTable(driver, tablePath);

for (const auto& changefeedDesc : desc.GetChangefeedDescriptions()) {
TFsPath changefeedDirPath = CreateDirectory(folderPath, changefeedDesc.GetName());

auto protoChangeFeedDesc = ProtoFromChangefeedDesc(changefeedDesc);
const auto descTopicResult = DescribeTopic(driver, JoinDatabasePath(tablePath, changefeedDesc.GetName()));
VerifyStatus(descTopicResult);
const auto& topicDescription = descTopicResult.GetTopicDescription();
const auto protoTopicDescription = NYdb::TProtoAccessor::GetProto(topicDescription);

WriteProtoToFile(protoChangeFeedDesc, changefeedDirPath, NDump::NFiles::Changefeed());
WriteProtoToFile(protoTopicDescription, changefeedDirPath, NDump::NFiles::Topic());
}
}

void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupPrefix, const TString& path,
const TFsPath& folderPath, bool schemaOnly, bool preservePoolKinds, bool ordered) {
Y_ENSURE(!path.empty());
Expand All @@ -486,8 +519,9 @@ void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupP

auto desc = DescribeTable(driver, fullPath);
auto proto = ProtoFromTableDescription(desc, preservePoolKinds);

WriteProtoToFile(proto, folderPath, NDump::NFiles::TableScheme());

BackupChangefeeds(driver, JoinDatabasePath(dbPrefix, path), folderPath);
BackupPermissions(driver, dbPrefix, path, folderPath);

if (!schemaOnly) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/backup/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ PEERDIR(
ydb/public/lib/yson_value
ydb/public/lib/ydb_cli/dump/files
ydb/public/sdk/cpp/client/ydb_driver
ydb/public/sdk/cpp/client/ydb_proto
ydb/public/sdk/cpp/client/ydb_result
ydb/public/sdk/cpp/client/ydb_table
ydb/public/sdk/cpp/client/ydb_topic
ydb/public/sdk/cpp/client/ydb_value
)

Expand Down
31 changes: 27 additions & 4 deletions ydb/public/sdk/cpp/client/ydb_table/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2816,10 +2816,10 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
return ret;
}

void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
template <typename TProto>
void TChangefeedDescription::SerializeCommonFields(TProto& proto) const {
proto.set_name(Name_);
proto.set_virtual_timestamps(VirtualTimestamps_);
proto.set_initial_scan(InitialScan_);
proto.set_aws_region(AwsRegion_);

switch (Mode_) {
Expand Down Expand Up @@ -2860,12 +2860,35 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
SetDuration(*ResolvedTimestamps_, *proto.mutable_resolved_timestamps_interval());
}

for (const auto& [key, value] : Attributes_) {
(*proto.mutable_attributes())[key] = value;
}
}

void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
SerializeCommonFields(proto);
proto.set_initial_scan(InitialScan_);

if (RetentionPeriod_) {
SetDuration(*RetentionPeriod_, *proto.mutable_retention_period());
}
}

for (const auto& [key, value] : Attributes_) {
(*proto.mutable_attributes())[key] = value;
void TChangefeedDescription::SerializeTo(Ydb::Table::ChangefeedDescription& proto) const {
SerializeCommonFields(proto);

switch (State_) {
case EChangefeedState::Enabled:
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_ENABLED);
break;
case EChangefeedState::Disabled:
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_DISABLED);
break;
case EChangefeedState::InitialScan:
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_INITIAL_SCAN);
break;
case EChangefeedState::Unknown:
break;
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ class TChangefeedDescription {
const std::optional<TInitialScanProgress>& GetInitialScanProgress() const;

void SerializeTo(Ydb::Table::Changefeed& proto) const;
void SerializeTo(Ydb::Table::ChangefeedDescription& proto) const;
TString ToString() const;
void Out(IOutputStream& o) const;

Expand All @@ -399,6 +400,9 @@ class TChangefeedDescription {
template <typename TProto>
static TChangefeedDescription FromProto(const TProto& proto);

template <typename TProto>
void SerializeCommonFields(TProto& proto) const;

private:
TString Name_;
EChangefeedMode Mode_;
Expand Down
Loading