Feature/incr backup/indexes support 003 #16
base: main
Conversation
245573f to 636f707
📝 Summary by CodeRabbit

Walkthrough

Adds an "omit_indexes" backup collection setting to control index inclusion in incremental backups. Propagates the OmitIndexes flag through KQP components, proto definitions, and schemeshard operations. Introduces CDC stream creation and handling for index metadata during incremental backups, extending transaction state tracking for CDC path IDs.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant KQP as KQP Provider
    participant Gateway as KQP Gateway
    participant Proto as Proto/Config
    participant Schemeshard as Schemeshard
    User->>KQP: Create backup with omit_indexes setting
    KQP->>KQP: ParseBackupCollectionSettings<br/>(parses omit_indexes → bool)
    KQP->>Gateway: Propagate OmitIndexes to<br/>TBackupCollectionSettings
    Gateway->>Proto: Set OmitIndexes on<br/>IncrementalBackupConfig
    Gateway->>Schemeshard: Create backup collection
    alt omitIndexes == false (include indexes)
        Schemeshard->>Schemeshard: For each table entry
        Schemeshard->>Schemeshard: Iterate over global indexes
        Schemeshard->>Schemeshard: Create CDC stream on<br/>index impl table
        Schemeshard->>Schemeshard: Create backup path under<br/>__ydb_backup_meta/indexes/
    else omitIndexes == true (skip indexes)
        Schemeshard->>Schemeshard: Skip index CDC streams
    end
    Schemeshard->>User: Backup collection created
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches

❌ Failed checks (2 inconclusive)

⚠️ Review ran into problems

🔥 Problems

Git: Failed to clone repository.
Actionable comments posted: 4
🧹 Nitpick comments (8)
ydb/core/tx/schemeshard/ut_base/ut_base.cpp (2)
11922-11935: Harden assertions and avoid brittle char lifetime for backupDirName

- ChildrenCount(WithIncremental ? 3 : 2) is fine, but it won't catch a wrong name. When WithIncremental is true, also assert that __ydb_backup_meta exists to make the test resilient.
- backupDirName is a const char* obtained from .c_str(); keep an owning TString to avoid lifetime surprises.

Apply:

```diff
-    auto descr = DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1").GetPathDescription();
+    auto descr = DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1").GetPathDescription();
     UNIT_ASSERT_VALUES_EQUAL(descr.GetChildren().size(), 1);
-    auto backupDirName = descr.GetChildren(0).GetName().c_str();
+    const TString backupDirName = descr.GetChildren(0).GetName();
@@
-    TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", backupDirName)), {
+    TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", backupDirName.c_str())), {
         NLs::PathExist,
         NLs::ChildrenCount(WithIncremental ? 3 : 2),
         NLs::Finished,
     });
+
+    if (WithIncremental) {
+        TestDescribeResult(
+            DescribePath(runtime,
+                Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/__ydb_backup_meta", backupDirName.c_str())),
+            { NLs::PathExist, NLs::Finished });
+    }
```
11989-12005: Do the same for the incremental dir: check __ydb_backup_meta and own the name

- Likewise, verify __ydb_backup_meta presence under the incremental backup directory.
- Store incrBackupDirName as a TString instead of a const char* from .c_str().

Apply:

```diff
-    auto descr = DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1").GetPathDescription();
+    auto descr = DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1").GetPathDescription();
     UNIT_ASSERT_VALUES_EQUAL(descr.GetChildren().size(), 2);
-    const char* incrBackupDirName = nullptr;
+    TString incrBackupDirName;
     for (auto& dir : descr.GetChildren()) {
         if (dir.GetName().EndsWith("_incremental")) {
-            incrBackupDirName = dir.GetName().c_str();
+            incrBackupDirName = dir.GetName();
         }
     }
@@
-    TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", incrBackupDirName)), {
+    TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", incrBackupDirName.c_str())), {
         NLs::PathExist,
         NLs::ChildrenCount(3),
         NLs::Finished,
     });
+
+    TestDescribeResult(
+        DescribePath(runtime,
+            Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/__ydb_backup_meta", incrBackupDirName.c_str())),
+        { NLs::PathExist, NLs::Finished });
```

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp (1)
219-236: Consider documenting the first-match behavior.

The function returns only the first directory ending with `_incremental`. In scenarios with multiple incremental backups, this could be ambiguous. Consider one of the following (see the sketch after this list):

- Adding a comment explaining this limitation
- Adding a parameter to specify which incremental backup to find (e.g., by timestamp)
- Ensuring tests only create one incremental backup when using this helper
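If the parameterized route is preferred, here is a minimal sketch; the helper name is hypothetical, and the proto accessors are assumed to match what the test file already gets back from DescribePath:

```cpp
#include <util/generic/vector.h>
#include <util/generic/algorithm.h>

// Hypothetical variant: return every matching dir so callers pick explicitly.
TVector<TString> FindIncrementalBackupDirs(const NKikimrSchemeOp::TPathDescription& descr) {
    TVector<TString> dirs;
    for (const auto& child : descr.GetChildren()) {
        if (child.GetName().EndsWith("_incremental")) {
            dirs.push_back(child.GetName());
        }
    }
    Sort(dirs); // timestamp-prefixed names sort chronologically
    return dirs;
}
```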
ydb/core/kqp/provider/yql_kikimr_type_ann.cpp (1)
2447-2453: Recognizing omit_indexes looks good; consider early validation.

You now accept "omit_indexes". Consider surfacing an error if omit_indexes is specified while incremental_backup_enabled is false (either here or in ParseBackupCollectionSettings), to fail fast instead of silently ignoring it downstream.
Can you confirm ParseBackupCollectionSettings rejects this inconsistent combo today?
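For reference, a minimal sketch of such a fail-fast check, with a stand-in settings struct; the real KQP types and error plumbing differ:

```cpp
#include <util/generic/string.h>

// Stand-in for the parsed backup collection settings (illustrative only).
struct TBackupSettingsSketch {
    bool IncrementalBackupEnabled = false;
    bool OmitIndexes = false;
};

// Returns false and fills `error` for the inconsistent combination.
bool ValidateBackupSettings(const TBackupSettingsSketch& s, TString& error) {
    if (s.OmitIndexes && !s.IncrementalBackupEnabled) {
        error = "omit_indexes requires incremental_backup_enabled to be true";
        return false;
    }
    return true;
}
```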
ydb/core/kqp/provider/yql_kikimr_gateway.h (1)
1055-1057: Defaults are sensible; consider tri-state if semantics matter.

If you later need to differentiate "not provided" vs "explicit false", switch OmitIndexes (and maybe IncrementalBackupEnabled) to TMaybe<bool>. Otherwise this is fine.
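A sketch of that tri-state shape using Arcadia's TMaybe; the struct is a stand-in, not the actual TBackupCollectionSettings:

```cpp
#include <util/generic/maybe.h>

struct TBackupCollectionSettingsSketch {
    TMaybe<bool> IncrementalBackupEnabled; // Empty() means "not provided"
    TMaybe<bool> OmitIndexes;              // distinguishes unset from explicit false
};

// At the point of use, resolve to a concrete default:
inline bool ResolveOmitIndexes(const TBackupCollectionSettingsSketch& s) {
    return s.OmitIndexes.GetOrElse(false);
}
```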
ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp (1)
639-645: Consider using a more robust mechanism than a string suffix check.

The condition `!streamName.EndsWith("_continuousBackupImpl")` relies on a naming convention to identify continuous backup streams. This is fragile and could silently break if the naming convention changes.

Consider one of these alternatives:

- Add a flag or attribute to the CDC stream metadata indicating it's a continuous backup stream
- Define a constant for the suffix and use it consistently across the codebase
- Use an enum or type field to explicitly mark continuous backup streams

Current pattern location: this same pattern appears in ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp at lines 589 and 639, suggesting the need for a centralized approach.
587-595: Same string suffix fragility as in the alter operation.

The loop correctly handles multiple streams, but still relies on the naming convention `"_continuousBackupImpl"` to identify continuous backup streams. This has the same maintainability concerns noted in schemeshard__operation_alter_cdc_stream.cpp.

Apply the same refactoring suggested for the alter operation:

- Use a metadata flag/attribute instead of string matching
- Or define a shared constant for the suffix across all CDC operations

This pattern appears in at least three locations:

- schemeshard__operation_alter_cdc_stream.cpp line 639
- This file, line 589
- schemeshard__operation_common_cdc_stream.cpp line 139

Consider consolidating this check into a helper function; a sketch follows.
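One possible shape for that helper — the names and placement are suggestions, not existing code:

```cpp
#include <util/generic/string.h>
#include <util/generic/strbuf.h>

// Single source of truth for the continuous-backup stream naming convention.
constexpr TStringBuf ContinuousBackupStreamSuffix = "_continuousBackupImpl";

inline bool IsContinuousBackupStream(const TString& streamName) {
    return streamName.EndsWith(ContinuousBackupStreamSuffix);
}
```

All three call sites could then share the predicate, and a later move to a metadata flag would touch only this helper.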
ydb/core/tx/schemeshard/schemeshard_impl.cpp (1)
2642-2656: Deduplicate CDC extraData serialization

The proto build/serialize block repeats here and for TxRotateCdcStreamAtTable. Extract a tiny helper/lambda to reduce duplication and keep future CDC tx additions consistent.

Example:

```cpp
auto writeCdcExtra = [&](TString& out) {
    NKikimrSchemeOp::TGenericTxInFlyExtraData proto;
    txState.CdcPathId.ToProto(proto.MutableTxCopyTableExtraData()->MutableCdcPathId());
    bool ok = proto.SerializeToString(&out);
    Y_ABORT_UNLESS(ok);
};

// use in both branches
writeCdcExtra(extraData);
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
- ydb/core/kqp/host/kqp_gateway_proxy.cpp (1 hunks)
- ydb/core/kqp/provider/yql_kikimr_exec.cpp (1 hunks)
- ydb/core/kqp/provider/yql_kikimr_gateway.h (1 hunks)
- ydb/core/kqp/provider/yql_kikimr_type_ann.cpp (1 hunks)
- ydb/core/protos/flat_scheme_op.proto (1 hunks)
- ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp (3 hunks)
- ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp (1 hunks)
- ydb/core/tx/schemeshard/schemeshard__init.cpp (1 hunks)
- ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp (2 hunks)
- ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp (3 hunks)
- ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp (1 hunks)
- ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp (1 hunks)
- ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp (3 hunks)
- ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp (2 hunks)
- ydb/core/tx/schemeshard/schemeshard_impl.cpp (1 hunks)
- ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp (1 hunks)
- ydb/core/tx/schemeshard/ut_base/ut_base.cpp (2 hunks)
🔇 Additional comments (18)
ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp (5)
1703-1798: LGTM! Comprehensive test for CDC stream creation on indexes.

The test properly verifies that CDC streams are created for both the main table and the index implementation table during incremental backups, and validates that they share the same timestamp suffix.

1800-1897: LGTM! Good coverage for multiple indexes.

Properly validates that all index CDC streams share the same timestamp, ensuring consistency across multiple indexes.

1899-1999: LGTM! Critical test for index type differentiation.

Correctly validates that CDC streams are created only for global sync indexes and not for async indexes, which is the expected behavior for incremental backups.

2109-2232: LGTM! Comprehensive end-to-end test.

This test validates the complete incremental backup flow with indexes, including CDC stream creation, backup execution, and verification of the backup directory structure with index metadata at the expected path.
2234-2314: LGTM! Validates OmitIndexes flag behavior.

Correctly tests that when OmitIndexes: true is set in IncrementalBackupConfig, the main table still receives CDC streams but the indexes do not, as expected.

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp (4)
42-45: LGTM! Simple helper struct.

Clean data structure for tracking CDC operation counts.

312-348: LGTM! Clean refactor to support multiple tags.

The modification to iterate over the tags vector is straightforward and correct, enabling more flexible test scenarios.

2620-2813: Tests rely on brittle helper functions.

These tests (IncrementalBackupWithIndexes and IncrementalBackupWithCoveringIndex) use FindIncrementalBackupDir and CountCdcOperations, which have robustness issues flagged in earlier comments. While the test logic itself is sound, consider making the helper functions more robust to improve test reliability. Additionally, the 10-second sleep at line 2682 could be reduced if possible to speed up test execution.

2815-3013: LGTM with dependencies on helper function improvements.

The test coverage for multiple indexes and the OmitIndexes flag is comprehensive. However, these tests depend on the helper functions (FindIncrementalBackupDir, CountCdcOperations) that have brittleness issues noted in earlier comments.

ydb/core/kqp/host/kqp_gateway_proxy.cpp (1)
1370-1373: Correct propagation of OmitIndexes into IncrementalBackupConfig.

Gated by IncrementalBackupEnabled and writes the flag as intended. LGTM.
Please add/confirm tests for:
- incremental enabled + omit_indexes=true → indexes omitted
- incremental enabled + omit_indexes=false → indexes included
- incremental disabled (regardless of omit_indexes) → no IncrementalBackupConfig emitted
Example repo scan to verify coverage:
ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp (1)
462-462: LGTM! CDC path tracking added consistently.

The assignment of CdcPathId enables downstream CDC operation handling and aligns with the broader pattern of CDC path propagation introduced across related operations.
2314-2316: LGTM! Proto field addition follows conventions.

The OmitIndexes field is correctly defined with an appropriate default value of false to maintain backward compatibility. This enables control over index inclusion in incremental backups.
194-194: LGTM! CDC path tracking consistent with related operations.

The assignment enables CDC path tracking for drop operations, maintaining consistency with the alter CDC stream implementation.
ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp (1)
131-177: Verify the correctness of index-table version synchronization.

This code introduces synchronization of the parent index's AlterVersion with the impl table's AlterVersion for continuous backup streams. Several concerns need verification:

Version semantics: Direct assignment `index->AlterVersion = table->AlterVersion` (line 159) assumes these versions can be safely synchronized. If index and table versions have independent semantics, this could cause consistency issues.

TPathId boolean check: Line 153 uses `if (parentPathId && ...)`, which assumes TPathId supports boolean conversion. Verify this is the correct way to check validity.

Selective synchronization: This only happens for continuous backup operations. Could this cause version drift in other scenarios where the index and impl table should remain synchronized?
Missing validation: No checks verify that:
- The versions are not already in sync
- The synchronization doesn't violate any invariants
- The operation is idempotent
Please verify:
- Is direct version synchronization between index and impl table semantically correct?
- Are there other scenarios (besides continuous backup) where this synchronization should occur?
- Should there be guards against synchronizing already-synced versions or validation of version compatibility?
Consider adding documentation explaining why this synchronization is necessary and safe.
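As a starting point, a sketch of a guarded, idempotent form of the sync, reduced to raw version counters; this is illustrative, and the real schemeshard types and continuous-backup gating are assumed around it:

```cpp
#include <util/system/types.h>

// Move the index version forward to match the impl table, never backward;
// skipping the already-in-sync case keeps the operation idempotent.
inline void SyncIndexVersionWithImplTable(ui64& indexAlterVersion, ui64 tableAlterVersion) {
    if (indexAlterVersion < tableAlterVersion) {
        indexAlterVersion = tableAlterVersion;
    }
}
```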
ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp (1)
133-190: Index backup path collection looks correct, but verify OmitIndexes precedence.

The logic to collect index backup paths for incremental backups is well-structured. However, the OmitIndexes check at lines 139-140 warrants clarification:

```cpp
bool omitIndexes = bc->Description.GetOmitIndexes() ||
    (incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes());
```

This uses OR logic, meaning indexes are omitted if either:

- The top-level OmitIndexes field is true, OR
- The incremental backup config's OmitIndexes field is true

Please confirm this is the intended precedence (see the sketch after this list):

- Should the top-level setting override the incremental config setting?
- Or should they be independent with OR semantics as currently implemented?
- Consider documenting this precedence in code comments or design docs
The rest of the logic is sound:
- Proper error handling with `continue` on failures (lines 150, 156)
- Correct filtering for global, non-dropped indexes (lines 164-176)
- Appropriate path construction for metadata (lines 181-187)
ydb/core/kqp/provider/yql_kikimr_exec.cpp (1)
1024-1030: LGTM! Clean implementation following established patterns.

The omit_indexes setting parsing mirrors the incremental_backup_enabled implementation above it, with appropriate boolean validation and clear error messaging. The code correctly extracts the value, validates it as a boolean, and stores it in the settings structure.
ydb/core/tx/schemeshard/schemeshard__init.cpp (1)
3676-3687: CDC extraData parse: add guard and consider DRY helper

Good addition. Two tweaks:
- Only set CdcPathId when present to avoid writing InvalidPathId defaults.
- Optional: extract a small helper to parse/set CdcPathId and reuse with TxRotateCdcStreamAtTable.
Apply within this hunk:
```diff
-    if (!extraData.empty()) {
-        NKikimrSchemeOp::TGenericTxInFlyExtraData proto;
-        bool deserializeRes = ParseFromStringNoSizeLimit(proto, extraData);
-        Y_ABORT_UNLESS(deserializeRes);
-        txState.CdcPathId = TPathId::FromProto(proto.GetTxCopyTableExtraData().GetCdcPathId());
-    }
+    if (!extraData.empty()) {
+        NKikimrSchemeOp::TGenericTxInFlyExtraData proto;
+        const bool ok = ParseFromStringNoSizeLimit(proto, extraData);
+        Y_ABORT_UNLESS(ok);
+        const auto& pid = proto.GetTxCopyTableExtraData().GetCdcPathId();
+        if (pid.GetOwnerId() || pid.GetLocalId()) {
+            txState.CdcPathId = TPathId::FromProto(pid);
+        }
+    }
```

To confirm end-to-end consistency, please run:
ydb/core/tx/schemeshard/schemeshard_impl.cpp (1)
2646-2656: PersistTxState: CDC-at-table extraData persistence looks correct

Matches the TxRotateCdcStreamAtTable pattern; CdcPathId is serialized into GenericTxInFlyExtraData. Good extension.
Please verify the init/deserialization path handles these new tx types and CdcPathId, and that CdcPathId is always set before persisting. Suggested repo scan:
```cpp
TCdcMetadata ParseCdcMetadata(const TString& bytesValue) {
    TCdcMetadata result;
    result.IsDelete = false;

    // The bytes contain protobuf-encoded CDC metadata
    // For Update mode CDC:
    //   - Updates have \020\000 (indicating value columns present)
    //   - Deletes have \020\001 (indicating erase operation)
    if (bytesValue.find("\020\001") != TString::npos) {
        result.IsDelete = true;
    }

    // Parse column tags from the metadata
    // Format: \010<tag>\020<flags>
    for (size_t i = 0; i < bytesValue.size(); ++i) {
        if (bytesValue[i] == '\010' && i + 1 < bytesValue.size()) {
            ui32 tag = static_cast<ui8>(bytesValue[i + 1]);
            if (i + 2 < bytesValue.size() && bytesValue[i + 2] == '\020') {
                ui8 flags = i + 3 < bytesValue.size() ? static_cast<ui8>(bytesValue[i + 3]) : 0;
                if (flags & 1) {
                    result.ErasedColumns.push_back(tag);
                } else {
                    result.UpdatedColumns.push_back(tag);
                }
            }
        }
    }

    return result;
}
```
Brittle CDC metadata parsing using hardcoded byte patterns.
The function parses CDC metadata by searching for hardcoded byte sequences (\020\001, \010, etc.) instead of using proper protobuf deserialization. This approach is fragile because:
- It assumes a specific protobuf encoding that could change
- Manual byte inspection is error-prone
- Changes to the CDC metadata protobuf structure will silently break this parser
Consider using proper protobuf deserialization instead:

```cpp
TCdcMetadata ParseCdcMetadata(const TString& bytesValue) {
    TCdcMetadata result;
    NKikimrBackup::TChangeMetadata metadata;
    if (!metadata.ParseFromString(bytesValue)) {
        // Handle parse error
        return result;
    }
    result.IsDelete = metadata.GetIsDeleted();
    for (const auto& colState : metadata.GetColumnStates()) {
        if (colState.GetIsChanged()) {
            if (colState.GetIsNull()) {
                result.ErasedColumns.push_back(colState.GetTag());
            } else {
                result.UpdatedColumns.push_back(colState.GetTag());
            }
        }
    }
    return result;
}
```

🤖 Prompt for AI Agents
ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp lines 244-274: current
parsing inspects hardcoded byte sequences ("\020\001", "\010", etc.) which is
brittle; replace it with proper protobuf deserialization using
NKikimrBackup::TChangeMetadata::ParseFromString, check the return value and
handle parse failure, set result.IsDelete from metadata.GetIsDeleted() (or
equivalent field), iterate metadata.GetColumnStates() and use
colState.GetIsChanged()/GetIsNull() (or equivalent) to populate
result.ErasedColumns and result.UpdatedColumns, and ensure any parse error is
handled (return default result or log) rather than relying on manual byte
scanning.
```cpp
CdcOperationCounts CountCdcOperations(const TString& backup) {
    CdcOperationCounts counts;
    size_t pos = 0;

    while ((pos = backup.find("bytes_value: \"", pos)) != TString::npos) {
        pos += 14;
        size_t endPos = backup.find("\"", pos);
        if (endPos == TString::npos) break;

        TString metadataStr = backup.substr(pos, endPos - pos);
        TString unescaped;
        for (size_t i = 0; i < metadataStr.size(); ++i) {
            if (metadataStr[i] == '\\' && i + 3 < metadataStr.size()) {
                ui8 val = ((metadataStr[i+1] - '0') << 6) |
                          ((metadataStr[i+2] - '0') << 3) |
                          (metadataStr[i+3] - '0');
                unescaped += static_cast<char>(val);
                i += 3;
            } else {
                unescaped += metadataStr[i];
            }
        }

        auto metadata = ParseCdcMetadata(unescaped);
        if (metadata.IsDelete) {
            counts.Deletes++;
        } else {
            counts.Inserts++;
        }

        pos = endPos + 1;
    }

    return counts;
}
```
Brittle text parsing and manual octal decoding.

This function has several fragility issues:

- String pattern matching: it searches for `"bytes_value: \""`, which assumes a specific text format of the backup representation.
- Manual octal decoding: the bit-shift approach for octal decoding works but is confusing:

  ```cpp
  ui8 val = ((metadataStr[i+1] - '0') << 6) | ((metadataStr[i+2] - '0') << 3) | (metadataStr[i+3] - '0');
  ```

  While mathematically correct (since 8² = 2⁶ and 8¹ = 2³), this mixes octal and binary concepts. More readable:

  ```cpp
  ui8 d1 = metadataStr[i+1] - '0';
  ui8 d2 = metadataStr[i+2] - '0';
  ui8 d3 = metadataStr[i+3] - '0';
  ui8 val = (d1 * 64) + (d2 * 8) + d3; // or: d1 * 8 * 8 + d2 * 8 + d3
  ```

- Depends on ParseCdcMetadata, which itself is brittle (see previous comment).

Consider using a more robust approach that doesn't rely on string parsing of debug output; a standalone unescape helper is sketched below.
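For the unescaping half specifically, a self-contained sketch of such a helper, assuming Arcadia's util types; it decodes 3-digit octal escapes explicitly and replaces only the manual loop, not the overall string scanning:

```cpp
#include <util/generic/string.h>
#include <util/system/types.h>

// Decode C-style \NNN octal escapes; leave all other bytes untouched.
TString UnescapeOctal(const TString& in) {
    TString out;
    out.reserve(in.size());
    for (size_t i = 0; i < in.size(); ++i) {
        const bool isOctal = in[i] == '\\' && i + 3 < in.size()
            && in[i + 1] >= '0' && in[i + 1] <= '7'
            && in[i + 2] >= '0' && in[i + 2] <= '7'
            && in[i + 3] >= '0' && in[i + 3] <= '7';
        if (isOctal) {
            const ui8 val = (in[i + 1] - '0') * 64 + (in[i + 2] - '0') * 8 + (in[i + 3] - '0');
            out += static_cast<char>(val);
            i += 3;
        } else {
            out += in[i];
        }
    }
    return out;
}
```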
🤖 Prompt for AI Agents
ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp around lines 276-310:
the routine brittlely parses textual backup output and manually decodes octal
escapes; replace this fragile text scanning with a robust approach by extracting
the raw bytes via the serialization/deserialization API used for the backup (or
by parsing the protobuf/binary format) instead of searching for "bytes_value:
\""; remove the manual octal-decoding loop and use a proper unescape/byte-decode
helper (or direct byte array from the deserialized message), then feed that raw
data to ParseCdcMetadata (or refactor ParseCdcMetadata to accept binary) so
inserts/deletes are counted from structured data rather than fragile string
patterns.
```cpp
// Get index implementation table (the only child of index)
auto indexPath = TPath::Init(childPathId, context.SS);
Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();

auto indexTablePath = indexPath.Child(implTableName);
auto indexTable = context.SS->Tables.at(implTablePathId);

// Create CDC stream on index impl table
NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp;
createCdcStreamOp.SetTableName(implTableName);
auto& streamDescription = *createCdcStreamOp.MutableStreamDescription();
streamDescription.SetName(streamName);
streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate);
streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto);

NCdc::DoCreateStreamImpl(result, createCdcStreamOp, opId, indexTablePath, false, false);

// Create AtTable operation to notify datashard (without schema change)
{
    auto outTx = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
    auto& cdcOp = *outTx.MutableCreateCdcStream();
    cdcOp.CopyFrom(createCdcStreamOp);
    result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, false));
}

// Create PQ part for index CDC stream
TVector<TString> boundaries;
const auto& partitions = indexTable->GetPartitions();
boundaries.reserve(partitions.size() - 1);
for (ui32 i = 0; i < partitions.size(); ++i) {
    const auto& partition = partitions.at(i);
    if (i != partitions.size() - 1) {
        boundaries.push_back(partition.EndOfRange);
    }
}

const auto streamPath = indexTablePath.Child(streamName);
NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, indexTable, boundaries, false);
```
Skip indexes whose impl tables are not ready
Here too we unconditionally Y_ABORT_UNLESS that the index has exactly one child. Any index that is still building, being altered/dropped, or that exposes multiple implementation children (vector indexes today) will trigger this abort and bring SchemeShard down when a backup collection with omitIndexes=false runs. Please replace the abort with a guarded branch that skips the index (or surfaces a controlled reject), and ensure the implementation table entry exists before dereferencing it.
Apply this diff to mirror the incremental path fix:
```diff
-    auto indexPath = TPath::Init(childPathId, context.SS);
-    Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
-    auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();
+    auto indexPath = TPath::Init(childPathId, context.SS);
+    const auto& children = indexPath.Base()->GetChildren();
+    if (children.size() != 1) {
+        continue;
+    }
+    auto [implTableName, implTablePathId] = *children.begin();
+    if (!context.SS->Tables.contains(implTablePathId)) {
+        continue;
+    }
```
+ }🤖 Prompt for AI Agents
In ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
around lines 198 to 237, replace the unconditional Y_ABORT_UNLESS that assumes
exactly one child for the index with a guarded branch: if the index base
children count is not exactly 1, skip this index (or add a controlled reject
result) instead of aborting; when a single child exists, extract implTableName
and implTablePathId but first check that context.SS->Tables contains
implTablePathId before dereferencing, and if the table entry is missing skip the
index (or surface a controlled error) so we never crash on indexes still
building/altering or multi-impl indexes.
```cpp
Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();

// Build relative path to index impl table (relative to working dir)
TString indexImplTableRelPath = JoinPath({relativeItemPath, childName, implTableName});

// Create AlterContinuousBackup for index impl table
NKikimrSchemeOp::TModifyScheme modifyScheme;
modifyScheme.SetWorkingDir(tx.GetWorkingDir());
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterContinuousBackup);
modifyScheme.SetInternal(true);

auto& cb = *modifyScheme.MutableAlterContinuousBackup();
cb.SetTableName(indexImplTableRelPath); // Relative path: table1/index1/indexImplTable

auto& ib = *cb.MutableTakeIncrementalBackup();
// Destination: {backup_collection}/{timestamp}_inc/__ydb_backup_meta/indexes/{table_path}/{index_name}
TString dstPath = JoinPath({
    tx.GetBackupIncrementalBackupCollection().GetName(),
    tx.GetBackupIncrementalBackupCollection().GetTargetDir(),
    "__ydb_backup_meta",
    "indexes",
    relativeItemPath, // Relative table path (e.g., "table1")
    childName         // Index name (e.g., "index1")
});
ib.SetDstPath(dstPath);

TPathId stream;
if (!CreateAlterContinuousBackup(opId, modifyScheme, context, result, stream)) {
    return result;
}
streams.push_back(stream);
```
Avoid Y_ABORT when index impl table is not ready
`Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);` will terminate SchemeShard as soon as an index in the collection is still building, being altered, or has multiple implementation children (e.g. vector indexes). Incremental backups can be triggered while such indexes are mid-flight, so this path is reachable and the tablet will crash. Please guard this case and skip (or reject) the index instead of aborting, and also verify the impl table actually exists before dereferencing it.
Apply this diff to handle the edge cases:
```diff
-    auto indexPath = TPath::Init(childPathId, context.SS);
-    Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
-    auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();
+    auto indexPath = TPath::Init(childPathId, context.SS);
+    const auto& children = indexPath.Base()->GetChildren();
+    if (children.size() != 1) {
+        continue; // implementation table is not ready yet
+    }
+    auto [implTableName, implTablePathId] = *children.begin();
+    if (!context.SS->Tables.contains(implTablePathId)) {
+        continue;
+    }
```
+ }Committable suggestion skipped: line range outside the PR's diff.
Changelog entry
...
Changelog category
Description for reviewers
...