Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
internal var id: String

/// The `ObjectMessage`s gathered during this sync sequence.
internal var syncObjectsPool: [SyncObjectsPoolEntry]
internal var syncObjectsPool: SyncObjectsPool
}

internal init(
Expand Down Expand Up @@ -564,16 +564,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
}
}

let syncObjectsPoolEntries = objectMessages.compactMap { objectMessage in
if let object = objectMessage.object {
SyncObjectsPoolEntry(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
} else {
nil
}
}

// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
let completedSyncObjectsPool: SyncObjectsPool?
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
let syncSequenceForSyncingState: SyncSequence?

Expand All @@ -583,15 +575,21 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
} else {
nil
}
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: [])
// RTO5b
updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries)
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: .init())
// RTO5f
for objectMessage in objectMessages {
updatedSyncSequence.syncObjectsPool.accumulate(objectMessage: objectMessage, logger: logger)
}
syncSequenceForSyncingState = updatedSyncSequence

completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil
} else {
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
completedSyncObjectsPool = syncObjectsPoolEntries
var pool = SyncObjectsPool()
for objectMessage in objectMessages {
pool.accumulate(objectMessage: objectMessage, logger: logger)
}
completedSyncObjectsPool = pool
syncSequenceForSyncingState = nil
}

Expand Down
15 changes: 6 additions & 9 deletions Sources/AblyLiveObjects/Internal/ObjectsPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ internal struct ObjectsPool {

/// Applies the objects gathered during an `OBJECT_SYNC` to this `ObjectsPool`, per RTO5c1 and RTO5c2.
internal mutating func nosync_applySyncObjectsPool(
_ syncObjectsPool: [SyncObjectsPoolEntry],
_ syncObjectsPool: SyncObjectsPool,
logger: Logger,
internalQueue: DispatchQueue,
userCallbackQueue: DispatchQueue,
Expand Down Expand Up @@ -309,7 +309,7 @@ internal struct ObjectsPool {
logger.log("Creating new object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)

// RTO5c1b1: Create a new LiveObject using the data from ObjectState and add it to the internal ObjectsPool:
let newEntry: Entry?
let newEntry: Entry

if syncObjectsPoolEntry.state.counter != nil {
// RTO5c1b1a: If ObjectState.counter is present, create a zero-value LiveCounter,
Expand Down Expand Up @@ -345,15 +345,12 @@ internal struct ObjectsPool {
)
newEntry = .map(map)
} else {
// RTO5c1b1c: Otherwise, log a warning that an unsupported object state message has been received, and discard the current ObjectState without taking any action
logger.log("Unsupported object state message received for objectId: \(syncObjectsPoolEntry.state.objectId)", level: .warn)
newEntry = nil
// See SyncObjectsPool.Entry.state documentation.
preconditionFailure("SyncObjectsPool entry for objectId \(syncObjectsPoolEntry.state.objectId) has neither counter nor map")
}

if let newEntry {
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
entries[syncObjectsPoolEntry.state.objectId] = newEntry
}
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
entries[syncObjectsPoolEntry.state.objectId] = newEntry
}
}

Expand Down
101 changes: 101 additions & 0 deletions Sources/AblyLiveObjects/Internal/SyncObjectsPool.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import Foundation

/// The RTO5b collection of objects gathered during an `OBJECT_SYNC` sequence, ready to be applied to the `ObjectsPool`.
///
/// Internally stores `InboundObjectMessage` values keyed by `objectId`. The `accumulate` method implements the RTO5f
/// merge logic for partial object sync.
internal struct SyncObjectsPool: Sequence {
/// A computed view of a stored `InboundObjectMessage`, yielded during iteration.
///
/// Preserves backward compatibility with the consumption side in `ObjectsPool.nosync_applySyncObjectsPool`.
internal struct Entry {
/// Guaranteed to have either `.map` or `.counter` populated.
internal var state: ObjectState
/// The `serialTimestamp` of the `ObjectMessage` that generated this entry.
internal var objectMessageSerialTimestamp: Date?
}

private var objectMessages: [String: InboundObjectMessage]

/// Creates an empty pool.
internal init() {
objectMessages = [:]
}

/// Accumulates an `ObjectMessage` into the pool per RTO5f.
internal mutating func accumulate(
objectMessage: InboundObjectMessage,
logger: Logger,
) {
guard let object = objectMessage.object else {
return
}

let objectId = object.objectId

// RTO5f3: Reject unsupported object types before pool lookup. This provides the guarantee documented on Entry.state.
guard object.map != nil || object.counter != nil else {
logger.log("Skipping unsupported object type during sync for objectId \(objectId)", level: .warn)
return
}

if let existing = objectMessages[objectId] {
// RTO5f2: An entry already exists for this objectId (partial object state).
if object.map != nil {
// RTO5f2a: Incoming message has a map.
if object.tombstone {
// RTO5f2a1: Incoming tombstone is true — replace the entire entry.
objectMessages[objectId] = objectMessage
} else {
// RTO5f2a2: Merge map entries into the existing message.
var merged = existing
if let incomingEntries = object.map?.entries {
var mergedObject = merged.object!
var mergedMap = mergedObject.map!
var mergedEntries = mergedMap.entries ?? [:]
mergedEntries.merge(incomingEntries) { _, new in new }
mergedMap.entries = mergedEntries
mergedObject.map = mergedMap
merged.object = mergedObject
}
objectMessages[objectId] = merged
}
} else {
// RTO5f2b: Incoming message has a counter — log error, skip.
logger.log("Received partial counter sync for objectId \(objectId); skipping", level: .error)
}
} else {
// RTO5f1: No entry exists for this objectId — store the message.
objectMessages[objectId] = objectMessage
}
}

internal var count: Int { objectMessages.count }
internal var isEmpty: Bool { objectMessages.isEmpty }

// MARK: - Sequence conformance

internal struct Iterator: IteratorProtocol {
private var underlying: Dictionary<String, InboundObjectMessage>.Values.Iterator

fileprivate init(_ underlying: Dictionary<String, InboundObjectMessage>.Values.Iterator) {
self.underlying = underlying
}

internal mutating func next() -> Entry? {
guard let message = underlying.next() else {
return nil
}

// We only store messages whose `object` is non-nil (see `accumulate`).
return Entry(
state: message.object!,
objectMessageSerialTimestamp: message.serialTimestamp,
)
}
}

internal func makeIterator() -> Iterator {
Iterator(objectMessages.values.makeIterator())
}
}
15 changes: 0 additions & 15 deletions Sources/AblyLiveObjects/Internal/SyncObjectsPoolEntry.swift

This file was deleted.

2 changes: 2 additions & 0 deletions Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ struct TestFactories {
object: ObjectState? = nil,
serial: String? = nil,
siteCode: String? = nil,
serialTimestamp: Date? = nil,
) -> InboundObjectMessage {
InboundObjectMessage(
id: id,
Expand All @@ -279,6 +280,7 @@ struct TestFactories {
object: object,
serial: serial,
siteCode: siteCode,
serialTimestamp: serialTimestamp,
)
}

Expand Down
Loading
Loading