|
17 | 17 | #include <ydb/public/lib/ydb_cli/dump/util/view_utils.h> |
18 | 18 | #include <yql/essentials/public/issue/yql_issue.h> |
19 | 19 |
|
| 20 | +#include <library/cpp/json/json_reader.h> |
20 | 21 | #include <library/cpp/json/json_writer.h> |
21 | 22 | #include <library/cpp/threading/future/core/future.h> |
22 | 23 |
|
@@ -425,6 +426,7 @@ TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog |
425 | 426 | , RateLimiterClient(driver) |
426 | 427 | , QueryClient(driver) |
427 | 428 | , CmsClient(driver) |
| 429 | + , ReplicationClient(driver) |
428 | 430 | , Log(log) |
429 | 431 | , DriverConfig(driver.GetConfig()) |
430 | 432 | { |
@@ -467,7 +469,11 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP |
467 | 469 |
|
468 | 470 | // restore |
469 | 471 | auto restoreResult = Result<TRestoreResult>(); |
470 | | - restoreResult = RestoreFolder(fsPath, dbPath, settings, oldEntries); |
| 472 | + if (settings.Replace_) { |
| 473 | + restoreResult = DropAndRestore(fsPath, dbPath, settings, oldEntries); |
| 474 | + } else { |
| 475 | + restoreResult = RestoreFolder(fsPath, dbPath, settings, oldEntries); |
| 476 | + } |
471 | 477 | if (auto result = DelayedRestoreManager.RestoreDelayed(); !result.IsSuccess()) { |
472 | 478 | restoreResult = result; |
473 | 479 | } |
@@ -983,6 +989,48 @@ namespace { |
983 | 989 | return out; |
984 | 990 | } |
985 | 991 |
|
| 992 | + bool TypesAreMatching(ESchemeEntryType lhs, ESchemeEntryType rhs) { |
| 993 | + return lhs == rhs |
| 994 | + || lhs == ESchemeEntryType::SubDomain && rhs == ESchemeEntryType::Directory |
| 995 | + || rhs == ESchemeEntryType::SubDomain && lhs == ESchemeEntryType::Directory; |
| 996 | + } |
| 997 | + |
| 998 | + TStatus GetExternalTablesReferencingSource(TTableClient& client, const TString& path, TVector<TString>& references) { |
| 999 | + references.clear(); |
| 1000 | + |
| 1001 | + Ydb::Table::DescribeExternalDataSourceResult description; |
| 1002 | + auto status = DescribeExternalDataSource(client, path, description); |
| 1003 | + if (!status.IsSuccess()) { |
| 1004 | + return status; |
| 1005 | + } |
| 1006 | + auto iteratorToReferences = description.properties().find("REFERENCES"); |
| 1007 | + if (iteratorToReferences == description.properties().end()) { |
| 1008 | + return status; |
| 1009 | + } |
| 1010 | + auto items = NJson::ReadJsonFastTree(iteratorToReferences->second).GetArray(); |
| 1011 | + references.reserve(items.size()); |
| 1012 | + for (const auto& item : items) { |
| 1013 | + references.emplace_back(item.GetString()); |
| 1014 | + } |
| 1015 | + return status; |
| 1016 | + } |
| 1017 | + |
| 1018 | + TStatus GetReplicationSourceTables(NReplication::TReplicationClient& client, const TString& path, TVector<TString>& sources) { |
| 1019 | + sources.clear(); |
| 1020 | + |
| 1021 | + TMaybe<NReplication::TReplicationDescription> description; |
| 1022 | + auto status = DescribeReplication(client, path, description); |
| 1023 | + if (!status.IsSuccess()) { |
| 1024 | + return status; |
| 1025 | + } |
| 1026 | + const auto& items = description->GetItems(); |
| 1027 | + sources.reserve(items.size()); |
| 1028 | + for (const auto& item : items) { |
| 1029 | + sources.emplace_back(item.SrcPath); |
| 1030 | + } |
| 1031 | + return status; |
| 1032 | + } |
| 1033 | + |
986 | 1034 | } |
987 | 1035 |
|
988 | 1036 | TRestoreResult TRestoreClient::RestoreFolder( |
@@ -1010,6 +1058,26 @@ TRestoreResult TRestoreClient::RestoreFolder( |
1010 | 1058 | return Result<TRestoreResult>(); |
1011 | 1059 | } |
1012 | 1060 |
|
| 1061 | +TRestoreResult TRestoreClient::Drop(ESchemeEntryType type, const TString& path, const TRestoreSettings& settings) { |
| 1062 | + LOG_D("Preparing to drop " << path.Quote()); |
| 1063 | + if (settings.DryRun_) { |
| 1064 | + return Result<TRestoreResult>(); |
| 1065 | + } |
| 1066 | + |
| 1067 | + auto remover = NInternal::CreateDefaultRemover(SchemeClient, TableClient, TopicClient, QueryClient, CoordinationNodeClient, {}); |
| 1068 | + TSchemeEntry entry; |
| 1069 | + entry.Type = type; |
| 1070 | + entry.Name = path; |
| 1071 | + TStatus result = remover(entry); |
| 1072 | + |
| 1073 | + if (result.IsSuccess()) { |
| 1074 | + LOG_D("Dropped " << path.Quote()); |
| 1075 | + return Result<TRestoreResult>(); |
| 1076 | + } |
| 1077 | + LOG_E("Failed to drop " << path.Quote()); |
| 1078 | + return Result<TRestoreResult>(path, std::move(result)); |
| 1079 | +} |
| 1080 | + |
1013 | 1081 | TRestoreResult TRestoreClient::Restore(NScheme::ESchemeEntryType type, const TFsPath& fsPath, const TString& dbRestoreRoot, const TString& dbPathRelativeToRestoreRoot, const TRestoreSettings& settings, bool isAlreadyExisting, bool delay) { |
1014 | 1082 | const auto dbPath = dbRestoreRoot + dbPathRelativeToRestoreRoot; |
1015 | 1083 | switch (type) { |
@@ -1047,6 +1115,201 @@ TRestoreResult TRestoreClient::Restore(NScheme::ESchemeEntryType type, const TFs |
1047 | 1115 |
|
1048 | 1116 | } |
1049 | 1117 |
|
| 1118 | +TRestoreResult TRestoreClient::DropAndRestoreExternals(const TVector<TFsBackupEntry>& backupEntries, const TVector<size_t>& externalDataSources, const THashMap<TString, size_t>& externalTables, const TRestoreSettings& settings) { |
| 1119 | + for (size_t i : externalDataSources) { |
| 1120 | + const auto& [fsPath, dbPath, type] = backupEntries[i]; |
| 1121 | + TVector<TString> references; |
| 1122 | + if (auto status = GetExternalTablesReferencingSource(TableClient, dbPath, references); !status.IsSuccess()) { |
| 1123 | + return Result<TRestoreResult>(fsPath, std::move(status)); |
| 1124 | + } |
| 1125 | + if (!AllOf(references, [&externalTables](const TString& dbPath) { |
| 1126 | + return externalTables.contains(dbPath); |
| 1127 | + })) { |
| 1128 | + return Result<TRestoreResult>(fsPath, EStatus::BAD_REQUEST, |
| 1129 | + "External data source cannot be replaced, because it is referenced by an external table that is not in the backup." |
| 1130 | + ); |
| 1131 | + } |
| 1132 | + } |
| 1133 | + |
| 1134 | + for (const auto& [dbPath, i] : externalTables) { |
| 1135 | + auto result = Drop(ESchemeEntryType::ExternalTable, dbPath, settings); |
| 1136 | + if (!result.IsSuccess()) { |
| 1137 | + return result; |
| 1138 | + } |
| 1139 | + } |
| 1140 | + for (size_t i : externalDataSources) { |
| 1141 | + const auto& [fsPath, dbPath, type] = backupEntries[i]; |
| 1142 | + if (auto result = Drop(type, dbPath, settings); !result.IsSuccess()) { |
| 1143 | + return result; |
| 1144 | + } |
| 1145 | + if (auto result = RestoreExternalDataSource(fsPath, dbPath, settings, false); !result.IsSuccess()) { |
| 1146 | + return result; |
| 1147 | + } |
| 1148 | + } |
| 1149 | + for (const auto& [dbPath, i] : externalTables) { |
| 1150 | + const auto& fsPath = backupEntries[i].FsPath; |
| 1151 | + auto result = RestoreExternalTable(fsPath, dbPath, settings, false /* already exists */); |
| 1152 | + if (!result.IsSuccess()) { |
| 1153 | + return result; |
| 1154 | + } |
| 1155 | + } |
| 1156 | + |
| 1157 | + return Result<TRestoreResult>(); |
| 1158 | +} |
| 1159 | + |
| 1160 | +TRestoreResult TRestoreClient::DropAndRestoreTablesAndDependents(const TVector<TFsBackupEntry>& backupEntries, const THashMap<TString, size_t>& tables, const TVector<size_t>& views, const THashMap<TString, size_t>& replications, const TString& dbRestoreRoot, const TRestoreSettings& settings, const THashMap<TString, ESchemeEntryType>& existingEntries) { |
| 1161 | + // to do: verify that no replication in the entire database (not just the restore root!) depends on the tables we are going to drop |
| 1162 | + for (const auto& [dbPath, type] : existingEntries) { |
| 1163 | + if (type == ESchemeEntryType::Replication && !replications.contains(dbPath)) { |
| 1164 | + // a replication that is not present in the backup, but present in the database |
| 1165 | + TVector<TString> sources; |
| 1166 | + if (auto status = GetReplicationSourceTables(ReplicationClient, dbPath, sources); !status.IsSuccess()) { |
| 1167 | + return status; |
| 1168 | + } |
| 1169 | + for (const auto& source : sources) { |
| 1170 | + if (tables.contains(source)) { |
| 1171 | + return Result<TRestoreResult>(dbPath, EStatus::BAD_REQUEST, |
| 1172 | + TStringBuilder() << "Cannot replace the table: " << source << ", because the replication: " << dbPath << " depends on it." |
| 1173 | + ); |
| 1174 | + } |
| 1175 | + } |
| 1176 | + } |
| 1177 | + } |
| 1178 | + |
| 1179 | + for (size_t i : views) { |
| 1180 | + const auto& [fsPath, dbPath, type] = backupEntries[i]; |
| 1181 | + if (auto result = Drop(type, dbPath, settings); !result.IsSuccess()) { |
| 1182 | + return result; |
| 1183 | + } |
| 1184 | + } |
| 1185 | + |
| 1186 | + for (const auto& [dbPath, i] : replications) { |
| 1187 | + if (auto result = Drop(ESchemeEntryType::Replication, dbPath, settings); !result.IsSuccess()) { |
| 1188 | + return result; |
| 1189 | + } |
| 1190 | + } |
| 1191 | + |
| 1192 | + // the main loop: tables are restored here |
| 1193 | + for (const auto& [_, i] : tables) { |
| 1194 | + const auto& [fsPath, dbPath, type] = backupEntries[i]; |
| 1195 | + if (auto result = Drop(type, dbPath, settings); !result.IsSuccess()) { |
| 1196 | + return result; |
| 1197 | + } |
| 1198 | + if (auto result = RestoreTable(fsPath, dbPath, settings, false); !result.IsSuccess()) { |
| 1199 | + return result; |
| 1200 | + } |
| 1201 | + } |
| 1202 | + |
| 1203 | + for (const auto& [dbPath, i] : replications) { |
| 1204 | + const auto& fsPath = backupEntries[i].FsPath; |
| 1205 | + if (auto result = RestoreReplication(fsPath, dbRestoreRoot, dbPath.substr(dbRestoreRoot.size()), settings, false); !result.IsSuccess()) { |
| 1206 | + return result; |
| 1207 | + } |
| 1208 | + } |
| 1209 | + |
| 1210 | + for (size_t i : views) { |
| 1211 | + const auto& [fsPath, dbPath, type] = backupEntries[i]; |
| 1212 | + Y_ENSURE(dbPath.StartsWith(dbRestoreRoot), "dbPath must be built by appending a relative path to dbRestoreRoot"); |
| 1213 | + // views might depend on other views, so we restore them with the help of a dedicated manager |
| 1214 | + DelayedRestoreManager.Add(type, fsPath, dbRestoreRoot, dbPath.substr(dbRestoreRoot.size()), settings, false); |
| 1215 | + } |
| 1216 | + |
| 1217 | + return Result<TRestoreResult>(); |
| 1218 | +} |
| 1219 | + |
| 1220 | +TRestoreResult TRestoreClient::DropAndRestore(const TFsPath& fsBackupRoot, const TString& dbRestoreRoot, const TRestoreSettings& settings, const THashMap<TString, ESchemeEntryType>& existingEntries) { |
| 1221 | + TVector<TFsBackupEntry> backupEntries; |
| 1222 | + if (auto result = ListBackupEntries(fsBackupRoot, dbRestoreRoot, backupEntries); !result.IsSuccess()) { |
| 1223 | + return result; |
| 1224 | + } |
| 1225 | + LOG_D("List of entries in the backup: " << NJson::WriteJson(ConvertToJson(backupEntries), false)); |
| 1226 | + |
| 1227 | + for (const auto& [fsPath, dbPath, type] : backupEntries) { |
| 1228 | + const auto* existingType = existingEntries.FindPtr(dbPath); |
| 1229 | + |
| 1230 | + // verify that types are matching |
| 1231 | + if (existingType && !TypesAreMatching(*existingType, type)) { |
| 1232 | + return Result<TRestoreResult>(fsPath, EStatus::BAD_REQUEST, |
| 1233 | + TStringBuilder() << "Type mismatch: " << dbPath.Quote() << " already exists and has " << *existingType << " type." |
| 1234 | + " It cannot be replaced with " << type << " from the backup." |
| 1235 | + ); |
| 1236 | + } |
| 1237 | + |
| 1238 | + // verify existence |
| 1239 | + if (!existingType && settings.VerifyExistence_) { |
| 1240 | + return Result<TRestoreResult>(fsPath, EStatus::BAD_REQUEST, |
| 1241 | + TStringBuilder() << "Object is present in the backup but is missing from the database" |
| 1242 | + ); |
| 1243 | + } |
| 1244 | + } |
| 1245 | + |
| 1246 | + TVector<size_t> directories; |
| 1247 | + THashMap<TString, size_t> tables; |
| 1248 | + TVector<size_t> views; |
| 1249 | + THashMap<TString, size_t> replications; |
| 1250 | + TVector<size_t> externalDataSources; |
| 1251 | + THashMap<TString, size_t> externalTables; |
| 1252 | + |
| 1253 | + // scheme entries that do not require special handling (i.e. cannot have dependents) |
| 1254 | + TVector<size_t> regular; |
| 1255 | + |
| 1256 | + for (size_t i = 0; i < backupEntries.size(); ++i) { |
| 1257 | + const auto& [fsPath, dbPath, type] = backupEntries[i]; |
| 1258 | + switch (type) { |
| 1259 | + case ESchemeEntryType::Directory: |
| 1260 | + directories.emplace_back(i); |
| 1261 | + break; |
| 1262 | + case ESchemeEntryType::ExternalTable: |
| 1263 | + externalTables.emplace(dbPath, i); |
| 1264 | + break; |
| 1265 | + case ESchemeEntryType::ExternalDataSource: |
| 1266 | + externalDataSources.emplace_back(i); |
| 1267 | + break; |
| 1268 | + case ESchemeEntryType::Table: |
| 1269 | + tables.emplace(dbPath, i); |
| 1270 | + break; |
| 1271 | + case ESchemeEntryType::Replication: |
| 1272 | + replications.emplace(dbPath, i); |
| 1273 | + break; |
| 1274 | + case ESchemeEntryType::View: |
| 1275 | + views.emplace_back(i); |
| 1276 | + break; |
| 1277 | + default: |
| 1278 | + regular.emplace_back(i); |
| 1279 | + break; |
| 1280 | + } |
| 1281 | + } |
| 1282 | + |
| 1283 | + for (size_t i : directories) { |
| 1284 | + const auto& [fsPath, dbPath, type] = backupEntries[i]; |
| 1285 | + if (!existingEntries.contains(dbPath)) { |
| 1286 | + if (auto result = RestoreEmptyDir(fsPath, dbPath, settings, false); !result.IsSuccess()) { |
| 1287 | + return result; |
| 1288 | + } |
| 1289 | + } |
| 1290 | + } |
| 1291 | + |
| 1292 | + if (auto result = DropAndRestoreExternals(backupEntries, externalDataSources, externalTables, settings); !result.IsSuccess()) { |
| 1293 | + return result; |
| 1294 | + } |
| 1295 | + if (auto result = DropAndRestoreTablesAndDependents(backupEntries, tables, views, replications, dbRestoreRoot, settings, existingEntries); !result.IsSuccess()) { |
| 1296 | + return result; |
| 1297 | + } |
| 1298 | + |
| 1299 | + for (size_t i : regular) { |
| 1300 | + const auto& [fsPath, dbPath, type] = backupEntries[i]; |
| 1301 | + if (auto result = Drop(type, dbPath, settings); !result.IsSuccess()) { |
| 1302 | + return result; |
| 1303 | + } |
| 1304 | + Y_ENSURE(dbPath.StartsWith(dbRestoreRoot), "dbPath must be built by appending a relative path to dbRestoreRoot"); |
| 1305 | + if (auto result = Restore(type, fsPath, dbRestoreRoot, dbPath.substr(dbRestoreRoot.size()), settings, false, false); !result.IsSuccess()) { |
| 1306 | + return result; |
| 1307 | + } |
| 1308 | + } |
| 1309 | + |
| 1310 | + return Result<TRestoreResult>(); |
| 1311 | +} |
| 1312 | + |
1050 | 1313 | TRestoreResult TRestoreClient::RestoreView( |
1051 | 1314 | const TFsPath& fsPath, |
1052 | 1315 | const TString& dbRestoreRoot, |
|
0 commit comments