Skip to content

External data sources: dump #14577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/grpc_services/rpc_describe_external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ Ydb::Table::DescribeExternalDataSourceResult Convert(const TDirEntry& inSelf, co
out.set_source_type(inDesc.GetSourceType());
out.set_location(inDesc.GetLocation());
auto& properties = *out.mutable_properties();
properties = inDesc.GetProperties().GetProperties();
for (const auto& [key, value] : inDesc.GetProperties().GetProperties()) {
properties[to_upper(key)] = value;
}
Convert(inDesc.GetAuth(), properties);
return out;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_describe_external_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ bool ConvertContent(
for (const auto& item : items) {
json.AppendValue(item);
}
out[key] = WriteJson(json, false);
out[to_upper(key)] = WriteJson(json, false);
}
} catch (...) {
issues.AddIssue(TStringBuilder() << "Cannot unpack the content of an external table of type: " << sourceType
Expand Down
145 changes: 139 additions & 6 deletions ydb/library/backup/backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <yql/essentials/sql/v1/format/sql_format.h>

#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/json/json_reader.h>
#include <library/cpp/regex/pcre/regexp.h>
#include <library/cpp/string_utils/quote/quote.h>

Expand All @@ -55,6 +56,7 @@
#include <google/protobuf/text_format.h>

#include <format>
#include <ranges>

namespace NYdb::NBackup {

Expand Down Expand Up @@ -788,6 +790,131 @@ void BackupReplication(
BackupPermissions(driver, dbPath, fsBackupFolder);
}

namespace {

Ydb::Table::DescribeExternalDataSourceResult DescribeExternalDataSource(TDriver driver, const TString& path) {
NTable::TTableClient client(driver);
Ydb::Table::DescribeExternalDataSourceResult description;
auto status = client.RetryOperationSync([&](NTable::TSession session) {
auto result = session.DescribeExternalDataSource(path).ExtractValueSync();
if (result.IsSuccess()) {
description = TProtoAccessor::GetProto(result.GetExternalDataSourceDescription());
}
return result;
});
VerifyStatus(status, "describe external data source");
return description;
}

std::string ToString(std::string_view key, std::string_view value) {
// indented to follow the default YQL formatting
return std::format(R"( {} = "{}")", key, value);
}

namespace NExternalDataSource {

std::string PropertyToString(const std::pair<TProtoStringType, TProtoStringType>& property) {
const auto& [key, value] = property;
return ToString(key, value);
}

}

TString BuildCreateExternalDataSourceQuery(const Ydb::Table::DescribeExternalDataSourceResult& description) {
return std::format(
"CREATE EXTERNAL DATA SOURCE IF NOT EXISTS `{}` WITH (\n{},\n{}{}\n);",
description.self().name().c_str(),
ToString("SOURCE_TYPE", description.source_type()),
ToString("LOCATION", description.location()),
description.properties().empty()
? ""
: std::string(",\n") +
JoinSeq(",\n", std::views::transform(description.properties(), NExternalDataSource::PropertyToString)).c_str()
);
}

}

void BackupExternalDataSource(TDriver driver, const TString& dbPath, const TFsPath& fsBackupFolder) {
Y_ENSURE(!dbPath.empty());
LOG_I("Backup external data source " << dbPath.Quote() << " to " << fsBackupFolder.GetPath().Quote());

const auto description = DescribeExternalDataSource(driver, dbPath);
const auto creationQuery = BuildCreateExternalDataSourceQuery(description);

WriteCreationQueryToFile(creationQuery, fsBackupFolder, NDump::NFiles::CreateExternalDataSource());
BackupPermissions(driver, dbPath, fsBackupFolder);
}

namespace {

Ydb::Table::DescribeExternalTableResult DescribeExternalTable(TDriver driver, const TString& path) {
NTable::TTableClient client(driver);
Ydb::Table::DescribeExternalTableResult description;
auto status = client.RetryOperationSync([&](NTable::TSession session) {
auto result = session.DescribeExternalTable(path).ExtractValueSync();
if (result.IsSuccess()) {
description = TProtoAccessor::GetProto(result.GetExternalTableDescription());
}
return result;
});
VerifyStatus(status, "describe external table");
return description;
}

namespace NExternalTable {

std::string PropertyToString(const std::pair<TProtoStringType, TProtoStringType>& property) {
const auto& [key, json] = property;
const auto items = NJson::ReadJsonFastTree(json).GetArray();
Y_ENSURE(!items.empty(), "Empty items for an external table property: " << key);
if (items.size() == 1) {
return ToString(key, items.front().GetString());
} else {
return ToString(key, std::format("[{}]", JoinSeq(", ", items).c_str()));
}
}

}

std::string ColumnToString(const Ydb::Table::ColumnMeta& column) {
const auto& type = column.type();
const bool notNull = !type.has_optional_type() || (type.has_pg_type() && column.not_null());
return std::format(
" {} {}{}",
column.name().c_str(),
TType(type).ToString(),
notNull ? " NOT NULL" : ""
);
}

TString BuildCreateExternalTableQuery(const Ydb::Table::DescribeExternalTableResult& description) {
return std::format(
"CREATE EXTERNAL TABLE IF NOT EXISTS `{}` (\n{}\n) WITH (\n{},\n{}{}\n);",
description.self().name().c_str(),
JoinSeq(",\n", std::views::transform(description.columns(), ColumnToString)).c_str(),
ToString("DATA_SOURCE", description.data_source_path()),
ToString("LOCATION", description.location()),
description.content().empty()
? ""
: std::string(",\n") +
JoinSeq(",\n", std::views::transform(description.content(), NExternalTable::PropertyToString)).c_str()
);
}

}

void BackupExternalTable(TDriver driver, const TString& dbPath, const TFsPath& fsBackupFolder) {
Y_ENSURE(!dbPath.empty());
LOG_I("Backup external table " << dbPath.Quote() << " to " << fsBackupFolder.GetPath().Quote());

const auto description = DescribeExternalTable(driver, dbPath);
const auto creationQuery = BuildCreateExternalTableQuery(description);

WriteCreationQueryToFile(creationQuery, fsBackupFolder, NDump::NFiles::CreateExternalTable());
BackupPermissions(driver, dbPath, fsBackupFolder);
}

void CreateClusterDirectory(const TDriver& driver, const TString& path, bool rootBackupDir = false) {
if (rootBackupDir) {
LOG_I("Create temporary directory " << path.Quote() << " in database");
Expand Down Expand Up @@ -837,11 +964,11 @@ void BackupFolderImpl(TDriver driver, const TString& database, const TString& db
bool schemaOnly, bool useConsistentCopyTable, bool avoidCopy, bool preservePoolKinds, bool ordered,
NYql::TIssues& issues
) {
TFile(folderPath.Child(NDump::NFiles::Incomplete().FileName), CreateAlways);
TFile(folderPath.Child(NDump::NFiles::Incomplete().FileName), CreateAlways).Close();

TMap<TString, TAsyncStatus> copiedTablesStatuses;
TVector<NTable::TCopyItem> tablesToCopy;
// Copy all tables to temporal folder
// Copy all tables to temporal folder and backup other scheme objects along the way.
{
TDbIterator<ETraverseType::Preordering> dbIt(driver, dbPrefix);
while (dbIt) {
Expand Down Expand Up @@ -884,6 +1011,12 @@ void BackupFolderImpl(TDriver driver, const TString& database, const TString& db
if (dbIt.IsReplication()) {
BackupReplication(driver, database, dbIt.GetTraverseRoot(), dbIt.GetRelPath(), childFolderPath);
}
if (dbIt.IsExternalDataSource()) {
BackupExternalDataSource(driver, dbIt.GetFullPath(), childFolderPath);
}
if (dbIt.IsExternalTable()) {
BackupExternalTable(driver, dbIt.GetFullPath(), childFolderPath);
}
dbIt.Next();
}
}
Expand Down Expand Up @@ -1040,7 +1173,7 @@ TAdmins FindAdmins(TDriver driver, const TString& dbPath) {
struct TBackupDatabaseSettings {
bool WithRegularUsers = false;
bool WithContent = false;
TString TemporalBackupPostfix;
TString TemporalBackupPostfix = "";
};

void BackupUsers(TDriver driver, const TString& dbPath, const TFsPath& folderPath, const THashSet<TString>& filter = {}) {
Expand Down Expand Up @@ -1165,7 +1298,7 @@ void BackupDatabaseImpl(TDriver driver, const TString& dbPath, const TFsPath& fo
Ydb::Cms::CreateDatabaseRequest proto;
status.SerializeTo(proto);
WriteProtoToFile(proto, folderPath, NDump::NFiles::Database());

if (!settings.WithRegularUsers) {
TAdmins admins = FindAdmins(driver, dbPath);
BackupUsers(driver, dbPath, folderPath, admins.UserSids);
Expand Down Expand Up @@ -1308,7 +1441,7 @@ void BackupDatabase(const TDriver& driver, const TString& database, TFsPath fold

try {
NYql::TIssues issues;
TFile(folderPath.Child(NDump::NFiles::Incomplete().FileName), CreateAlways);
TFile(folderPath.Child(NDump::NFiles::Incomplete().FileName), CreateAlways).Close();

BackupDatabaseImpl(driver, database, folderPath, {
.WithRegularUsers = true,
Expand Down Expand Up @@ -1339,7 +1472,7 @@ void BackupCluster(const TDriver& driver, TFsPath folderPath) {

try {
NYql::TIssues issues;
TFile(folderPath.Child(NDump::NFiles::Incomplete().FileName), CreateAlways);
TFile(folderPath.Child(NDump::NFiles::Incomplete().FileName), CreateAlways).Close();

BackupClusterRoot(driver, folderPath);
auto databases = ListDatabases(driver);
Expand Down
8 changes: 8 additions & 0 deletions ydb/library/backup/db_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ class TDbIterator {
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Replication;
}

bool IsExternalDataSource() const {
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::ExternalDataSource;
}

bool IsExternalTable() const {
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::ExternalTable;
}

bool IsListed() const {
return NextNodes.front().IsListed;
}
Expand Down
12 changes: 12 additions & 0 deletions ydb/public/lib/ydb_cli/dump/files/files.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ enum EFilesType {
CREATE_GROUP,
ALTER_GROUP,
CREATE_ASYNC_REPLICATION,
CREATE_EXTERNAL_DATA_SOURCE,
CREATE_EXTERNAL_TABLE,
};

static constexpr TFileInfo FILES_INFO[] = {
Expand All @@ -38,6 +40,8 @@ static constexpr TFileInfo FILES_INFO[] = {
{"create_group.sql", "groups"},
{"alter_group.sql", "group members"},
{"create_async_replication.sql", "async replication"},
{"create_external_data_source.sql", "external data source"},
{"create_external_table.sql", "external table"},
};

const TFileInfo& TableScheme() {
Expand Down Expand Up @@ -104,4 +108,12 @@ const TFileInfo& CreateAsyncReplication() {
return FILES_INFO[CREATE_ASYNC_REPLICATION];
}

const TFileInfo& CreateExternalDataSource() {
return FILES_INFO[CREATE_EXTERNAL_DATA_SOURCE];
}

const TFileInfo& CreateExternalTable() {
return FILES_INFO[CREATE_EXTERNAL_TABLE];
}

} // NYdb::NDump::NFiles
2 changes: 2 additions & 0 deletions ydb/public/lib/ydb_cli/dump/files/files.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ const TFileInfo& CreateUser();
const TFileInfo& CreateGroup();
const TFileInfo& AlterGroup();
const TFileInfo& CreateAsyncReplication();
const TFileInfo& CreateExternalDataSource();
const TFileInfo& CreateExternalTable();

} // NYdb::NDump:NFiles
Loading