Skip to content

Reference Migration Fix #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 131 additions & 137 deletions Sources/CodableDatastore/Datastore/Datastore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -190,161 +190,155 @@ extension Datastore {
handler(.evaluating)
}

var newDescriptor: DatastoreDescriptor?

let primaryIndex = _load(IndexRange(), order: .ascending, awaitWarmup: false)
/// Grab an up-to-date descriptor and check the indexes against it
let updatedDescriptor = try generateUpdatedDescriptor()

var rebuildPrimaryIndex = false
var directIndexesToBuild: Set<IndexName> = []
var secondaryIndexesToBuild: Set<IndexName> = []
var index = 0

let versionData = try Data(self.version)
/// Check the primary index for compatibility.
if persistedDescriptor.identifierType != updatedDescriptor.identifierType {
try await transaction.resetPrimaryIndex(datastoreKey: key)
rebuildPrimaryIndex = true
}

for try await (idenfifier, instance) in primaryIndex {
defer { index += 1 }
/// Use the first index to grab an up-to-date descriptor
if newDescriptor == nil {
let updatedDescriptor = try generateUpdatedDescriptor()
newDescriptor = updatedDescriptor

/// Check the primary index for compatibility.
if persistedDescriptor.identifierType != updatedDescriptor.identifierType {
try await transaction.resetPrimaryIndex(datastoreKey: key)
rebuildPrimaryIndex = true
/// Check existing direct indexes for compatibility
for (_, persistedIndex) in persistedDescriptor.directIndexes {
if let updatedIndex = updatedDescriptor.directIndexes[persistedIndex.name] {
/// If the index still exists, make sure it is compatible by checking their types, or checking if the primary index must be re-built.
if persistedIndex.type != updatedIndex.type || rebuildPrimaryIndex {
/// They were not compatible, so delete the bad index, and queue it to be re-built.
try await transaction.deleteDirectIndex(indexName: persistedIndex.name, datastoreKey: key)
directIndexesToBuild.insert(persistedIndex.name)
}

/// Check existing direct indexes for compatibility
for (_, persistedIndex) in persistedDescriptor.directIndexes {
if let updatedIndex = updatedDescriptor.directIndexes[persistedIndex.name] {
/// If the index still exists, make sure it is compatible by checking their types, or checking if the primary index must be re-built.
if persistedIndex.type != updatedIndex.type || rebuildPrimaryIndex {
/// They were not compatible, so delete the bad index, and queue it to be re-built.
try await transaction.deleteDirectIndex(indexName: persistedIndex.name, datastoreKey: key)
directIndexesToBuild.insert(persistedIndex.name)
}
} else {
/// The index is no longer needed, delete it.
try await transaction.deleteDirectIndex(indexName: persistedIndex.name, datastoreKey: key)
}
}

/// Check for new direct indexes to build
for (_, updatedIndex) in updatedDescriptor.directIndexes {
guard persistedDescriptor.directIndexes[updatedIndex.name] == nil else { continue }
/// The index does not yet exist, so queue it to be built.
directIndexesToBuild.insert(updatedIndex.name)
}

/// Check existing secondary indexes for compatibility
for (_, persistedIndex) in persistedDescriptor.referenceIndexes {
if let updatedIndex = updatedDescriptor.referenceIndexes[persistedIndex.name] {
/// If the index still exists, make sure it is compatible
if persistedIndex.type != updatedIndex.type {
/// They were not compatible, so delete the bad index, and queue it to be re-built.
try await transaction.deleteDirectIndex(indexName: persistedIndex.name, datastoreKey: key)
secondaryIndexesToBuild.insert(persistedIndex.name)
}
} else {
/// The index is no longer needed, delete it.
try await transaction.deleteDirectIndex(indexName: persistedIndex.name, datastoreKey: key)
}
} else {
/// The index is no longer needed, delete it.
try await transaction.deleteDirectIndex(indexName: persistedIndex.name, datastoreKey: key)
}
}

/// Check for new direct indexes to build
for (_, updatedIndex) in updatedDescriptor.directIndexes {
guard persistedDescriptor.directIndexes[updatedIndex.name] == nil else { continue }
/// The index does not yet exist, so queue it to be built.
directIndexesToBuild.insert(updatedIndex.name)
}

/// Check existing reference indexes for compatibility
for (_, persistedIndex) in persistedDescriptor.referenceIndexes {
if let updatedIndex = updatedDescriptor.referenceIndexes[persistedIndex.name] {
/// If the index still exists, make sure it is compatible
if persistedIndex.type != updatedIndex.type {
/// They were not compatible, so delete the bad index, and queue it to be re-built.
try await transaction.deleteSecondaryIndex(indexName: persistedIndex.name, datastoreKey: key)
secondaryIndexesToBuild.insert(persistedIndex.name)
}
} else {
/// The index is no longer needed, delete it.
try await transaction.deleteSecondaryIndex(indexName: persistedIndex.name, datastoreKey: key)
}
}

/// Check for new reference indexes to build
for (_, updatedIndex) in updatedDescriptor.referenceIndexes {
guard persistedDescriptor.referenceIndexes[updatedIndex.name] == nil else { continue }
/// The index does not yet exist, so queue it to be built.
secondaryIndexesToBuild.insert(updatedIndex.name)
}

/// Remove any direct indexes from the secondary ones we may have requested.
secondaryIndexesToBuild.subtract(directIndexesToBuild)

/// Only perform work if we need to rebuild anything.
if rebuildPrimaryIndex || !directIndexesToBuild.isEmpty || !secondaryIndexesToBuild.isEmpty {
/// Create any missing indexes and prime the datastore for writing.
try await transaction.apply(descriptor: updatedDescriptor, for: key)

let primaryIndex = _load(IndexRange(), order: .ascending, awaitWarmup: false)

let versionData = try Data(self.version)

for try await (idenfifier, instance) in primaryIndex {
defer { index += 1 }

/// Check for new secondary indexes to build
for (_, updatedIndex) in updatedDescriptor.referenceIndexes {
guard persistedDescriptor.referenceIndexes[updatedIndex.name] == nil else { continue }
/// The index does not yet exist, so queue it to be built.
secondaryIndexesToBuild.insert(updatedIndex.name)
/// Notify progress handlers we are starting an entry.
for handler in warmupProgressHandlers {
handler(.working(current: index, total: persistedDescriptor.size))
}

/// Remove any direct indexes from the secondary ones we may have requested.
secondaryIndexesToBuild.subtract(directIndexesToBuild)
let instanceData = try await encoder(instance)

/// If we don't need to migrate anything, stop here.
if rebuildPrimaryIndex == false, directIndexesToBuild.isEmpty, secondaryIndexesToBuild.isEmpty {
break
if rebuildPrimaryIndex {
let insertionCursor = try await transaction.primaryIndexCursor(inserting: idenfifier, datastoreKey: key)

try await transaction.persistPrimaryIndexEntry(
versionData: versionData,
identifierValue: idenfifier,
instanceData: instanceData,
cursor: insertionCursor,
datastoreKey: key
)
}

/// Create any missing indexes and prime the datastore for writing.
try await transaction.apply(descriptor: updatedDescriptor, for: key)
}

/// Notify progress handlers we are starting an entry.
for handler in warmupProgressHandlers {
handler(.working(current: index, total: persistedDescriptor.size))
}

let instanceData = try await encoder(instance)

if rebuildPrimaryIndex {
let insertionCursor = try await transaction.primaryIndexCursor(inserting: idenfifier, datastoreKey: key)
var queriedIndexes: Set<IndexName> = []

try await transaction.persistPrimaryIndexEntry(
versionData: versionData,
identifierValue: idenfifier,
instanceData: instanceData,
cursor: insertionCursor,
datastoreKey: key
)
}

var queriedIndexes: Set<IndexName> = []

for (_, generatedRepresentation) in indexRepresentations {
let indexName = generatedRepresentation.indexName
switch generatedRepresentation.storage {
case .direct:
guard
directIndexesToBuild.contains(indexName),
!queriedIndexes.contains(indexName)
else { return }
queriedIndexes.insert(indexName)

for updatedValue in instance[index: generatedRepresentation.index] {
/// Grab a cursor to insert the new value in the index.
let updatedValueCursor = try await transaction.directIndexCursor(
inserting: updatedValue.indexed,
identifier: idenfifier,
indexName: indexName,
datastoreKey: key
)
for (_, generatedRepresentation) in indexRepresentations {
let indexName = generatedRepresentation.indexName
switch generatedRepresentation.storage {
case .direct:
guard
directIndexesToBuild.contains(indexName),
!queriedIndexes.contains(indexName)
else { return }
queriedIndexes.insert(indexName)

/// Insert it.
try await transaction.persistDirectIndexEntry(
versionData: versionData,
indexValue: updatedValue.indexed,
identifierValue: idenfifier,
instanceData: instanceData,
cursor: updatedValueCursor,
indexName: indexName,
datastoreKey: key
)
}
case .reference:
guard
secondaryIndexesToBuild.contains(indexName),
!queriedIndexes.contains(indexName)
else { return }
queriedIndexes.insert(indexName)

for updatedValue in instance[index: generatedRepresentation.index] {
/// Grab a cursor to insert the new value in the index.
let updatedValueCursor = try await transaction.secondaryIndexCursor(
inserting: updatedValue.indexed,
identifier: idenfifier,
indexName: indexName,
datastoreKey: self.key
)
for updatedValue in instance[index: generatedRepresentation.index] {
/// Grab a cursor to insert the new value in the index.
let updatedValueCursor = try await transaction.directIndexCursor(
inserting: updatedValue.indexed,
identifier: idenfifier,
indexName: indexName,
datastoreKey: key
)

/// Insert it.
try await transaction.persistDirectIndexEntry(
versionData: versionData,
indexValue: updatedValue.indexed,
identifierValue: idenfifier,
instanceData: instanceData,
cursor: updatedValueCursor,
indexName: indexName,
datastoreKey: key
)
}
case .reference:
guard
secondaryIndexesToBuild.contains(indexName),
!queriedIndexes.contains(indexName)
else { return }
queriedIndexes.insert(indexName)

/// Insert it.
try await transaction.persistSecondaryIndexEntry(
indexValue: updatedValue.indexed,
identifierValue: idenfifier,
cursor: updatedValueCursor,
indexName: indexName,
datastoreKey: self.key
)
for updatedValue in instance[index: generatedRepresentation.index] {
/// Grab a cursor to insert the new value in the index.
let updatedValueCursor = try await transaction.secondaryIndexCursor(
inserting: updatedValue.indexed,
identifier: idenfifier,
indexName: indexName,
datastoreKey: self.key
)

/// Insert it.
try await transaction.persistSecondaryIndexEntry(
indexValue: updatedValue.indexed,
identifierValue: idenfifier,
cursor: updatedValueCursor,
indexName: indexName,
datastoreKey: self.key
)
}
}
}
}
Expand Down
Loading