Skip to content

Restore changefeed configurations (tools restore) #12598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/dump/dump.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> {
FLUENT_SETTING_DEFAULT(bool, DryRun, false);
FLUENT_SETTING_DEFAULT(bool, RestoreData, true);
FLUENT_SETTING_DEFAULT(bool, RestoreIndexes, true);
FLUENT_SETTING_DEFAULT(bool, RestoreChangefeeds, true);
FLUENT_SETTING_DEFAULT(bool, RestoreACL, true);
FLUENT_SETTING_DEFAULT(bool, SkipDocumentTables, false);
FLUENT_SETTING_DEFAULT(bool, SavePartialResult, false);
Expand Down
97 changes: 92 additions & 5 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,39 @@ bool IsFileExists(const TFsPath& path) {
return path.Exists() && path.IsFile();
}

Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, const TLog* log) {
LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read scheme from " << fsPath.Quote());
Ydb::Table::CreateTableRequest proto;
Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto));
template <typename TProtoType>
TProtoType ReadProtoFromFile(const TFsPath& fsDirPath, const TLog* log, const NDump::NFiles::TFileInfo& fileInfo) {
LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read " << fileInfo.LogObjectType << " from " << fsDirPath.GetPath().Quote());
TProtoType proto;
Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsDirPath.Child(fileInfo.FileName)).ReadAll(), &proto));
return proto;

}

Ydb::Table::CreateTableRequest ReadTableScheme(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::Table::CreateTableRequest>(fsDirPath, log, NDump::NFiles::TableScheme());
}

Ydb::Table::ChangefeedDescription ReadChangefeedDescription(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::Table::ChangefeedDescription>(fsDirPath, log, NDump::NFiles::Changefeed());
}

Ydb::Topic::DescribeTopicResult ReadTopicDescription(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::Topic::DescribeTopicResult>(fsDirPath, log, NDump::NFiles::Topic());
}

TTableDescription TableDescriptionFromProto(const Ydb::Table::CreateTableRequest& proto) {
return TProtoAccessor::FromProto(proto);
}

TChangefeedDescription ChangefeedDescriptionFromProto(const Ydb::Table::ChangefeedDescription& proto) {
return TProtoAccessor::FromProto(proto);
}

NTopic::TTopicDescription TopicDescriptionFromProto(Ydb::Topic::DescribeTopicResult&& proto) {
return NTopic::TTopicDescription(std::move(proto));
}

TTableDescription TableDescriptionWithoutIndexesFromProto(Ydb::Table::CreateTableRequest proto) {
proto.clear_indexes();
return TableDescriptionFromProto(proto);
Expand Down Expand Up @@ -163,6 +185,7 @@ TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog
, OperationClient(driver)
, SchemeClient(driver)
, TableClient(driver)
, TopicClient(driver)
, Log(log)
{
}
Expand Down Expand Up @@ -320,7 +343,7 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
}

auto scheme = ReadTableScheme(fsPath.Child(NFiles::TableScheme().FileName), Log.get());
auto scheme = ReadTableScheme(fsPath, Log.get());
auto dumpedDesc = TableDescriptionFromProto(scheme);

if (dumpedDesc.GetAttributes().contains(DOC_API_TABLE_VERSION_ATTR) && settings.SkipDocumentTables_) {
Expand Down Expand Up @@ -364,6 +387,22 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString
LOG_D("Skip restoring indexes of " << dbPath.Quote());
}

if (settings.RestoreChangefeeds_) {
TVector<TFsPath> children;
fsPath.List(children);
for (const auto& fsChildPath : children) {
const bool isChangefeedDir = IsFileExists(fsChildPath.Child(NFiles::Changefeed().FileName));
if (isChangefeedDir) {
auto result = RestoreChangefeeds(fsChildPath, dbPath);
if (!result.IsSuccess()) {
return result;
}
}
}
} else {
LOG_D("Skip restoring changefeeds of " << dbPath.Quote());
}

return RestorePermissions(fsPath, dbPath, settings, oldEntries);
}

Expand Down Expand Up @@ -627,6 +666,54 @@ TRestoreResult TRestoreClient::RestoreIndexes(const TString& dbPath, const TTabl
return Result<TRestoreResult>();
}

TRestoreResult TRestoreClient::RestoreChangefeeds(const TFsPath& fsPath, const TString& dbPath) {
LOG_D("Process " << fsPath.GetPath().Quote());
if (fsPath.Child(NFiles::Incomplete().FileName).Exists()) {
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
}

auto changefeedProto = ReadChangefeedDescription(fsPath, Log.get());
auto topicProto = ReadTopicDescription(fsPath, Log.get());

auto changefeedDesc = ChangefeedDescriptionFromProto(changefeedProto);
auto topicDesc = TopicDescriptionFromProto(std::move(topicProto));

changefeedDesc = changefeedDesc.WithRetentionPeriod(topicDesc.GetRetentionPeriod());

auto createResult = TableClient.RetryOperationSync([&changefeedDesc, &dbPath](TSession session) {
return session.AlterTable(dbPath, TAlterTableSettings().AppendAddChangefeeds(changefeedDesc)).GetValueSync();
});
if (createResult.IsSuccess()) {
LOG_D("Created " << fsPath.GetPath().Quote());
} else {
LOG_E("Failed to create " << fsPath.GetPath().Quote());
return Result<TRestoreResult>(fsPath.GetPath(), std::move(createResult));
}

return RestoreConsumers(Join("/", dbPath, fsPath.GetName()), topicDesc.GetConsumers());;
}

TRestoreResult TRestoreClient::RestoreConsumers(const TString& topicPath, const TVector<NTopic::TConsumer>& consumers) {
for (const auto& consumer : consumers) {
auto createResult = TopicClient.AlterTopic(topicPath,
NTopic::TAlterTopicSettings()
.BeginAddConsumer()
.ConsumerName(consumer.GetConsumerName())
.Important(consumer.GetImportant())
.Attributes(consumer.GetAttributes())
.EndAddConsumer()
).GetValueSync();
if (createResult.IsSuccess()) {
LOG_D("Created consumer " << consumer.GetConsumerName().Quote() << " for " << topicPath.Quote());
} else {
LOG_E("Failed to create " << consumer.GetConsumerName().Quote() << " for " << topicPath.Quote());
return Result<TRestoreResult>(topicPath, std::move(createResult));
}
}
return Result<TRestoreResult>();
}

TRestoreResult TRestoreClient::RestorePermissions(const TFsPath& fsPath, const TString& dbPath,
const TRestoreSettings& settings, const THashSet<TString>& oldEntries)
{
Expand Down
4 changes: 4 additions & 0 deletions ydb/public/lib/ydb_cli/dump/restore_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

#include <util/folder/path.h>
#include <util/generic/hash_set.h>
Expand Down Expand Up @@ -129,7 +130,9 @@ class TRestoreClient {
TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc);
TRestoreResult RestoreIndexes(const TString& dbPath, const NTable::TTableDescription& desc);
TRestoreResult RestoreChangefeeds(const TFsPath& path, const TString& dbPath);
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const THashSet<TString>& oldEntries);
TRestoreResult RestoreConsumers(const TString& topicPath, const TVector<NTopic::TConsumer>& consumers);

THolder<NPrivate::IDataWriter> CreateDataWriter(const TString& dbPath, const TRestoreSettings& settings,
const NTable::TTableDescription& desc, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators);
Expand All @@ -147,6 +150,7 @@ class TRestoreClient {
NOperation::TOperationClient OperationClient;
NScheme::TSchemeClient SchemeClient;
NTable::TTableClient TableClient;
NTopic::TTopicClient TopicClient;
std::shared_ptr<TLog> Log;

}; // TRestoreClient
Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/dump/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ PEERDIR(
ydb/public/lib/ydb_cli/dump/files
ydb/public/lib/ydb_cli/dump/util
ydb/public/sdk/cpp/client/ydb_proto
ydb/public/sdk/cpp/client/ydb_topic
)

GENERATE_ENUM_SERIALIZATION(dump.h)
Expand Down
Loading