Skip to content

Commit 245573f

Browse files
committed
WIP
1 parent ce07470 commit 245573f

File tree

1 file changed

+120
-19
lines changed

1 file changed

+120
-19
lines changed

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp

Lines changed: 120 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -230,6 +230,44 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
230230
return "";
231231
}
232232

233+
struct TCdcMetadata {
234+
bool IsDelete;
235+
TVector<ui32> UpdatedColumns;
236+
TVector<ui32> ErasedColumns;
237+
};
238+
239+
// Heuristically decodes the CDC metadata bytes of a backed-up row.
//
// The input is scanned as raw bytes rather than parsed as a protobuf message:
//   - the row is treated as a delete when the byte pair \020\001 occurs anywhere
//     in the input;
//   - every \010<tag>\020<flags> sequence yields one column entry, stored in
//     ErasedColumns when bit 0 of <flags> is set and in UpdatedColumns otherwise.
// A missing <flags> byte (input ends right after \020) is read as 0.
//
// Only single-byte tags are recognised.
// NOTE(review): tags >= 128 would be encoded as multi-byte varints and are not
// handled here — confirm the tests never need them.
TCdcMetadata ParseCdcMetadata(const TString& bytesValue) {
    TCdcMetadata result;
    result.IsDelete = bytesValue.find("\020\001") != TString::npos;

    const size_t size = bytesValue.size();
    size_t i = 0;
    while (i < size) {
        const bool isEntry = bytesValue[i] == '\010'
            && i + 2 < size
            && bytesValue[i + 2] == '\020';
        if (!isEntry) {
            ++i;
            continue;
        }

        const ui32 tag = static_cast<ui8>(bytesValue[i + 1]);
        const ui8 flags = i + 3 < size ? static_cast<ui8>(bytesValue[i + 3]) : 0;
        if (flags & 1) {
            result.ErasedColumns.push_back(tag);
        } else {
            result.UpdatedColumns.push_back(tag);
        }

        // Step over the whole entry so its tag/flags bytes are not rescanned
        // as field keys (e.g. a tag of 8 followed by flags 0x10 would otherwise
        // produce a spurious extra column).
        i += 4;
    }

    return result;
}
270+
233271
NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector<ui32>& tags = {2}) {
234272
auto keyCell = TCell::Make<ui32>(key);
235273
auto valueCell = TCell::Make<ui32>(value);
@@ -2667,13 +2705,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
26672705

26682706
Cerr << "CDC_DEBUG: Index backup result: " << indexBackup << Endl;
26692707

2670-
// Debug: Try to understand why index backup is empty by checking CDC stream state
2671-
auto mainCdcStreamName = FindIncrementalBackupDir(runtime, edgeActor, "/Root/Table");
2672-
Cerr << "CDC_DEBUG: Main table CDC stream name: " << mainCdcStreamName << Endl;
2673-
2674-
auto indexCdcStreamName = FindIncrementalBackupDir(runtime, edgeActor, "/Root/Table/ByValue/indexImplTable");
2675-
Cerr << "CDC_DEBUG: Index table CDC stream name: " << indexCdcStreamName << Endl;
2676-
26772708
// Index should contain changes:
26782709
// - (200, 2) - old value deleted due to update
26792710
// - (250, 2) - new value from update
@@ -2736,6 +2767,11 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
27362767
// Wait for CDC streams to be fully activated on all tables (including index tables)
27372768
SimulateSleep(server, TDuration::Seconds(1));
27382769

2770+
// Debug: Capture index implementation table state after full backup (should be empty)
2771+
auto indexImplTableInitial = KqpSimpleExec(runtime, R"(
2772+
SELECT * FROM `/Root/Table/ByAge/indexImplTable`
2773+
)");
2774+
27392775
// Insert initial data
27402776
ExecSQL(server, edgeActor, R"(
27412777
UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES
@@ -2744,6 +2780,11 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
27442780
;
27452781
)");
27462782

2783+
// Debug: Capture index implementation table after initial insert
2784+
auto indexImplTableAfterInsert = KqpSimpleExec(runtime, R"(
2785+
SELECT * FROM `/Root/Table/ByAge/indexImplTable`
2786+
)");
2787+
27472788
// Update covered column: name changes (should appear in index)
27482789
ExecSQL(server, edgeActor, R"(
27492790
UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 5000u);
@@ -2789,24 +2830,84 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
27892830
SELECT * FROM `)" << indexBackupPath << R"(`
27902831
)");
27912832

2792-
Cerr << "Index backup with covering: " << indexBackup << Endl;
2833+
// Debug: Check actual index implementation table
2834+
auto indexImplTableFinal = KqpSimpleExec(runtime, R"(
2835+
SELECT * FROM `/Root/Table/ByAge/indexImplTable`
2836+
)");
2837+
2838+
Cerr << "=== DEBUG: Index Tables State ===" << Endl;
2839+
Cerr << "1. After full backup (should be empty): " << indexImplTableInitial << Endl;
2840+
Cerr << "2. After initial insert (Alice, Bob): " << indexImplTableAfterInsert << Endl;
2841+
Cerr << "3. Final state (physical table): " << indexImplTableFinal << Endl;
2842+
Cerr << "4. Incremental backup (CDC captured): " << indexBackup << Endl;
2843+
Cerr << "=================================" << Endl;
27932844

2794-
// Index should contain:
2795-
// - Entry for (30, 1) with name changes (Alice -> Alice2)
2796-
// - Tombstone for old (25, 2)
2797-
// - Entry for (26, 2)
2798-
// - Tombstone for (26, 2) after deletion
2845+
// Incremental backup should contain (with CDC compaction):
2846+
// - INSERT for (30, 1) with "Alice2" (final state after name update)
2847+
// - Tombstone (DELETE) for (25, 2) - Bob's old age entry, compacted with initial INSERT
2848+
// - Tombstone (DELETE) for (26, 2) - Bob's new age entry, compacted with INSERT from age change
2849+
//
2850+
// Note: Bob's name won't appear because:
2851+
// - Initial INSERT (25, 2, "Bob") was compacted with DELETE (25, 2) → just tombstone
2852+
// - Age change INSERT (26, 2, "Bob") was compacted with DELETE (26, 2) → just tombstone
2853+
//
27992854
// The salary change should not create a separate index entry since salary is not indexed or covered
28002855

28012856
UNIT_ASSERT_C(indexBackup.find("uint32_value: 30") != TString::npos,
28022857
"Index backup should contain age 30");
28032858
UNIT_ASSERT_C(indexBackup.find("Alice") != TString::npos,
2804-
"Index backup should contain covered column name changes");
2805-
UNIT_ASSERT_C(indexBackup.find("uint32_value: 25") != TString::npos ||
2806-
indexBackup.find("uint32_value: 26") != TString::npos,
2807-
"Index backup should contain age changes (25 or 26)");
2808-
UNIT_ASSERT_C(indexBackup.find("Bob") != TString::npos,
2809-
"Index backup should contain covered column Bob");
2859+
"Index backup should contain Alice2 from covering column name update");
2860+
UNIT_ASSERT_C(indexBackup.find("uint32_value: 25") != TString::npos,
2861+
"Index backup should contain tombstone for age 25");
2862+
UNIT_ASSERT_C(indexBackup.find("uint32_value: 26") != TString::npos,
2863+
"Index backup should contain tombstone for age 26");
2864+
2865+
// Verify tombstones have NULL for covering column (correct behavior for DELETEs)
2866+
// and INSERT has the actual covering column value
2867+
UNIT_ASSERT_C(indexBackup.find("null_flag_value: NULL_VALUE") != TString::npos,
2868+
"Index backup tombstones should have NULL for covering columns");
2869+
2870+
// Parse and verify CDC metadata
2871+
// The backup contains 3 records, let's verify their metadata
2872+
size_t pos = 0;
2873+
int deleteCount = 0;
2874+
int insertCount = 0;
2875+
2876+
while ((pos = indexBackup.find("bytes_value: \"", pos)) != TString::npos) {
2877+
pos += 14; // Skip 'bytes_value: "'
2878+
size_t endPos = indexBackup.find("\"", pos);
2879+
if (endPos == TString::npos) break;
2880+
2881+
TString metadataStr = indexBackup.substr(pos, endPos - pos);
2882+
// Unescape the string
2883+
TString unescaped;
2884+
for (size_t i = 0; i < metadataStr.size(); ++i) {
2885+
if (metadataStr[i] == '\\' && i + 3 < metadataStr.size()) {
2886+
// Octal escape \nnn
2887+
ui8 val = ((metadataStr[i+1] - '0') << 6) |
2888+
((metadataStr[i+2] - '0') << 3) |
2889+
(metadataStr[i+3] - '0');
2890+
unescaped += static_cast<char>(val);
2891+
i += 3;
2892+
} else {
2893+
unescaped += metadataStr[i];
2894+
}
2895+
}
2896+
2897+
auto metadata = ParseCdcMetadata(unescaped);
2898+
if (metadata.IsDelete) {
2899+
deleteCount++;
2900+
} else {
2901+
insertCount++;
2902+
}
2903+
2904+
pos = endPos + 1;
2905+
}
2906+
2907+
Cerr << "CDC metadata: " << deleteCount << " DELETEs, " << insertCount << " INSERTs" << Endl;
2908+
2909+
UNIT_ASSERT_EQUAL_C(deleteCount, 2, "Should have 2 DELETE operations (tombstones for age 25 and 26)");
2910+
UNIT_ASSERT_EQUAL_C(insertCount, 1, "Should have 1 INSERT operation (for Alice2)");
28102911
}
28112912

28122913
Y_UNIT_TEST(IncrementalBackupMultipleIndexes) {

0 commit comments

Comments
 (0)