Skip to content

Commit 5169280

Browse files
authored
Merge 01d97f1 into 30aae3f
2 parents 30aae3f + 01d97f1 commit 5169280

File tree

2 files changed

+347
-0
lines changed

2 files changed

+347
-0
lines changed

ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ void FillSetValForSequences(TSchemeShard* ss, NKikimrSchemeOp::TTableDescription
108108
}
109109
}
110110

111+
void FillPartitioning(TSchemeShard* ss, NKikimrSchemeOp::TTableDescription& desc, const TPathId& exportItemPathId) {
112+
NKikimrSchemeOp::TDescribeOptions opts;
113+
opts.SetReturnPartitionConfig(true);
114+
opts.SetReturnBoundaries(true);
115+
116+
auto copiedPath = DescribePath(ss, TlsActivationContext->AsActorContext(), exportItemPathId, opts);
117+
const auto& copiedTable = copiedPath->GetRecord().GetPathDescription().GetTable();
118+
119+
*desc.MutableSplitBoundary() = copiedTable.GetSplitBoundary();
120+
*desc.MutablePartitionConfig()->MutablePartitioningPolicy() = copiedTable.GetPartitionConfig().GetPartitioningPolicy();
121+
}
122+
111123
THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
112124
TSchemeShard* ss,
113125
TTxId txId,
@@ -137,6 +149,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
137149
if (sourceDescription.HasTable()) {
138150
FillSetValForSequences(
139151
ss, *sourceDescription.MutableTable(), exportItemPath.Base()->PathId);
152+
FillPartitioning(ss, *sourceDescription.MutableTable(), exportItemPath.Base()->PathId);
140153
}
141154
task.MutableTable()->CopyFrom(sourceDescription);
142155
}

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,6 +1140,340 @@ value {
11401140
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
11411141
}
11421142

1143+
Y_UNIT_TEST(ShouldRestoreTableWithVolatilePartitioningMerge) {
1144+
TPortManager portManager;
1145+
const ui16 port = portManager.GetPort();
1146+
1147+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
1148+
UNIT_ASSERT(s3Mock.Start());
1149+
1150+
TTestBasicRuntime runtime;
1151+
TTestEnv env(runtime);
1152+
1153+
ui64 txId = 100;
1154+
1155+
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
1156+
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
1157+
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
1158+
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
1159+
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);
1160+
1161+
// Create table with 2 tablets
1162+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
1163+
Name: "Original"
1164+
Columns { Name: "key" Type: "Uint32" }
1165+
Columns { Name: "value" Type: "Utf8" }
1166+
KeyColumnNames: ["key"]
1167+
PartitionConfig {
1168+
PartitioningPolicy {
1169+
MinPartitionsCount: 2
1170+
MaxPartitionsCount: 2
1171+
}
1172+
}
1173+
SplitBoundary {
1174+
KeyPrefix {
1175+
Tuple { Optional { Uint32: 2 } }
1176+
}
1177+
}
1178+
)");
1179+
env.TestWaitNotification(runtime, txId);
1180+
1181+
// Upload data
1182+
const auto firstTablet = TTestTxConfig::FakeHiveTablets;
1183+
const auto secondTablet = TTestTxConfig::FakeHiveTablets + 1;
1184+
UpdateRow(runtime, "Original", 1, "valueA", firstTablet);
1185+
UpdateRow(runtime, "Original", 2, "valueB", secondTablet);
1186+
1187+
// Add delay after copying tables
1188+
bool dropNotification = false;
1189+
THolder<IEventHandle> delayed;
1190+
auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
1191+
switch (ev->GetTypeRewrite()) {
1192+
case TEvSchemeShard::EvModifySchemeTransaction:
1193+
break;
1194+
case TEvSchemeShard::EvNotifyTxCompletionResult:
1195+
if (dropNotification) {
1196+
delayed.Reset(ev.Release());
1197+
return TTestActorRuntime::EEventAction::DROP;
1198+
}
1199+
return TTestActorRuntime::EEventAction::PROCESS;
1200+
default:
1201+
return TTestActorRuntime::EEventAction::PROCESS;
1202+
}
1203+
1204+
const auto* msg = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>();
1205+
if (msg->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
1206+
dropNotification = true;
1207+
}
1208+
1209+
return TTestActorRuntime::EEventAction::PROCESS;
1210+
});
1211+
1212+
// Start exporting table
1213+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1214+
ExportToS3Settings {
1215+
endpoint: "localhost:%d"
1216+
scheme: HTTP
1217+
items {
1218+
source_path: "/MyRoot/Original"
1219+
destination_prefix: ""
1220+
}
1221+
}
1222+
)", port));
1223+
const ui64 exportId = txId;
1224+
1225+
// Wait for delay after copying tables
1226+
if (!delayed) {
1227+
TDispatchOptions opts;
1228+
opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
1229+
return bool(delayed);
1230+
});
1231+
runtime.DispatchEvents(opts);
1232+
}
1233+
runtime.SetObserverFunc(prevObserver);
1234+
1235+
// Merge 2 tablets in 1 during the delay
1236+
TestAlterTable(runtime, ++txId, "/MyRoot", R"(
1237+
Name: "Original"
1238+
PartitionConfig {
1239+
PartitioningPolicy {
1240+
MinPartitionsCount: 1
1241+
MaxPartitionsCount: 1
1242+
}
1243+
}
1244+
)");
1245+
env.TestWaitNotification(runtime, txId);
1246+
1247+
TestSplitTable(runtime, ++txId, "/MyRoot/Original", Sprintf(R"(
1248+
SourceTabletId: %lu
1249+
SourceTabletId: %lu
1250+
)", firstTablet, secondTablet));
1251+
env.TestWaitNotification(runtime, txId);
1252+
1253+
// Finish the delay and continue exporting
1254+
runtime.Send(delayed.Release(), 0, true);
1255+
env.TestWaitNotification(runtime, exportId);
1256+
1257+
// Check export
1258+
TestGetExport(runtime, exportId, "/MyRoot");
1259+
1260+
// Restore table
1261+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1262+
ImportFromS3Settings {
1263+
endpoint: "localhost:%d"
1264+
scheme: HTTP
1265+
items {
1266+
source_prefix: ""
1267+
destination_path: "/MyRoot/Restored"
1268+
}
1269+
}
1270+
)", port));
1271+
const ui64 importId = txId;
1272+
env.TestWaitNotification(runtime, importId);
1273+
1274+
// Check import
1275+
TestGetImport(runtime, importId, "/MyRoot");
1276+
1277+
// Check partitioning in restored table
1278+
TestDescribeResult(DescribePath(runtime, "/MyRoot/Restored", true, true), {
1279+
NLs::MinPartitionsCountEqual(2),
1280+
NLs::MaxPartitionsCountEqual(2),
1281+
NLs::CheckBoundaries
1282+
});
1283+
1284+
// Check data in restored table
1285+
const auto restoredFirstTablet = TTestTxConfig::FakeHiveTablets + 5;
1286+
const auto restoredSecondTablet = TTestTxConfig::FakeHiveTablets + 6;
1287+
{
1288+
auto expectedJson = TStringBuilder() << "[[[["
1289+
<< "["
1290+
<< R"(["1"];)" // key
1291+
<< R"(["valueA"])" // value
1292+
<< "];"
1293+
<< "];\%false]]]";
1294+
auto content = ReadTable(runtime, restoredFirstTablet, "Restored", {"key", "Uint32", "0"});
1295+
NKqp::CompareYson(expectedJson, content);
1296+
}
1297+
{
1298+
auto expectedJson = TStringBuilder() << "[[[["
1299+
<< "["
1300+
<< R"(["2"];)" // key
1301+
<< R"(["valueB"])" // value
1302+
<< "];"
1303+
<< "];\%false]]]";
1304+
auto content = ReadTable(runtime, restoredSecondTablet, "Restored", {"key", "Uint32", "0"});
1305+
NKqp::CompareYson(expectedJson, content);
1306+
}
1307+
}
1308+
1309+
Y_UNIT_TEST(ShouldRestoreTableWithVolatilePartitioningSplit) {
1310+
TPortManager portManager;
1311+
const ui16 port = portManager.GetPort();
1312+
1313+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
1314+
UNIT_ASSERT(s3Mock.Start());
1315+
1316+
TTestBasicRuntime runtime;
1317+
TTestEnv env(runtime);
1318+
1319+
ui64 txId = 100;
1320+
1321+
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
1322+
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
1323+
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
1324+
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
1325+
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);
1326+
1327+
// Create table with 2 tablets
1328+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
1329+
Name: "Original"
1330+
Columns { Name: "key" Type: "Uint32" }
1331+
Columns { Name: "value" Type: "Utf8" }
1332+
KeyColumnNames: ["key"]
1333+
PartitionConfig {
1334+
PartitioningPolicy {
1335+
MinPartitionsCount: 2
1336+
MaxPartitionsCount: 2
1337+
}
1338+
}
1339+
SplitBoundary {
1340+
KeyPrefix {
1341+
Tuple { Optional { Uint32: 3 } }
1342+
}
1343+
}
1344+
)");
1345+
env.TestWaitNotification(runtime, txId);
1346+
1347+
// Upload data
1348+
const auto firstTablet = TTestTxConfig::FakeHiveTablets;
1349+
UpdateRow(runtime, "Original", 1, "valueA", firstTablet);
1350+
UpdateRow(runtime, "Original", 2, "valueB", firstTablet);
1351+
1352+
// Add delay after copying tables
1353+
bool dropNotification = false;
1354+
THolder<IEventHandle> delayed;
1355+
auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
1356+
switch (ev->GetTypeRewrite()) {
1357+
case TEvSchemeShard::EvModifySchemeTransaction:
1358+
break;
1359+
case TEvSchemeShard::EvNotifyTxCompletionResult:
1360+
if (dropNotification) {
1361+
delayed.Reset(ev.Release());
1362+
return TTestActorRuntime::EEventAction::DROP;
1363+
}
1364+
return TTestActorRuntime::EEventAction::PROCESS;
1365+
default:
1366+
return TTestActorRuntime::EEventAction::PROCESS;
1367+
}
1368+
1369+
const auto* msg = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>();
1370+
if (msg->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
1371+
dropNotification = true;
1372+
}
1373+
1374+
return TTestActorRuntime::EEventAction::PROCESS;
1375+
});
1376+
1377+
// Start exporting table
1378+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1379+
ExportToS3Settings {
1380+
endpoint: "localhost:%d"
1381+
scheme: HTTP
1382+
items {
1383+
source_path: "/MyRoot/Original"
1384+
destination_prefix: ""
1385+
}
1386+
}
1387+
)", port));
1388+
const ui64 exportId = txId;
1389+
1390+
// Wait for delay after copying tables
1391+
if (!delayed) {
1392+
TDispatchOptions opts;
1393+
opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
1394+
return bool(delayed);
1395+
});
1396+
runtime.DispatchEvents(opts);
1397+
}
1398+
runtime.SetObserverFunc(prevObserver);
1399+
1400+
// Split 2 tablets in 3 during the delay
1401+
TestAlterTable(runtime, ++txId, "/MyRoot", R"(
1402+
Name: "Original"
1403+
PartitionConfig {
1404+
PartitioningPolicy {
1405+
MinPartitionsCount: 3
1406+
MaxPartitionsCount: 3
1407+
}
1408+
}
1409+
)");
1410+
env.TestWaitNotification(runtime, txId);
1411+
1412+
TestSplitTable(runtime, ++txId, "/MyRoot/Original", Sprintf(R"(
1413+
SourceTabletId: %lu
1414+
SplitBoundary {
1415+
KeyPrefix {
1416+
Tuple { Optional { Uint32: 2 } }
1417+
}
1418+
}
1419+
)", firstTablet));
1420+
env.TestWaitNotification(runtime, txId);
1421+
1422+
// Finish the delay and continue exporting
1423+
runtime.Send(delayed.Release(), 0, true);
1424+
env.TestWaitNotification(runtime, exportId);
1425+
1426+
// Check export
1427+
TestGetExport(runtime, exportId, "/MyRoot");
1428+
1429+
// Restore table
1430+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1431+
ImportFromS3Settings {
1432+
endpoint: "localhost:%d"
1433+
scheme: HTTP
1434+
items {
1435+
source_prefix: ""
1436+
destination_path: "/MyRoot/Restored"
1437+
}
1438+
}
1439+
)", port));
1440+
const ui64 importId = txId;
1441+
env.TestWaitNotification(runtime, importId);
1442+
1443+
// Check import
1444+
TestGetImport(runtime, importId, "/MyRoot");
1445+
1446+
// Check partitioning in restored table
1447+
TestDescribeResult(DescribePath(runtime, "/MyRoot/Restored", true, true), {
1448+
NLs::MinPartitionsCountEqual(2),
1449+
NLs::MaxPartitionsCountEqual(2),
1450+
NLs::CheckBoundaries
1451+
});
1452+
1453+
// Check data in restored table
1454+
const auto restoredFirstTablet = TTestTxConfig::FakeHiveTablets + 6;
1455+
const auto restoredSecondTablet = TTestTxConfig::FakeHiveTablets + 7;
1456+
{
1457+
auto expectedJson = TStringBuilder() << "[[[["
1458+
<< "["
1459+
<< R"(["1"];)" // key
1460+
<< R"(["valueA"])" // value
1461+
<< "];"
1462+
<< "["
1463+
<< R"(["2"];)" // key
1464+
<< R"(["valueB"])" // value
1465+
<< "];"
1466+
<< "];\%false]]]";
1467+
auto content = ReadTable(runtime, restoredFirstTablet, "Restored", {"key", "Uint32", "0"});
1468+
NKqp::CompareYson(expectedJson, content);
1469+
}
1470+
{
1471+
auto expectedJson = "[[[[];\%false]]]";
1472+
auto content = ReadTable(runtime, restoredSecondTablet, "Restored", {"key", "Uint32", "0"});
1473+
NKqp::CompareYson(expectedJson, content);
1474+
}
1475+
}
1476+
11431477
Y_UNIT_TEST(ExportImportOnSupportedDatatypes) {
11441478
TTestBasicRuntime runtime;
11451479
TTestEnv env(runtime, TTestEnvOptions());

0 commit comments

Comments
 (0)