Skip to content
Open

WIP #18

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
885 changes: 885 additions & 0 deletions PARALLEL_OPERATIONS_TESTING_PLAN.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,28 @@ void SyncImplTableVersion(
{
Y_ABORT_UNLESS(context.SS->Tables.contains(versionCtx.GrandParentPathId));
auto parentTable = context.SS->Tables.at(versionCtx.GrandParentPathId);

ui64 currentImplVersion = table->AlterVersion;
ui64 currentParentVersion = parentTable->AlterVersion;

if (currentImplVersion <= currentParentVersion) {
table->AlterVersion = currentParentVersion;
// Also check the index entity version to avoid race conditions
// Use the maximum of parent version and index entity version
ui64 targetVersion = currentParentVersion;
if (context.SS->Indexes.contains(versionCtx.ParentPathId)) {
auto index = context.SS->Indexes.at(versionCtx.ParentPathId);
// This handles cases where parent operation has already synced entity
targetVersion = Max(currentParentVersion, index->AlterVersion);
}

if (currentImplVersion <= targetVersion) {
table->AlterVersion = targetVersion;
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Synchronized index impl table version to parent table"
"Synchronized index impl table version"
<< ", implTablePathId: " << versionCtx.PathId
<< ", parentTablePathId: " << versionCtx.GrandParentPathId
<< ", oldImplVersion: " << currentImplVersion
<< ", parentVersion: " << currentParentVersion
<< ", targetVersion: " << targetVersion
<< ", newImplVersion: " << table->AlterVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
} else {
Expand All @@ -139,6 +149,7 @@ void SyncImplTableVersion(
<< ", implTablePathId: " << versionCtx.PathId
<< ", implVersion: " << currentImplVersion
<< ", parentVersion: " << currentParentVersion
<< ", targetVersion: " << targetVersion
<< ", newImplVersion: " << table->AlterVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
}
Expand All @@ -156,19 +167,32 @@ void SyncIndexEntityVersion(
}

auto index = context.SS->Indexes.at(indexPathId);
index->AlterVersion = targetVersion;
ui64 oldIndexVersion = index->AlterVersion;

context.SS->PersistTableIndexAlterVersion(db, indexPathId, index);
// Only update if we're increasing the version (prevent downgrade due to race conditions)
if (targetVersion > oldIndexVersion) {
index->AlterVersion = targetVersion;

auto indexPath = context.SS->PathsById.at(indexPathId);
context.SS->ClearDescribePathCaches(indexPath);
context.OnComplete.PublishToSchemeBoard(operationId, indexPathId);
context.SS->PersistTableIndexAlterVersion(db, indexPathId, index);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Synced index entity version"
<< ", indexPathId: " << indexPathId
<< ", newVersion: " << index->AlterVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
auto indexPath = context.SS->PathsById.at(indexPathId);
context.SS->ClearDescribePathCaches(indexPath);
context.OnComplete.PublishToSchemeBoard(operationId, indexPathId);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Synced index entity version"
<< ", indexPathId: " << indexPathId
<< ", oldVersion: " << oldIndexVersion
<< ", newVersion: " << index->AlterVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
} else {
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Skipping index entity sync - already at higher version"
<< ", indexPathId: " << indexPathId
<< ", currentVersion: " << oldIndexVersion
<< ", targetVersion: " << targetVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
}
}

void SyncChildIndexes(
Expand Down
Loading